From 5172d76a6720a217c321e90b9294435ca60242ea Mon Sep 17 00:00:00 2001
From: abhagraw <99210446+abhagraw@users.noreply.github.com>
Date: Mon, 21 Nov 2022 09:12:02 +0530
Subject: [PATCH] Migrate current integration batch tests to equivalent MSQ
tests (#13374)
* Migrate current integration batch tests to equivalent MSQ tests using new IT framework
* Fix build issues
* Trigger Build
* Adding more tests and addressing comments
* fixBuildIssues
* fix dependency issues
* Parameterized the test and addressed comments
* Addressing comments
* fixing checkstyle errors
* Adressing comments
---
.travis.yml | 16 +-
distribution/pom.xml | 2 +
integration-tests-ex/cases/pom.xml | 19 ++-
.../msq/AbstractITSQLBasedIngestion.java | 121 ++++++++++++++
.../testsEx/msq/ITSQLBasedBatchIngestion.java | 71 +++++++++
.../batch-index/json_path_index_queries.json | 49 ++++++
.../batch-index/msq_inline.sql | 17 ++
.../batch-index/sparse_column_msq.json | 93 +++++++++++
.../batch-index/sparse_column_msq.sql | 21 +++
.../wikipedia_http_inputsource_msq.sql | 29 ++++
.../wikipedia_http_inputsource_queries.json | 47 ++++++
.../batch-index/wikipedia_index_msq.sql | 32 ++++
.../batch-index/wikipedia_index_queries.json | 150 ++++++++++++++++++
...ikipedia_index_queries_with_transform.json | 62 ++++++++
.../wikipedia_index_task_with_transform.sql | 32 ++++
.../wikipedia_merge_index_task.sql | 33 ++++
.../testing/utils/MsqTestQueryHelper.java | 31 +++-
17 files changed, 808 insertions(+), 17 deletions(-)
create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java
create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql
diff --git a/.travis.yml b/.travis.yml
index 2fd32ffd7b9..2d785486ac9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -442,10 +442,6 @@ jobs:
docker exec -it druid-$v sh -c 'dmesg | tail -3' ;
done
- - <<: *integration_batch_index
- name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer"
- env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
-
- &integration_input_format
name: "(Compile=openjdk8, Run=openjdk8) input format integration test"
stage: Tests - phase 2
@@ -689,11 +685,13 @@ jobs:
env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: ./it.sh travis Catalog
- # Disabling BatchIndex test as it is failing with due to timeout, fixing it will be taken in a separate PR.
- #- <<: *integration_tests_ex
- # name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)"
- # env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- # script: ./it.sh travis BatchIndex
+ - &integration_tests_ex
+ name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)"
+ stage: Tests - phase 2
+ jdk: openjdk8
+ services: *integration_test_services
+ env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
+ script: ./it.sh travis BatchIndex
# END - Integration tests for Compile with Java 8 and Run with Java 8
diff --git a/distribution/pom.xml b/distribution/pom.xml
index b73e4215a64..e2b7773b09d 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -225,6 +225,8 @@
-c
org.apache.druid.extensions:druid-multi-stage-query
-c
+ org.apache.druid.extensions:druid-catalog
+ -c
org.apache.druid.extensions:druid-protobuf-extensions
-c
org.apache.druid.extensions:mysql-metadata-storage
diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml
index 5456d4b81b0..cf781f6f888 100644
--- a/integration-tests-ex/cases/pom.xml
+++ b/integration-tests-ex/cases/pom.xml
@@ -155,7 +155,7 @@
org.jdbi
jdbi
-
+
org.apache.druid.extensions
mysql-metadata-storage
${project.parent.version}
@@ -218,10 +218,15 @@
JUnitParams
test
-
- javax.ws.rs
- jsr311-api
-
+
+ javax.ws.rs
+ jsr311-api
+
+
+ org.apache.curator
+ curator-client
+ 5.4.0
+
@@ -345,8 +350,8 @@
integration-test
-
- true
+
+ False
org.apache.druid.testsEx.categories.${it.category}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java
new file mode 100644
index 00000000000..4bb1cdc4783
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.utils.DataLoaderHelper;
+import org.apache.druid.testing.utils.MsqTestQueryHelper;
+import org.apache.druid.testing.utils.TestQueryHelper;
+import org.apache.druid.testsEx.indexer.AbstractITBatchIndexTest;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class AbstractITSQLBasedIngestion
+{
+ public static final Logger LOG = new Logger(TestQueryHelper.class);
+ @Inject
+ private MsqTestQueryHelper msqHelper;
+
+ @Inject
+ protected TestQueryHelper queryHelper;
+
+ @Inject
+ private DataLoaderHelper dataLoaderHelper;
+
+ /**
+ * Reads file as utf-8 string and replace %%DATASOURCE%% with the provide datasource value.
+ */
+ protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource)
+ {
+ String fileString;
+ try {
+ InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath);
+ fileString = IOUtils.toString(is, StandardCharsets.UTF_8);
+ }
+ catch (IOException e) {
+ throw new ISE(e, "could not read query file: %s", filePath);
+ }
+
+ fileString = StringUtils.replace(
+ fileString,
+ "%%DATASOURCE%%",
+ datasource
+ );
+
+ return fileString;
+ }
+
+ /**
+ * Reads native queries from a file and runs against the provided datasource.
+ */
+ protected void doTestQuery(String queryFilePath, String dataSource)
+ {
+ try {
+ String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource);
+ queryHelper.testQueriesFromString(query);
+ }
+ catch (Exception e) {
+ LOG.error(e, "Error while running test query");
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Sumits a sqlTask, waits for task completion and then runs test queries on ingested datasource.
+ */
+ protected void submitTaskAnddoTestQuery(String sqlTask, String queryFilePath, String datasource,
+ Map msqContext) throws Exception
+ {
+ LOG.info("SqlTask - \n %s", sqlTask);
+
+ // Submit the tasks and wait for the datasource to get loaded
+ msqHelper.submitMsqTaskAndWaitForCompletion(
+ sqlTask,
+ msqContext
+ );
+
+ dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+ doTestQuery(queryFilePath, datasource);
+ }
+
+ /**
+ * Runs a MSQ ingest sql test.
+ *
+ * @param sqlFilePath path of file containing the sql query.
+ * @param queryFilePath path of file containing the native test queries to be run on the ingested datasource.
+ * @param datasource name of the datasource. %%DATASOURCE%% in the sql and queries will be replaced with this value.
+ * @param msqContext context parameters to be passed with MSQ API call.
+ */
+ protected void runMSQTaskandTestQueries(String sqlFilePath, String queryFilePath, String datasource,
+ Map msqContext) throws Exception
+ {
+ LOG.info("Starting MSQ test for [%s, %s]", sqlFilePath, queryFilePath);
+
+ String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource);
+ submitTaskAnddoTestQuery(sqlTask, queryFilePath, datasource, msqContext);
+ }
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java
new file mode 100644
index 00000000000..dbc26d7d408
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import junitparams.Parameters;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.druid.testsEx.categories.MultiStageQuery;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(DruidTestRunner.class)
+@Category(MultiStageQuery.class)
+public class ITSQLBasedBatchIngestion extends AbstractITSQLBasedIngestion
+{
+ private static final String BATCH_INDEX_TASKS_DIR = "/multi-stage-query/batch-index/";
+
+ public static List> test_cases()
+ {
+ return Arrays.asList(
+ Arrays.asList("msq_inline.sql", "json_path_index_queries.json"),
+ Arrays.asList("sparse_column_msq.sql", "sparse_column_msq.json"),
+ Arrays.asList("wikipedia_http_inputsource_msq.sql", "wikipedia_http_inputsource_queries.json"),
+ Arrays.asList("wikipedia_index_msq.sql", "wikipedia_index_queries.json"),
+ Arrays.asList("wikipedia_merge_index_task.sql", "wikipedia_index_queries.json"),
+ Arrays.asList("wikipedia_index_task_with_transform.sql", "wikipedia_index_queries_with_transform.json")
+ );
+
+ }
+
+ @Test
+ @Parameters(method = "test_cases")
+ public void testSQLBasedBatchIngestion(String sqlFileName, String queryFileName)
+ {
+ try {
+ runMSQTaskandTestQueries(BATCH_INDEX_TASKS_DIR + sqlFileName,
+ BATCH_INDEX_TASKS_DIR + queryFileName,
+ FilenameUtils.removeExtension(sqlFileName),
+ ImmutableMap.of("finalizeAggregations", false,
+ "maxNumTasks", 5,
+ "groupByEnableMultiValueUnnesting", false
+ ));
+ }
+ catch (Exception e) {
+ LOG.error(e, "Error while testing [%s, %s]", sqlFileName, queryFileName);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json
new file mode 100644
index 00000000000..845af00dd88
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json
@@ -0,0 +1,49 @@
+[
+ {
+ "description": "timeseries",
+ "query": {
+ "queryType": "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "intervals": [
+ "1000/3000"
+ ],
+ "aggregations": [
+ {
+ "type": "longSum",
+ "name": "len",
+ "fieldName": "len"
+ },
+ {
+ "type": "longSum",
+ "name": "max",
+ "fieldName": "max"
+ },
+ {
+ "type": "longSum",
+ "name": "min",
+ "fieldName": "min"
+ },
+ {
+ "type": "longSum",
+ "name": "sum",
+ "fieldName": "sum"
+ }
+ ],
+ "granularity": {
+ "type": "all"
+ }
+ },
+ "expectedResults": [
+ {
+ "timestamp": "2013-08-31T01:02:33.000Z",
+ "result": {
+ "sum": 10,
+ "min": 0,
+ "len": 5,
+ "max": 4
+ }
+ }
+ ]
+ }
+]
+
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql
new file mode 100644
index 00000000000..a710691574a
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql
@@ -0,0 +1,17 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"inline","data":"{\"timestamp\": \"2013-08-31T01:02:33Z\", \"values\": [0,1,2,3,4] }"}',
+ '{"type":"json","flattenSpec":{"useFieldDiscovery":true,"fields":[{"type":"path","name":"len","expr":"$.values.length()"},{"type":"path","name":"min","expr":"$.values.min()"},{"type":"path","name":"max","expr":"$.values.max()"},{"type":"path","name":"sum","expr":"$.values.sum()"}]}}',
+ '[{"name":"timestamp","type":"string"},{"name":"len","type":"long"},{"name":"min","type":"long"},{"name":"max","type":"long"},{"name":"sum","type":"long"}]'
+ )
+))
+SELECT
+ TIME_PARSE("timestamp") AS __time,
+ "len",
+ "min",
+ "max",
+ "sum"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5
+PARTITIONED BY HOUR
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json
new file mode 100644
index 00000000000..4c2c5aa2950
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json
@@ -0,0 +1,93 @@
+[
+ {
+ "description": "timeseries, 1 agg, all",
+ "query":{
+ "queryType" : "timeBoundary",
+ "dataSource": "%%DATASOURCE%%"
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2015-09-12T00:00:00.000Z",
+ "result" : {
+ "minTime" : "2015-09-12T00:00:00.000Z",
+ "maxTime" : "2015-09-12T00:00:00.000Z"
+ }
+ }
+ ]
+ },
+ {
+ "description": "scan, all",
+ "query": {
+ "queryType": "scan",
+ "dataSource": "%%DATASOURCE%%",
+ "intervals": [
+ "2013-01-01/2020-01-02"
+ ],
+ "resultFormat":"compactedList"
+ },
+ "expectedResults": [{
+ "segmentId":"dstsparse_column_msq.json_2015-09-12T00:00:00.000Z_2015-09-12T01:00:00.000Z_2022-11-17T12:32:11.247Z",
+ "columns":["__time","dimB","dimA","dimC","dimD","dimE","dimF","count","sum_metA"],
+ "events":[
+ [1442016000000,"F","C",null,null,null,null,1,1],
+ [1442016000000,"J","C",null,null,null,null,1,1],
+ [1442016000000,"R","J",null,null,null,null,1,1],
+ [1442016000000,"S","Z",null,null,null,null,1,1],
+ [1442016000000,"T","H",null,null,null,null,1,1],
+ [1442016000000,"X",null,"A",null,null,null,1,1],
+ [1442016000000,"X","H",null,null,null,null,3,3],
+ [1442016000000,"Z","H",null,null,null,null,1,1]
+ ]
+ }],
+ "fieldsToTest": ["events"]
+ },
+ {
+ "description": "roll up ratio",
+ "query": {
+ "queryType":"timeseries",
+ "dataSource":{
+ "type":"table",
+ "name":"%%DATASOURCE%%"
+ },
+ "intervals":{
+ "type":"intervals",
+ "intervals":[
+ "2013-01-01/2020-01-02"
+ ]
+ },
+ "granularity":{
+ "type":"all"
+ },
+ "aggregations":[
+ {
+ "type":"count",
+ "name":"a0"
+ },
+ {
+ "type":"longSum",
+ "name":"a1",
+ "fieldName":"count",
+ "expression":null
+ }
+ ],
+ "postAggregations":[
+ {
+ "type":"expression",
+ "name":"p0",
+ "expression":"((\"a0\" * 1.00) / \"a1\")",
+ "ordering":null
+ }
+ ]
+ },
+ "expectedResults": [
+ {
+ "timestamp" : "2015-09-12T00:00:00.000Z",
+ "result" : {
+ "a1" : 10,
+ "p0" : 0.8,
+ "a0" : 8
+ }
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql
new file mode 100644
index 00000000000..f844f599646
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql
@@ -0,0 +1,21 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"inline","data":"{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"F\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"J\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"Z\",\"dimB\":\"S\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"Z\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"J\",\"dimB\":\"R\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"T\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimC\":\"A\",\"dimB\":\"X\",\"metA\":1}\n"}',
+ '{"type":"json"}',
+ '[{"name":"time","type":"string"},{"name":"dimB","type":"string"},{"name":"dimA","type":"string"},{"name":"dimC","type":"string"},{"name":"dimD","type":"string"},{"name":"dimE","type":"string"},{"name":"dimF","type":"string"},{"name":"metA","type":"long"}]'
+ )
+))
+SELECT
+ TIME_FLOOR(TIME_PARSE("time"), 'PT1H') AS __time,
+ "dimB",
+ "dimA",
+ "dimC",
+ "dimD",
+ "dimE",
+ "dimF",
+ COUNT(*) AS "count",
+ SUM("metA") AS "sum_metA"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7
+PARTITIONED BY HOUR
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql
new file mode 100644
index 00000000000..f1af33bed43
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql
@@ -0,0 +1,29 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz","https://druid.apache.org/data/wikipedia.json.gz"]}',
+ '{"type":"json"}',
+ '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]'
+ )
+))
+SELECT
+ TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
+ "page",
+ "language",
+ "user",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city",
+ COUNT(*) AS "count",
+ SUM("added") AS "added",
+ SUM("deleted") AS "deleted",
+ SUM("delta") AS "delta"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13
+PARTITIONED BY DAY
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json
new file mode 100644
index 00000000000..2d454d59d80
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json
@@ -0,0 +1,47 @@
+[
+ {
+ "description": "timeseries, 1 agg, all",
+ "query":{
+ "queryType" : "timeBoundary",
+ "dataSource": "%%DATASOURCE%%"
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2016-06-27T00:00:11.000Z",
+ "result" : {
+ "minTime" : "2016-06-27T00:00:11.000Z",
+ "maxTime" : "2016-06-27T21:31:02.000Z"
+ }
+ }
+ ]
+ },
+ {
+ "description": "simple aggr",
+ "query":{
+ "queryType" : "topN",
+ "dataSource" : "%%DATASOURCE%%",
+ "intervals" : ["2016-06-27/2016-06-28"],
+ "granularity" : "all",
+ "dimension" : "page",
+ "metric" : "count",
+ "threshold" : 3,
+ "aggregations" : [
+ {
+ "type" : "count",
+ "name" : "count"
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2016-06-27T00:00:11.000Z",
+ "result" :
+ [
+ {"count":29,"page":"Copa América Centenario"},
+ {"count":16,"page":"User:Cyde/List of candidates for speedy deletion/Subpage"},
+ {"count":16,"page":"Wikipedia:Administrators' noticeboard/Incidents"}
+ ]
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql
new file mode 100644
index 00000000000..738e39fb870
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql
@@ -0,0 +1,32 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" as (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"local","files":["/resources/data/batch_index/json/wikipedia_index_data1.json","/resources/data/batch_index/json/wikipedia_index_data2.json","/resources/data/batch_index/json/wikipedia_index_data3.json"]}',
+ '{"type":"json"}',
+ '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]'
+ )
+))
+SELECT
+ TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
+ "page",
+ "language",
+ "user",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city",
+ COUNT(*) AS "count",
+ SUM("added") AS "added",
+ SUM("deleted") AS "deleted",
+ SUM("delta") AS "delta",
+ APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
+ DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
+ APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13
+PARTITIONED BY DAY
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json
new file mode 100644
index 00000000000..928effe65e9
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json
@@ -0,0 +1,150 @@
+[
+ {
+ "description": "timeseries, 1 agg, all",
+ "query":{
+ "queryType" : "timeBoundary",
+ "dataSource": "%%DATASOURCE%%"
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-31T01:02:33.000Z",
+ "result" : {
+ "minTime" : "2013-08-31T01:02:33.000Z",
+ "maxTime" : "2013-09-01T12:41:27.000Z"
+ }
+ }
+ ]
+ },
+ {
+ "description": "timeseries, datasketch aggs, all",
+ "query":{
+ "queryType" : "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "granularity":"day",
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ],
+ "filter":null,
+ "aggregations":[
+ {
+ "type": "HLLSketchMerge",
+ "name": "approxCountHLL",
+ "fieldName": "HLLSketchBuild",
+ "lgK": 12,
+ "tgtHllType": "HLL_4",
+ "round": true
+ },
+ {
+ "type":"thetaSketch",
+ "name":"approxCountTheta",
+ "fieldName":"thetaSketch",
+ "size":16384,
+ "shouldFinalize":true,
+ "isInputThetaSketch":false,
+ "errorBoundsStdDev":null
+ },
+ {
+ "type":"quantilesDoublesSketch",
+ "name":"quantilesSketch",
+ "fieldName":"quantilesDoublesSketch",
+ "k":128
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-31T00:00:00.000Z",
+ "result" : {
+ "quantilesSketch":5,
+ "approxCountTheta":5.0,
+ "approxCountHLL":5
+ }
+ }
+ ]
+ },
+ {
+ "description":"having spec on post aggregation",
+ "query":{
+ "queryType":"groupBy",
+ "dataSource":"%%DATASOURCE%%",
+ "granularity":"day",
+ "dimensions":[
+ "page"
+ ],
+ "filter":{
+ "type":"selector",
+ "dimension":"language",
+ "value":"zh"
+ },
+ "aggregations":[
+ {
+ "type":"count",
+ "name":"rows"
+ },
+ {
+ "type":"longSum",
+ "fieldName":"added",
+ "name":"added_count"
+ }
+ ],
+ "postAggregations": [
+ {
+ "type":"arithmetic",
+ "name":"added_count_times_ten",
+ "fn":"*",
+ "fields":[
+ {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
+ {"type":"constant", "name":"const", "value":10}
+ ]
+ }
+ ],
+ "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ]
+ },
+ "expectedResults":[ {
+ "version" : "v1",
+ "timestamp" : "2013-08-31T00:00:00.000Z",
+ "event" : {
+ "added_count_times_ten" : 9050.0,
+ "page" : "Crimson Typhoon",
+ "added_count" : 905,
+ "rows" : 1
+ }
+ } ]
+ },
+ {
+ "description": "timeseries, stringFirst/stringLast aggs, all",
+ "query":{
+ "queryType" : "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "granularity":"day",
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ],
+ "filter":null,
+ "aggregations":[
+ {
+ "type": "stringFirst",
+ "name": "first_user",
+ "fieldName": "user"
+ },
+ {
+ "type":"stringLast",
+ "name":"last_user",
+ "fieldName":"user"
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-31T00:00:00.000Z",
+ "result" : {
+ "first_user":"nuclear",
+ "last_user":"stringer"
+ }
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json
new file mode 100644
index 00000000000..f0cfba67735
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json
@@ -0,0 +1,62 @@
+[
+ {
+ "description":"having spec on post aggregation",
+ "query":{
+ "queryType":"groupBy",
+ "dataSource":"%%DATASOURCE%%",
+ "granularity":"day",
+ "dimensions":[
+ "page",
+ "city"
+ ],
+ "filter":{
+ "type":"selector",
+ "dimension":"language",
+ "value":"language-zh"
+ },
+ "aggregations":[
+ {
+ "type":"count",
+ "name":"rows"
+ },
+ {
+ "type":"longSum",
+ "fieldName":"triple-added",
+ "name":"added_count"
+ },
+ {
+ "type":"longSum",
+ "fieldName":"delta",
+ "name":"delta_sum"
+ }
+ ],
+ "postAggregations": [
+ {
+ "type":"arithmetic",
+ "name":"added_count_times_ten",
+ "fn":"*",
+ "fields":[
+ {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
+ {"type":"constant", "name":"const", "value":10}
+ ]
+ }
+ ],
+ "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ]
+ },
+ "expectedResults":[ {
+ "version" : "v1",
+ "timestamp" : "2013-08-31T00:00:00.000Z",
+ "event" : {
+ "added_count_times_ten" : 27150.0,
+ "page" : "Crimson Typhoon",
+ "city" : "Taiyuan",
+ "added_count" : 2715,
+ "delta_sum" : 900,
+ "rows" : 1
+ }
+ } ]
+ }
+]
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql
new file mode 100644
index 00000000000..ebdeeda6893
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql
@@ -0,0 +1,32 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"local","baseDir":"/resources/data/batch_index/json","filter":"wikipedia_index_data*"}',
+ '{"type":"json"}',
+ '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"triple-added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]'
+ )
+))
+SELECT
+ TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
+ "page",
+ concat('language-', "language") AS "language",
+ "user",
+ "unpatrolled",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city",
+ COUNT(*) AS "count",
+ SUM("added") AS "added",
+ SUM("added")*3 AS "triple-added",
+ SUM("deleted") AS "deleted",
+ SUM("delta") AS "delta",
+ APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
+ DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
+ APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
+PARTITIONED BY DAY
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql
new file mode 100644
index 00000000000..a8160aa9055
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql
@@ -0,0 +1,33 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"local","baseDir":"/resources/data/batch_index/json","filter":"wikipedia_index_data*"}',
+ '{"type":"json"}',
+ '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]'
+ )
+))
+SELECT
+ TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
+ "page",
+ "language",
+ "user",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city",
+ "timestamp",
+ COUNT(*) AS "count",
+ SUM("added") AS "added",
+ SUM("deleted") AS "deleted",
+ SUM("delta") AS "delta",
+ APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
+ DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
+ APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
+PARTITIONED BY DAY
\ No newline at end of file
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
index 424d070529f..37647fde87d 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
@@ -43,6 +43,7 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.SqlResourceTestClient;
import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.testng.Assert;
import java.util.ArrayList;
import java.util.Collections;
@@ -95,7 +96,15 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper context) throws ExecutionException, InterruptedException
+ {
+ return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, context, null));
}
// Run the task, wait for it to complete, fetch the reports, verify the results,
@@ -154,6 +163,7 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper t instanceof TaskStillRunningException,
+ 99,
100
);
}
@@ -250,6 +260,25 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper context)
+ throws Exception
+ {
+ SqlTaskStatus sqlTaskStatus = submitMsqTask(sqlQueryString, context);
+
+ LOG.info("Sql Task submitted with task Id - %s", sqlTaskStatus.getTaskId());
+
+ if (sqlTaskStatus.getState().isFailure()) {
+ Assert.fail(StringUtils.format(
+ "Unable to start the task successfully.\nPossible exception: %s",
+ sqlTaskStatus.getError()
+ ));
+ }
+ pollTaskIdForCompletion(sqlTaskStatus.getTaskId());
+ }
+
private static class TaskStillRunningException extends Exception
{