From ec46d82c710d3bb3481c0dace029729ba49aadd8 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 26 Jun 2020 10:32:42 -1000 Subject: [PATCH] Add integration tests for SqlInputSource (#10080) * Add integration tests for SqlInputSource * make it faster --- docs/ingestion/native-batch.md | 1 + integration-tests/docker/druid.sh | 10 ++ .../sql-input-source-sample-data.sql | 115 ++++++++++++++++++ .../parallelized/ITSqlInputSourceTest.java | 104 ++++++++++++++++ ...allel_index_using_sqlinputsource_task.json | 88 ++++++++++++++ 5 files changed, 318 insertions(+) create mode 100644 integration-tests/docker/test-data/sql-input-source-sample-data.sql create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_parallel_index_using_sqlinputsource_task.json diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index c366be0dff6..4dbbca7d582 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -1342,6 +1342,7 @@ An example SqlInputSource spec is shown below: } }, "sqls": ["SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'"] + } }, ... ``` diff --git a/integration-tests/docker/druid.sh b/integration-tests/docker/druid.sh index a7632b73892..0c06838b8e5 100755 --- a/integration-tests/docker/druid.sh +++ b/integration-tests/docker/druid.sh @@ -93,5 +93,15 @@ setupData() # The region of the sample data s3 blobs needed for these test groups export AWS_REGION=us-east-1 fi + + + # The SqlInputSource tests in the "batch-index" test group require data to be setup in MySQL before running the tests. + if [ "$DRUID_INTEGRATION_TEST_GROUP" = "batch-index" ] ; then + # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72. + find /var/lib/mysql -type f -exec touch {} \; && service mysql start \ + && echo "CREATE database sqlinputsource DEFAULT CHARACTER SET utf8mb4;" | mysql -u root druid \ + && cat /test-data/sql-input-source-sample-data.sql | mysql -u root druid \ + && /etc/init.d/mysql stop + fi } diff --git a/integration-tests/docker/test-data/sql-input-source-sample-data.sql b/integration-tests/docker/test-data/sql-input-source-sample-data.sql new file mode 100644 index 00000000000..201ab38e751 --- /dev/null +++ b/integration-tests/docker/test-data/sql-input-source-sample-data.sql @@ -0,0 +1,115 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +create table sqlinputsource.wikipedia_index_data1( + timestamp VARCHAR(100) NOT NULL, + page VARCHAR(100), + language VARCHAR(40), + user VARCHAR(100), + unpatrolled VARCHAR(100), + newPage VARCHAR(40), + robot VARCHAR(100), + anonymous VARCHAR(100), + namespace VARCHAR(40), + continent VARCHAR(100), + country VARCHAR(100), + region VARCHAR(40), + city VARCHAR(100), + added INT, + deleted INT, + delta INT +); + +create table sqlinputsource.wikipedia_index_data2( + timestamp VARCHAR(100) NOT NULL, + page VARCHAR(100), + language VARCHAR(40), + user VARCHAR(100), + unpatrolled VARCHAR(100), + newPage VARCHAR(40), + robot VARCHAR(100), + anonymous VARCHAR(100), + namespace VARCHAR(40), + continent VARCHAR(100), + country VARCHAR(100), + region VARCHAR(40), + city VARCHAR(100), + added INT, + deleted INT, + delta INT +); + +create table sqlinputsource.wikipedia_index_data3( + timestamp VARCHAR(100) NOT NULL, + page VARCHAR(100), + language VARCHAR(40), + user VARCHAR(100), + unpatrolled VARCHAR(100), + newPage VARCHAR(40), + robot VARCHAR(100), + anonymous VARCHAR(100), + namespace VARCHAR(40), + continent VARCHAR(100), + country VARCHAR(100), + region VARCHAR(40), + city VARCHAR(100), + added INT, + deleted INT, + delta INT +); + +create table sqlinputsource.wikipedia_index_data_all( + timestamp VARCHAR(100) NOT NULL, + page VARCHAR(100), + language VARCHAR(40), + user VARCHAR(100), + unpatrolled VARCHAR(100), + newPage VARCHAR(40), + robot VARCHAR(100), + anonymous VARCHAR(100), + namespace VARCHAR(40), + continent VARCHAR(100), + country VARCHAR(100), + region VARCHAR(40), + city VARCHAR(100), + added INT, + deleted INT, + delta INT +); + +INSERT INTO sqlinputsource.wikipedia_index_data1 VALUES ("2013-08-31T01:02:33Z", "Gypsy Danger", "en", "nuclear", "true", "true", "false", "false", "article", "North America", "United States", "Bay Area", "San Francisco", 57, 200, -143); +INSERT INTO sqlinputsource.wikipedia_index_data1 VALUES ("2013-08-31T03:32:45Z", "Striker Eureka", "en", "speed", "false", "true", "true", "false", "wikipedia", "Australia", "Australia", "Cantebury", "Syndey", 459, 129, 330); +INSERT INTO sqlinputsource.wikipedia_index_data1 VALUES ("2013-08-31T07:11:21Z", "Cherno Alpha", "ru", "masterYi", "false", "true", "true", "false", "article", "Asia", "Russia", "Oblast", "Moscow", 123, 12, 111); + +INSERT INTO sqlinputsource.wikipedia_index_data2 VALUES ("2013-08-31T11:58:39Z", "Crimson Typhoon", "zh", "triplets", "true", "false", "true", "false", "wikipedia", "Asia", "China", "Shanxi", "Taiyuan", 905, 5, 900); +INSERT INTO sqlinputsource.wikipedia_index_data2 VALUES ("2013-08-31T12:41:27Z", "Coyote Tango", "ja", "stringer", "true", "false", "true", "false", "wikipedia", "Asia", "Japan", "Kanto", "Tokyo", 1, 10, -9); +INSERT INTO sqlinputsource.wikipedia_index_data2 VALUES ("2013-09-01T01:02:33Z", "Gypsy Danger", "en", "nuclear", "true", "true", "false", "false", "article", "North America", "United States", "Bay Area", "San Francisco", 57, 200, -143); + +INSERT INTO sqlinputsource.wikipedia_index_data3 VALUES ("2013-09-01T03:32:45Z", "Striker Eureka", "en", "speed", "false", "true", "true", "false", "wikipedia", "Australia", "Australia", "Cantebury", "Syndey", 459, 129, 330); +INSERT INTO sqlinputsource.wikipedia_index_data3 VALUES ("2013-09-01T07:11:21Z", "Cherno Alpha", "ru", "masterYi", "false", "true", "true", "false", "article", "Asia", "Russia", "Oblast", "Moscow", 123, 12, 111); +INSERT INTO sqlinputsource.wikipedia_index_data3 VALUES ("2013-09-01T11:58:39Z", "Crimson Typhoon", "zh", "triplets", "true", "false", "true", "false", "wikipedia", "Asia", "China", "Shanxi", "Taiyuan", 905, 5, 900); +INSERT INTO sqlinputsource.wikipedia_index_data3 VALUES ("2013-09-01T12:41:27Z", "Coyote Tango", "ja", "stringer", "true", "false", "true", "false", "wikipedia", "Asia", "Japan", "Kanto", "Tokyo", 1, 10, -9); + +INSERT INTO sqlinputsource.wikipedia_index_data_all VALUES ("2013-08-31T01:02:33Z", "Gypsy Danger", "en", "nuclear", "true", "true", "false", "false", "article", "North America", "United States", "Bay Area", "San Francisco", 57, 200, -143); +INSERT INTO sqlinputsource.wikipedia_index_data_all VALUES ("2013-08-31T03:32:45Z", "Striker Eureka", "en", "speed", "false", "true", "true", "false", "wikipedia", "Australia", "Australia", "Cantebury", "Syndey", 459, 129, 330); +INSERT INTO sqlinputsource.wikipedia_index_data_all VALUES ("2013-08-31T07:11:21Z", "Cherno Alpha", "ru", "masterYi", "false", "true", "true", "false", "article", "Asia", "Russia", "Oblast", "Moscow", 123, 12, 111); +INSERT INTO sqlinputsource.wikipedia_index_data_all VALUES ("2013-08-31T11:58:39Z", "Crimson Typhoon", "zh", "triplets", "true", "false", "true", "false", "wikipedia", "Asia", "China", "Shanxi", "Taiyuan", 905, 5, 900); +INSERT INTO sqlinputsource.wikipedia_index_data_all VALUES ("2013-08-31T12:41:27Z", "Coyote Tango", "ja", "stringer", "true", "false", "true", "false", "wikipedia", "Asia", "Japan", "Kanto", "Tokyo", 1, 10, -9); +INSERT INTO sqlinputsource.wikipedia_index_data_all VALUES ("2013-09-01T01:02:33Z", "Gypsy Danger", "en", "nuclear", "true", "true", "false", "false", "article", "North America", "United States", "Bay Area", "San Francisco", 57, 200, -143); +INSERT INTO sqlinputsource.wikipedia_index_data_all VALUES ("2013-09-01T03:32:45Z", "Striker Eureka", "en", "speed", "false", "true", "true", "false", "wikipedia", "Australia", "Australia", "Cantebury", "Syndey", 459, 129, 330); +INSERT INTO sqlinputsource.wikipedia_index_data_all VALUES ("2013-09-01T07:11:21Z", "Cherno Alpha", "ru", "masterYi", "false", "true", "true", "false", "article", "Asia", "Russia", "Oblast", "Moscow", 123, 12, 111); +INSERT INTO sqlinputsource.wikipedia_index_data_all VALUES ("2013-09-01T11:58:39Z", "Crimson Typhoon", "zh", "triplets", "true", "false", "true", "false", "wikipedia", "Asia", "China", "Shanxi", "Taiyuan", 905, 5, 900); +INSERT INTO sqlinputsource.wikipedia_index_data_all VALUES ("2013-09-01T12:41:27Z", "Coyote Tango", "ja", "stringer", "true", "false", "true", "false", "wikipedia", "Asia", "Japan", "Kanto", "Tokyo", 1, 10, -9); + diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java new file mode 100644 index 00000000000..4efe83afcf2 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.parallelized; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractITBatchIndexTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.io.Closeable; +import java.util.List; +import java.util.UUID; +import java.util.function.Function; + +@Test(groups = TestNGGroup.BATCH_INDEX) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITSqlInputSourceTest extends AbstractITBatchIndexTest +{ + private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; + private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; + + @DataProvider(parallel = true) + public static Object[][] resources() + { + return new Object[][]{ + // Multiple query. No filter + {ImmutableList.of("SELECT * FROM wikipedia_index_data1", "SELECT * FROM wikipedia_index_data2", "SELECT * FROM wikipedia_index_data3")}, + // Multiple query. Filter on timestamp column + {ImmutableList.of("SELECT * FROM wikipedia_index_data1 WHERE timestamp BETWEEN '2013-08-31 00:00:00' AND '2013-08-31 11:59:59'", + "SELECT * FROM wikipedia_index_data2 WHERE timestamp BETWEEN '2013-08-31 00:00:00' AND '2013-09-01 11:59:59'", + "SELECT * FROM wikipedia_index_data3 WHERE timestamp BETWEEN '2013-09-01 00:00:00' AND '2013-09-01 11:59:59'")}, + // Multiple query. Filter on data column + {ImmutableList.of("SELECT * FROM wikipedia_index_data1 WHERE added > 0", + "SELECT * FROM wikipedia_index_data2 WHERE added > 0", + "SELECT * FROM wikipedia_index_data3 WHERE added > 0")}, + // Single query. No filter + {ImmutableList.of("SELECT * FROM wikipedia_index_data_all")}, + // Single query. Filter on timestamp column + {ImmutableList.of("SELECT * FROM wikipedia_index_data_all WHERE timestamp BETWEEN '2013-08-31 00:00:00' AND '2013-09-01 11:59:59'")}, + // Single query. Filter on data column + {ImmutableList.of("SELECT * FROM wikipedia_index_data_all WHERE added > 0")}, + }; + } + + @Test(dataProvider = "resources") + public void testIndexData(List sqlQueries) throws Exception + { + final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID(); + try ( + final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix()); + ) { + final Function sqlInputSourcePropsTransform = spec -> { + try { + spec = StringUtils.replace( + spec, + "%%PARTITIONS_SPEC%%", + jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null)) + ); + return StringUtils.replace( + spec, + "%%SQL_QUERY%%", + jsonMapper.writeValueAsString(sqlQueries) + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + + doIndexTest( + indexDatasource, + INDEX_TASK, + sqlInputSourcePropsTransform, + INDEX_QUERIES_RESOURCE, + false, + true, + true + ); + } + + } +} diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_using_sqlinputsource_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_using_sqlinputsource_task.json new file mode 100644 index 00000000000..b6b0d54c2aa --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_using_sqlinputsource_task.json @@ -0,0 +1,88 @@ +{ + "type": "index_parallel", + "spec": { + "dataSchema": { + "dataSource": "%%DATASOURCE%%", + "timestampSpec": { + "column": "timestamp" + }, + "dimensionsSpec": { + "dimensions": [ + "page", + {"type": "string", "name": "language", "createBitmapIndex": false}, + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city" + ] + }, + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + }, + { + "name": "thetaSketch", + "type": "thetaSketch", + "fieldName": "user" + }, + { + "name": "quantilesDoublesSketch", + "type": "quantilesDoublesSketch", + "fieldName": "delta" + }, + { + "name": "HLLSketchBuild", + "type": "HLLSketchBuild", + "fieldName": "user" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "second", + "intervals" : [ "2013-08-31/2013-09-02" ] + } + }, + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "sql", + "database": { + "type": "mysql", + "connectorConfig": { + "connectURI": "jdbc:mysql://druid-metadata-storage/sqlinputsource", + "user": "druid", + "password": "diurd" + } + }, + "sqls": %%SQL_QUERY%% + } + }, + "tuningConfig": { + "type": "index_parallel", + "maxNumConcurrentSubTasks": 10, + "partitionsSpec": %%PARTITIONS_SPEC%% + } + } +} \ No newline at end of file