mirror of https://github.com/apache/druid.git
Catalog integration tests (#16424)
* add new catalog IT with failure to ensure that it is run in CI
* actually add failing test referred to and fix checkstyle
* add some tests
* fix checkstyle
* add test descriptions
* add more tests
This commit is contained in:
parent ed9881df88
commit dd5dc500ce
@@ -0,0 +1,448 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.testsEx.catalog;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testsEx.cluster.CatalogClient;
import org.apache.druid.testsEx.cluster.DruidClusterClient;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests that expect successful ingestion of data into catalog-defined tables, and that querying
 * the data gives the expected results.
 */
public abstract class ITCatalogIngestAndQueryTest
{
  public static final Logger LOG = new Logger(ITCatalogIngestAndQueryTest.class);

  @Inject
  private MsqTestQueryHelper msqHelper;
  @Inject
  private DataLoaderHelper dataLoaderHelper;
  @Inject
  private DruidClusterClient clusterClient;
  private CatalogClient client;

  private final String operationName;
  private final String dmlPrefixPattern;

  public ITCatalogIngestAndQueryTest()
  {
    this.operationName = getOperationName();
    this.dmlPrefixPattern = getDmlPrefixPattern();
  }

  public abstract String getOperationName();
  public abstract String getDmlPrefixPattern();

  @Before
  public void initializeClient()
  {
    client = new CatalogClient(clusterClient);
  }

  /**
   * Create table with columns:
   * <p>
   * __time LONG
   * double_col1 DOUBLE
   * <p>
   * And insert the following data:
   * <p>
   * __time, varchar_col1, bigint_col1, float_col1, varchar_col2
   * 2022-12-26T12:34:56,extra,10,"20",2.0,foo
   * 2022-12-26T12:34:56,extra,9,"30",2.0,foo
   * 2022-12-26T12:34:56,extra,8,"40",2.0,foq
   * 2022-12-26T12:34:56,extra,8,"50",2.0,fop
   * <p>
   * When querying the table with query: 'SELECT * from ##tableName', the BIGINT type column should
   * be implicitly coerced into type DOUBLE when inserted into the table, since the column being
   * written into is type DOUBLE.
   * <p>
   * __time, double_col1
   * 2022-12-26T12:34:56,8.0
   * 2022-12-26T12:34:56,8.0
   * 2022-12-26T12:34:56,9.0
   * 2022-12-26T12:34:56,10.0
   */
  @Test
  public void testInsertImplicitCast() throws Exception
  {
    String queryFile = "/catalog/implicitCast_select.sql";
    String tableName = "testImplicitCast" + operationName;
    TableMetadata table = TableBuilder.datasource(tableName, "P1D")
        .column(Columns.TIME_COLUMN, Columns.LONG)
        .column("double_col1", "DOUBLE")
        .build();

    client.createTable(table, true);
    LOG.info("table created:\n%s", client.readTable(table.id()));
    String queryInline =
        StringUtils.format(dmlPrefixPattern, tableName) + "\n"
        + "SELECT\n"
        + "  TIME_PARSE(a) AS __time,\n"
        + "  c AS double_col1\n"
        + "FROM TABLE(\n"
        + "  EXTERN(\n"
        + "    '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\\n2022-12-26T12:34:56,extra,9,\\\"30\\\",2.0,foo\\n2022-12-26T12:34:56,extra,8,\\\"40\\\",2.0,foq\\n2022-12-26T12:34:56,extra,8,\\\"50\\\",2.0,fop\"}',\n"
        + "    '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
        + "  )\n"
        + ") "
        + "  EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n";

    // Submit the task and wait for the datasource to get loaded
    LOG.info("Running query:\n%s", queryInline);
    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryInline);

    if (sqlTaskStatus.getState().isFailure()) {
      Assert.fail(StringUtils.format(
          "Unable to start the task successfully.\nPossible exception: %s",
          sqlTaskStatus.getError()
      ));
    }

    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
    dataLoaderHelper.waitUntilDatasourceIsReady(tableName);

    msqHelper.testQueriesFromFile(queryFile, tableName);
  }

  /**
   * Create table with columns:
   * <p>
   * __time LONG
   * bigint_col1 BIGINT
   * <p>
   * and clustering columns defined in catalog as
   * <p>
   * bigint_col1
   * <p>
   * And insert the following data:
   * <p>
   * __time, varchar_col1, bigint_col1, float_col1, varchar_col2
   * 2022-12-26T12:34:56,extra,10,"20",2.0,foo
   * 2022-12-26T12:34:56,extra,9,"30",2.0,foo
   * 2022-12-26T12:34:56,extra,8,"40",2.0,foq
   * 2022-12-26T12:34:56,extra,8,"50",2.0,fop
   * <p>
   * When querying the table with query: 'SELECT * from ##tableName', because of the clustering
   * defined on the table, the data should be reordered to:
   * <p>
   * __time, bigint_col1
   * 2022-12-26T12:34:56,8
   * 2022-12-26T12:34:56,8
   * 2022-12-26T12:34:56,9
   * 2022-12-26T12:34:56,10
   */
  @Test
  public void testInsertWithClusteringFromCatalog() throws Exception
  {
    String queryFile = "/catalog/clustering_select.sql";
    String tableName = "testWithClusteringFromCatalog" + operationName;
    TableMetadata table = TableBuilder.datasource(tableName, "P1D")
        .column(Columns.TIME_COLUMN, Columns.LONG)
        .column("bigint_col1", "BIGINT")
        .property(
            DatasourceDefn.CLUSTER_KEYS_PROPERTY,
            ImmutableList.of(new ClusterKeySpec("bigint_col1", false))
        )
        .build();

    client.createTable(table, true);
    LOG.info("table created:\n%s", client.readTable(table.id()));
    String queryInline =
        StringUtils.format(dmlPrefixPattern, tableName) + "\n"
        + "SELECT\n"
        + "  TIME_PARSE(a) AS __time,\n"
        + "  c AS bigint_col1\n"
        + "FROM TABLE(\n"
        + "  EXTERN(\n"
        + "    '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\\n2022-12-26T12:34:56,extra,9,\\\"30\\\",2.0,foo\\n2022-12-26T12:34:56,extra,8,\\\"40\\\",2.0,foq\\n2022-12-26T12:34:56,extra,8,\\\"50\\\",2.0,fop\"}',\n"
        + "    '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
        + "  )\n"
        + ") "
        + "  EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n";

    // Submit the task and wait for the datasource to get loaded
    LOG.info("Running query:\n%s", queryInline);
    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryInline);

    if (sqlTaskStatus.getState().isFailure()) {
      Assert.fail(StringUtils.format(
          "Unable to start the task successfully.\nPossible exception: %s",
          sqlTaskStatus.getError()
      ));
    }

    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
    dataLoaderHelper.waitUntilDatasourceIsReady(tableName);

    msqHelper.testQueriesFromFile(queryFile, tableName);
  }

  /**
   * Create table with columns:
   * <p>
   * __time LONG
   * bigint_col1 BIGINT
   * <p>
   * and clustering columns defined in the query as
   * <p>
   * bigint_col1
   * <p>
   * And insert the following data:
   * <p>
   * __time, varchar_col1, bigint_col1, float_col1, varchar_col2
   * 2022-12-26T12:34:56,extra,10,"20",2.0,foo
   * 2022-12-26T12:34:56,extra,9,"30",2.0,foo
   * 2022-12-26T12:34:56,extra,8,"40",2.0,foq
   * 2022-12-26T12:34:56,extra,8,"50",2.0,fop
   * <p>
   * When querying the table with query: 'SELECT * from ##tableName', because of the clustering
   * defined in the query, the data should be reordered to:
   * <p>
   * __time, bigint_col1
   * 2022-12-26T12:34:56,8
   * 2022-12-26T12:34:56,8
   * 2022-12-26T12:34:56,9
   * 2022-12-26T12:34:56,10
   */
  @Test
  public void testInsertWithClusteringFromQuery() throws Exception
  {
    String queryFile = "/catalog/clustering_select.sql";
    String tableName = "testWithClusteringFromQuery" + operationName;
    TableMetadata table = TableBuilder.datasource(tableName, "P1D")
        .column(Columns.TIME_COLUMN, Columns.LONG)
        .column("bigint_col1", "BIGINT")
        .build();

    client.createTable(table, true);
    LOG.info("table created:\n%s", client.readTable(table.id()));
    String queryInline =
        StringUtils.format(dmlPrefixPattern, tableName) + "\n"
        + "SELECT\n"
        + "  TIME_PARSE(a) AS __time,\n"
        + "  c AS bigint_col1\n"
        + "FROM TABLE(\n"
        + "  EXTERN(\n"
        + "    '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\\n2022-12-26T12:34:56,extra,9,\\\"30\\\",2.0,foo\\n2022-12-26T12:34:56,extra,8,\\\"40\\\",2.0,foq\\n2022-12-26T12:34:56,extra,8,\\\"50\\\",2.0,fop\"}',\n"
        + "    '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
        + "  )\n"
        + ") "
        + "  EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
        + "CLUSTERED BY \"bigint_col1\"\n";

    // Submit the task and wait for the datasource to get loaded
    LOG.info("Running query:\n%s", queryInline);
    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryInline);

    if (sqlTaskStatus.getState().isFailure()) {
      Assert.fail(StringUtils.format(
          "Unable to start the task successfully.\nPossible exception: %s",
          sqlTaskStatus.getError()
      ));
    }

    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
    dataLoaderHelper.waitUntilDatasourceIsReady(tableName);

    msqHelper.testQueriesFromFile(queryFile, tableName);
  }

  /**
   * Create table with columns:
   * <p>
   * __time LONG
   * varchar_col1 VARCHAR
   * bigint_col1 BIGINT
   * float_col1 FLOAT
   * varchar_col2 VARCHAR
   * <p>
   * and multiple clustering columns defined in catalog as
   * <p>
   * bigint_col1, varchar_col2
   * <p>
   * And insert the following data:
   * <p>
   * __time, varchar_col1, bigint_col1, float_col1, varchar_col2
   * 2022-12-26T12:34:56,extra,10,"20",2.0,foo
   * 2022-12-26T12:34:56,extra,9,"30",2.0,foo
   * 2022-12-26T12:34:56,extra,8,"40",2.0,foq
   * 2022-12-26T12:34:56,extra,8,"50",2.0,fop
   * <p>
   * When querying the table with query: 'SELECT * from ##tableName', because of the clustering
   * defined on the table, the data should be reordered to:
   * <p>
   * __time, varchar_col1, bigint_col1, float_col1, varchar_col2
   * 2022-12-26T12:34:56,extra,8,"50",2.0,fop
   * 2022-12-26T12:34:56,extra,8,"40",2.0,foq
   * 2022-12-26T12:34:56,extra,9,"30",2.0,foo
   * 2022-12-26T12:34:56,extra,10,"20",2.0,foo
   */
  @Test
  public void testInsertWithMultiClusteringFromCatalog() throws Exception
  {
    String queryFile = "/catalog/multiClustering_select.sql";
    String tableName = "testWithMultiClusteringFromCatalog" + operationName;
    TableMetadata table = TableBuilder.datasource(tableName, "P1D")
        .column(Columns.TIME_COLUMN, Columns.LONG)
        .column("varchar_col1", "VARCHAR")
        .column("bigint_col1", "BIGINT")
        .column("float_col1", "FLOAT")
        .column("varchar_col2", "VARCHAR")
        .property(
            DatasourceDefn.CLUSTER_KEYS_PROPERTY,
            ImmutableList.of(new ClusterKeySpec("bigint_col1", false), new ClusterKeySpec("varchar_col2", false))
        )
        .build();

    client.createTable(table, true);
    LOG.info("table created:\n%s", client.readTable(table.id()));
    String queryInline =
        StringUtils.format(dmlPrefixPattern, tableName) + "\n"
        + "SELECT\n"
        + "  TIME_PARSE(a) AS __time,\n"
        + "  b AS varchar_col1,\n"
        + "  c AS bigint_col1,\n"
        + "  e AS float_col1,\n"
        + "  f AS varchar_col2\n"
        + "FROM TABLE(\n"
        + "  EXTERN(\n"
        + "    '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\\n2022-12-26T12:34:56,extra,9,\\\"30\\\",2.0,foo\\n2022-12-26T12:34:56,extra,8,\\\"40\\\",2.0,foq\\n2022-12-26T12:34:56,extra,8,\\\"50\\\",2.0,fop\"}',\n"
        + "    '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
        + "  )\n"
        + ") "
        + "  EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n";

    // Submit the task and wait for the datasource to get loaded
    LOG.info("Running query:\n%s", queryInline);
    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryInline);

    if (sqlTaskStatus.getState().isFailure()) {
      Assert.fail(StringUtils.format(
          "Unable to start the task successfully.\nPossible exception: %s",
          sqlTaskStatus.getError()
      ));
    }

    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
    dataLoaderHelper.waitUntilDatasourceIsReady(tableName);

    msqHelper.testQueriesFromFile(queryFile, tableName);
  }

  /**
   * Create table with columns:
   * <p>
   * __time LONG
   * varchar_col1 VARCHAR
   * bigint_col1 BIGINT
   * float_col1 FLOAT
   * varchar_col2 VARCHAR
   * <p>
   * and multiple clustering columns defined in the query as
   * <p>
   * bigint_col1, varchar_col2
   * <p>
   * And insert the following data:
   * <p>
   * __time, varchar_col1, bigint_col1, float_col1, varchar_col2
   * 2022-12-26T12:34:56,extra,10,"20",2.0,foo
   * 2022-12-26T12:34:56,extra,9,"30",2.0,foo
   * 2022-12-26T12:34:56,extra,8,"40",2.0,foq
   * 2022-12-26T12:34:56,extra,8,"50",2.0,fop
   * <p>
   * When querying the table with query: 'SELECT * from ##tableName', because of the clustering
   * defined in the query, the data should be reordered to:
   * <p>
   * __time, varchar_col1, bigint_col1, float_col1, varchar_col2
   * 2022-12-26T12:34:56,extra,8,"50",2.0,fop
   * 2022-12-26T12:34:56,extra,8,"40",2.0,foq
   * 2022-12-26T12:34:56,extra,9,"30",2.0,foo
   * 2022-12-26T12:34:56,extra,10,"20",2.0,foo
   */
  @Test
  public void testInsertWithMultiClusteringFromQuery() throws Exception
  {
    String queryFile = "/catalog/multiClustering_select.sql";
    String tableName = "testWithMultiClusteringFromQuery" + operationName;
    TableMetadata table = TableBuilder.datasource(tableName, "P1D")
        .column(Columns.TIME_COLUMN, Columns.LONG)
        .column("varchar_col1", "VARCHAR")
        .column("bigint_col1", "BIGINT")
        .column("float_col1", "FLOAT")
        .column("varchar_col2", "VARCHAR")
        .build();

    client.createTable(table, true);
    LOG.info("table created:\n%s", client.readTable(table.id()));
    String queryInline =
        StringUtils.format(dmlPrefixPattern, tableName) + "\n"
        + "SELECT\n"
        + "  TIME_PARSE(a) AS __time,\n"
        + "  b AS varchar_col1,\n"
        + "  c AS bigint_col1,\n"
        + "  e AS float_col1,\n"
        + "  f AS varchar_col2\n"
        + "FROM TABLE(\n"
        + "  EXTERN(\n"
        + "    '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\\n2022-12-26T12:34:56,extra,9,\\\"30\\\",2.0,foo\\n2022-12-26T12:34:56,extra,8,\\\"40\\\",2.0,foq\\n2022-12-26T12:34:56,extra,8,\\\"50\\\",2.0,fop\"}',\n"
        + "    '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
        + "  )\n"
        + ") "
        + "  EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
        + "CLUSTERED BY \"bigint_col1\", \"varchar_col2\"\n";

    // Submit the task and wait for the datasource to get loaded
    LOG.info("Running query:\n%s", queryInline);
    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryInline);

    if (sqlTaskStatus.getState().isFailure()) {
      Assert.fail(StringUtils.format(
          "Unable to start the task successfully.\nPossible exception: %s",
          sqlTaskStatus.getError()
      ));
    }

    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
    dataLoaderHelper.waitUntilDatasourceIsReady(tableName);

    msqHelper.testQueriesFromFile(queryFile, tableName);
  }
}
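As a reading aid, not part of the commit: a minimal sketch of how the dmlPrefixPattern supplied by the concrete subclasses further down in this change expands into the DML prefix that precedes each generated SELECT. The table names here are illustrative values built the same way the tests build them (test name plus operation name).

    // Illustrative sketch only; StringUtils is org.apache.druid.java.util.common.StringUtils.
    String insertPrefix = StringUtils.format("INSERT INTO \"%s\"", "testImplicitCastINSERT");
    // insertPrefix -> INSERT INTO "testImplicitCastINSERT"
    String replacePrefix = StringUtils.format("REPLACE INTO \"%s\" OVERWRITE ALL", "testImplicitCastREPLACE");
    // replacePrefix -> REPLACE INTO "testImplicitCastREPLACE" OVERWRITE ALL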
@@ -0,0 +1,251 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.testsEx.catalog;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableId;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testsEx.categories.Catalog;
import org.apache.druid.testsEx.cluster.CatalogClient;
import org.apache.druid.testsEx.cluster.DruidClusterClient;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertTrue;

/**
 * Tests that expect failures when ingesting data into catalog-defined tables.
 */
@RunWith(DruidTestRunner.class)
@Category(Catalog.class)
public class ITCatalogIngestErrorTest
{
  @Inject
  private MsqTestQueryHelper msqHelper;
  @Inject
  private DruidClusterClient clusterClient;
  private CatalogClient client;

  @Before
  public void initializeClient()
  {
    client = new CatalogClient(clusterClient);
  }

  /**
   * If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then
   * a validation error is raised.
   */
  @Test
  public void testInsertNoPartitonedByFromCatalogOrQuery() throws ExecutionException, InterruptedException
  {
    String tableName = "testInsertNoPartitonedByFromCatalogOrQuery";
    TableMetadata table = new TableBuilder(TableId.datasource(tableName), DatasourceDefn.TABLE_TYPE)
        .column(Columns.TIME_COLUMN, Columns.LONG)
        .column("varchar_col", "VARCHAR")
        .column("bigint_col", "BIGINT")
        .column("float_col", "FLOAT")
        .build();

    client.createTable(table, true);
    String queryInline =
        StringUtils.format(
            "INSERT INTO %s\n"
            + "SELECT\n"
            + "  TIME_PARSE(a) AS __time,\n"
            + "  b AS varchar_col,\n"
            + "  c AS bigint_col,\n"
            + "  e AS float_col\n"
            + "FROM TABLE(\n"
            + "  EXTERN(\n"
            + "    '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
            + "    '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
            + "  )\n"
            + ") "
            + "  EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n",
            tableName
        );

    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(sqlQueryFromString(queryInline), null, null, HttpResponseStatus.BAD_REQUEST);
    assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
        .getUnderlyingException()
        .getMessage()
        .equals(
            "Operation [INSERT] requires a PARTITIONED BY to be explicitly defined, but none was found.")
    );
  }

  /**
   * Adding a new column during ingestion that is not defined in a sealed table should fail with a
   * proper validation error.
   */
  @Test
  public void testInsertNonDefinedColumnIntoSealedCatalogTable() throws ExecutionException, InterruptedException
  {
    String tableName = "testInsertNonDefinedColumnIntoSealedCatalogTable";
    TableMetadata table = TableBuilder.datasource(tableName, "P1D")
        .column(Columns.TIME_COLUMN, Columns.LONG)
        .column("varchar_col", "VARCHAR")
        .column("bigint_col", "BIGINT")
        .column("float_col", "FLOAT")
        .property(DatasourceDefn.SEALED_PROPERTY, true)
        .build();

    client.createTable(table, true);
    String queryInline =
        StringUtils.format(
            "INSERT INTO %s\n"
            + "SELECT\n"
            + "  TIME_PARSE(a) AS __time,\n"
            + "  b AS varchar_col,\n"
            + "  c AS bigint_col,\n"
            + "  e AS float_col,\n"
            + "  c AS extra\n"
            + "FROM TABLE(\n"
            + "  EXTERN(\n"
            + "    '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
            + "    '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
            + "  )\n"
            + ") "
            + "  EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
            + "PARTITIONED BY DAY\n",
            tableName
        );

    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(sqlQueryFromString(queryInline), null, null, HttpResponseStatus.BAD_REQUEST);
    assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
        .getUnderlyingException()
        .getMessage()
        .equals(
            "Column [extra] is not defined in the target table [druid.testInsertNonDefinedColumnIntoSealedCatalogTable] strict schema")
    );
  }

  /**
   * Assigning a column during ingestion to an input type that is not compatible with the defined type of the
   * column should result in a proper validation error.
   */
  @Test
  public void testInsertWithIncompatibleTypeAssignment() throws ExecutionException, InterruptedException
  {
    String tableName = "testInsertWithIncompatibleTypeAssignment";
    TableMetadata table = TableBuilder.datasource(tableName, "P1D")
        .column(Columns.TIME_COLUMN, Columns.LONG)
        .column("varchar_col", "VARCHAR")
        .column("bigint_col", "BIGINT")
        .column("float_col", "FLOAT")
        .property(DatasourceDefn.SEALED_PROPERTY, true)
        .build();

    client.createTable(table, true);
    String queryInline =
        StringUtils.format(
            "INSERT INTO %s\n"
            + "SELECT\n"
            + "  TIME_PARSE(a) AS __time,\n"
            + "  ARRAY[b] AS varchar_col,\n"
            + "  c AS bigint_col,\n"
            + "  e AS float_col\n"
            + "FROM TABLE(\n"
            + "  EXTERN(\n"
            + "    '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
            + "    '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
            + "  )\n"
            + ") "
            + "  EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
            + "PARTITIONED BY DAY\n",
            tableName
        );

    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(sqlQueryFromString(queryInline), null, null, HttpResponseStatus.BAD_REQUEST);
    assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
        .getUnderlyingException()
        .getMessage()
        .equals(
            "Cannot assign to target field 'varchar_col' of type VARCHAR from source field 'varchar_col' of type VARCHAR ARRAY (line [4], column [3])")
    );
  }

  /**
   * Assigning a complex type column during ingestion to an input type that is not compatible with the defined type
   * of the column should result in a proper validation error.
   */
  @Test
  public void testInsertGroupByWithIncompatibleTypeAssignment() throws ExecutionException, InterruptedException
  {
    String tableName = "testInsertGroupByWithIncompatibleTypeAssignment";
    TableMetadata table = TableBuilder.datasource(tableName, "P1D")
        .column(Columns.TIME_COLUMN, Columns.LONG)
        .column("varchar_col", "VARCHAR")
        .column("bigint_col", "BIGINT")
        .column("float_col", "FLOAT")
        .column("hll_col", "COMPLEX<hyperUnique>")
        .property(DatasourceDefn.SEALED_PROPERTY, true)
        .build();

    client.createTable(table, true);
    String queryInline =
        StringUtils.format(
            "INSERT INTO %s\n"
            + "SELECT\n"
            + "  TIME_PARSE(a) AS __time,\n"
            + "  b AS varchar_col,\n"
            + "  c AS bigint_col,\n"
            + "  e AS float_col,\n"
            + "  ARRAY[b] AS hll_col\n"
            + "FROM TABLE(\n"
            + "  EXTERN(\n"
            + "    '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
            + "    '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
            + "  )\n"
            + ") "
            + "  EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
            + "PARTITIONED BY DAY\n",
            tableName
        );

    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(sqlQueryFromString(queryInline), null, null, HttpResponseStatus.BAD_REQUEST);
    assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
        .getUnderlyingException()
        .getMessage()
        .equals(
            "Cannot assign to target field 'hll_col' of type COMPLEX<hyperUnique> from source field 'hll_col' of type VARCHAR ARRAY (line [7], column [3])")
    );
  }

  private static SqlQuery sqlQueryFromString(String queryString)
  {
    return new SqlQuery(queryString, null, false, false, false, ImmutableMap.of(), null);
  }
}
@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.testsEx.catalog;

import org.apache.druid.testsEx.categories.Catalog;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(DruidTestRunner.class)
@Category(Catalog.class)
public class ITCatalogInsertAndQueryTest extends ITCatalogIngestAndQueryTest
{
  @Override
  public String getOperationName()
  {
    return "INSERT";
  }

  @Override
  public String getDmlPrefixPattern()
  {
    return "INSERT INTO \"%s\"";
  }
}
@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.testsEx.catalog;

import org.apache.druid.testsEx.categories.Catalog;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(DruidTestRunner.class)
@Category(Catalog.class)
public class ITCatalogReplaceAndQueryTest extends ITCatalogIngestAndQueryTest
{
  @Override
  public String getOperationName()
  {
    return "REPLACE";
  }

  @Override
  public String getDmlPrefixPattern()
  {
    return "REPLACE INTO \"%s\" OVERWRITE ALL";
  }
}
@@ -19,6 +19,7 @@

package org.apache.druid.testsEx.catalog;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.catalog.http.TableEditRequest.DropColumns;
import org.apache.druid.catalog.http.TableEditRequest.HideColumns;

@@ -28,6 +29,7 @@ import org.apache.druid.catalog.model.CatalogUtils;
import org.apache.druid.catalog.model.TableId;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.TableSpec;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.java.util.common.ISE;

@@ -106,6 +108,17 @@ public class ITCatalogRestTest
          () -> client.createTable(table, false)
      );
    }

    // DESC cluster keys not supported
    {
      final TableMetadata table = TableBuilder.datasource("foo", "P1D")
          .property(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", true)))
          .build();
      assertThrows(
          Exception.class,
          () -> client.createTable(table, false)
      );
    }
  }
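An aside on ClusterKeySpec, my reading rather than something stated in the diff: the second constructor argument appears to be the descending flag, so the spec rejected above requests a DESC cluster key while the specs used in the ingest-and-query tests request ascending keys.

    // Assumption: ClusterKeySpec(column, desc). desc=true asks for a descending
    // cluster key, which createTable rejects; desc=false (ascending) is accepted.
    ClusterKeySpec accepted = new ClusterKeySpec("bigint_col1", false);
    ClusterKeySpec rejected = new ClusterKeySpec("clusterKeyA", true);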

/**
@@ -0,0 +1,23 @@
[
  {
    "query": "SELECT * FROM %%DATASOURCE%%",
    "expectedResults": [
      {
        "__time": 1672058096000,
        "bigint_col1": 8
      },
      {
        "__time": 1672058096000,
        "bigint_col1": 8
      },
      {
        "__time": 1672058096000,
        "bigint_col1": 9
      },
      {
        "__time": 1672058096000,
        "bigint_col1": 10
      }
    ]
  }
]
@@ -0,0 +1,23 @@
[
  {
    "query": "SELECT * FROM %%DATASOURCE%%",
    "expectedResults": [
      {
        "__time": 1672058096000,
        "double_col1": 8.0
      },
      {
        "__time": 1672058096000,
        "double_col1": 8.0
      },
      {
        "__time": 1672058096000,
        "double_col1": 9.0
      },
      {
        "__time": 1672058096000,
        "double_col1": 10.0
      }
    ]
  }
]
@@ -0,0 +1,35 @@
[
  {
    "query": "SELECT * FROM %%DATASOURCE%%",
    "expectedResults": [
      {
        "__time": 1672058096000,
        "bigint_col1": 8,
        "varchar_col2": "fop",
        "varchar_col1": "extra",
        "float_col1": 2.0
      },
      {
        "__time": 1672058096000,
        "bigint_col1": 8,
        "varchar_col2": "foq",
        "varchar_col1": "extra",
        "float_col1": 2.0
      },
      {
        "__time": 1672058096000,
        "bigint_col1": 9,
        "varchar_col2": "foo",
        "varchar_col1": "extra",
        "float_col1": 2.0
      },
      {
        "__time": 1672058096000,
        "bigint_col1": 10,
        "varchar_col2": "foo",
        "varchar_col1": "extra",
        "float_col1": 2.0
      }
    ]
  }
]
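For context, not part of the commit: a sketch of how the ingest-and-query tests consume expected-results files like the three above, under the assumption that the test helper substitutes the %%DATASOURCE%% placeholder with the ingested table name before running each query and comparing the returned rows against expectedResults.

    // Sketch; queryFile and tableName are illustrative values taken from the tests above.
    String queryFile = "/catalog/clustering_select.sql";
    msqHelper.testQueriesFromFile(queryFile, tableName);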
@@ -121,14 +121,29 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
   * and returns the status associated with the submitted task
   */
  public SqlTaskStatus submitMsqTaskSuccesfully(SqlQuery sqlQuery, String username, String password) throws ExecutionException, InterruptedException
  {
    return submitMsqTaskWithExpectedStatusCode(sqlQuery, username, password, HttpResponseStatus.ACCEPTED);
  }

  /**
   * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
   * and returns the status associated with the submitted task
   */
  public SqlTaskStatus submitMsqTaskWithExpectedStatusCode(
      SqlQuery sqlQuery,
      String username,
      String password,
      HttpResponseStatus expectedResponseStatus
  ) throws ExecutionException, InterruptedException
  {
    StatusResponseHolder statusResponseHolder = submitMsqTask(sqlQuery, username, password);
    // Check if the task has been accepted successfully
    HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
    if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
    if (!httpResponseStatus.equals(expectedResponseStatus)) {
      throw new ISE(
          StringUtils.format(
              "Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
              "Expected response status code [%d] when submitting task. Received response status code [%d], and response content:\n[%s]",
              expectedResponseStatus.getCode(),
              httpResponseStatus.getCode(),
              statusResponseHolder.getContent()
          )