Migrate current integration batch tests to equivalent MSQ tests (#13374)

* Migrate current integration batch tests to equivalent MSQ tests using new IT framework

* Fix build issues

* Trigger Build

* Adding more tests and addressing comments

* Fix build issues

* Fix dependency issues

* Parameterized the test and addressed comments

* Addressing comments

* Fix checkstyle errors

* Addressing comments
abhagraw 2022-11-21 09:12:02 +05:30 committed by GitHub
parent a860baf496
commit 5172d76a67
17 changed files with 808 additions and 17 deletions

View File

@@ -442,10 +442,6 @@ jobs:
docker exec -it druid-$v sh -c 'dmesg | tail -3' ;
done
- <<: *integration_batch_index
name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer"
env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- &integration_input_format
name: "(Compile=openjdk8, Run=openjdk8) input format integration test"
stage: Tests - phase 2
@@ -689,11 +685,13 @@ jobs:
env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: ./it.sh travis Catalog
# Disabling the BatchIndex test as it is failing due to a timeout; fixing it will be taken up in a separate PR.
#- <<: *integration_tests_ex
# name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)"
# env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
# script: ./it.sh travis BatchIndex
- &integration_tests_ex
name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)"
stage: Tests - phase 2
jdk: openjdk8
services: *integration_test_services
env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
script: ./it.sh travis BatchIndex
# END - Integration tests for Compile with Java 8 and Run with Java 8

View File

@@ -225,6 +225,8 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-multi-stage-query</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-catalog</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-protobuf-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:mysql-metadata-storage</argument>

View File

@@ -155,7 +155,7 @@
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>mysql-metadata-storage</artifactId>
<version>${project.parent.version}</version>
@@ -218,10 +218,15 @@
<artifactId>JUnitParams</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.4.0</version>
</dependency>
</dependencies>
<!-- Exclude ITs from surefire. Required because they end with "Test". -->
@@ -345,8 +350,8 @@
<goal>integration-test</goal>
</goals>
<configuration>
<!-- our tests are very verbose, let's keep the volume down -->
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<!-- Turning on logs so that Travis does not time out tests for not producing any output. -->
<redirectTestOutputToFile>false</redirectTestOutputToFile>
<!-- Can run only one test category per Maven run. -->
<groups>org.apache.druid.testsEx.categories.${it.category}</groups>
</configuration>
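
For reference, the ${it.category} filter above resolves to a plain marker class under org.apache.druid.testsEx.categories. A minimal sketch, assuming the new IT framework's convention of empty marker classes, of what the MultiStageQuery category used later in this commit looks like (this file itself is not part of the diff shown here):

package org.apache.druid.testsEx.categories;

// Empty marker type: tests annotated with @Category(MultiStageQuery.class)
// are selected when Maven runs with -Dit.category=MultiStageQuery.
public class MultiStageQuery
{
}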

View File

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.msq;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.testsEx.indexer.AbstractITBatchIndexTest;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class AbstractITSQLBasedIngestion
{
public static final Logger LOG = new Logger(AbstractITSQLBasedIngestion.class);
@Inject
private MsqTestQueryHelper msqHelper;
@Inject
protected TestQueryHelper queryHelper;
@Inject
private DataLoaderHelper dataLoaderHelper;
/**
* Reads the file as a UTF-8 string and replaces %%DATASOURCE%% with the provided datasource value.
*/
protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource)
{
String fileString;
try (InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath)) {
fileString = IOUtils.toString(is, StandardCharsets.UTF_8);
}
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", filePath);
}
fileString = StringUtils.replace(
fileString,
"%%DATASOURCE%%",
datasource
);
return fileString;
}
/**
* Reads native queries from a file and runs them against the provided datasource.
*/
protected void doTestQuery(String queryFilePath, String dataSource)
{
try {
String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource);
queryHelper.testQueriesFromString(query);
}
catch (Exception e) {
LOG.error(e, "Error while running test query");
throw new RuntimeException(e);
}
}
/**
* Submits a SQL task, waits for task completion, and then runs test queries on the ingested datasource.
*/
protected void submitTaskAndDoTestQuery(String sqlTask, String queryFilePath, String datasource,
Map<String, Object> msqContext) throws Exception
{
LOG.info("SqlTask - \n %s", sqlTask);
// Submit the tasks and wait for the datasource to get loaded
msqHelper.submitMsqTaskAndWaitForCompletion(
sqlTask,
msqContext
);
dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
doTestQuery(queryFilePath, datasource);
}
/**
* Runs an MSQ ingestion SQL test.
*
* @param sqlFilePath path of the file containing the SQL query.
* @param queryFilePath path of the file containing the native test queries to be run on the ingested datasource.
* @param datasource name of the datasource. %%DATASOURCE%% in the SQL and queries will be replaced with this value.
* @param msqContext context parameters to be passed with the MSQ API call.
*/
protected void runMSQTaskAndTestQueries(String sqlFilePath, String queryFilePath, String datasource,
Map<String, Object> msqContext) throws Exception
{
LOG.info("Starting MSQ test for [%s, %s]", sqlFilePath, queryFilePath);
String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource);
submitTaskAndDoTestQuery(sqlTask, queryFilePath, datasource, msqContext);
}
}
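
Taken together, a concrete test only needs to point runMSQTaskAndTestQueries at a SQL/queries file pair. A minimal sketch of such a subclass, with a hypothetical class and method name (the file paths and datasource come from this commit's resources; the real migrated test, ITSQLBasedBatchIngestion, follows next):

package org.apache.druid.testsEx.msq;

import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
import org.apache.druid.testsEx.categories.MultiStageQuery;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

// Hypothetical minimal subclass: ingests via an MSQ REPLACE statement,
// then verifies the loaded datasource with native queries.
@RunWith(DruidTestRunner.class)
@Category(MultiStageQuery.class)
public class ITExampleSQLIngestion extends AbstractITSQLBasedIngestion
{
  @Test
  public void testWikipediaIngestion() throws Exception
  {
    runMSQTaskAndTestQueries(
        "/multi-stage-query/batch-index/wikipedia_index_msq.sql",
        "/multi-stage-query/batch-index/wikipedia_index_queries.json",
        "wikipedia_index_msq",
        ImmutableMap.of("finalizeAggregations", false)
    );
  }
}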

View File

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.msq;
import junitparams.Parameters;
import org.apache.commons.io.FilenameUtils;
import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
import org.apache.druid.testsEx.categories.MultiStageQuery;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.util.Arrays;
import java.util.List;
@RunWith(DruidTestRunner.class)
@Category(MultiStageQuery.class)
public class ITSQLBasedBatchIngestion extends AbstractITSQLBasedIngestion
{
private static final String BATCH_INDEX_TASKS_DIR = "/multi-stage-query/batch-index/";
public static List<List<String>> test_cases()
{
return Arrays.asList(
Arrays.asList("msq_inline.sql", "json_path_index_queries.json"),
Arrays.asList("sparse_column_msq.sql", "sparse_column_msq.json"),
Arrays.asList("wikipedia_http_inputsource_msq.sql", "wikipedia_http_inputsource_queries.json"),
Arrays.asList("wikipedia_index_msq.sql", "wikipedia_index_queries.json"),
Arrays.asList("wikipedia_merge_index_task.sql", "wikipedia_index_queries.json"),
Arrays.asList("wikipedia_index_task_with_transform.sql", "wikipedia_index_queries_with_transform.json")
);
}
@Test
@Parameters(method = "test_cases")
public void testSQLBasedBatchIngestion(String sqlFileName, String queryFileName)
{
try {
runMSQTaskAndTestQueries(BATCH_INDEX_TASKS_DIR + sqlFileName,
BATCH_INDEX_TASKS_DIR + queryFileName,
FilenameUtils.removeExtension(sqlFileName),
ImmutableMap.of("finalizeAggregations", false,
"maxNumTasks", 5,
"groupByEnableMultiValueUnnesting", false
));
}
catch (Exception e) {
LOG.error(e, "Error while testing [%s, %s]", sqlFileName, queryFileName);
throw new RuntimeException(e);
}
}
}
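
Note the naming convention baked into the parameterization: FilenameUtils.removeExtension ties each datasource to its SQL file name, so adding coverage is just another file pair in test_cases(). A small self-contained sketch of that convention, with hypothetical file names:

import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FilenameUtils;

// Demonstrates how a test-case pair maps to a datasource name: the
// datasource is the SQL file name minus its extension, and %%DATASOURCE%%
// in both files is replaced with that value at runtime.
public class DatasourceNamingExample
{
  public static void main(String[] args)
  {
    List<String> pair = Arrays.asList("wikipedia_csv_msq.sql", "wikipedia_csv_queries.json");
    String datasource = FilenameUtils.removeExtension(pair.get(0));
    System.out.println(datasource); // prints "wikipedia_csv_msq"
  }
}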

View File

@@ -0,0 +1,49 @@
[
{
"description": "timeseries",
"query": {
"queryType": "timeseries",
"dataSource": "%%DATASOURCE%%",
"intervals": [
"1000/3000"
],
"aggregations": [
{
"type": "longSum",
"name": "len",
"fieldName": "len"
},
{
"type": "longSum",
"name": "max",
"fieldName": "max"
},
{
"type": "longSum",
"name": "min",
"fieldName": "min"
},
{
"type": "longSum",
"name": "sum",
"fieldName": "sum"
}
],
"granularity": {
"type": "all"
}
},
"expectedResults": [
{
"timestamp": "2013-08-31T01:02:33.000Z",
"result": {
"sum": 10,
"min": 0,
"len": 5,
"max": 4
}
}
]
}
]

View File

@@ -0,0 +1,17 @@
REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
WITH "source" AS (SELECT * FROM TABLE(
EXTERN(
'{"type":"inline","data":"{\"timestamp\": \"2013-08-31T01:02:33Z\", \"values\": [0,1,2,3,4] }"}',
'{"type":"json","flattenSpec":{"useFieldDiscovery":true,"fields":[{"type":"path","name":"len","expr":"$.values.length()"},{"type":"path","name":"min","expr":"$.values.min()"},{"type":"path","name":"max","expr":"$.values.max()"},{"type":"path","name":"sum","expr":"$.values.sum()"}]}}',
'[{"name":"timestamp","type":"string"},{"name":"len","type":"long"},{"name":"min","type":"long"},{"name":"max","type":"long"},{"name":"sum","type":"long"}]'
)
))
SELECT
TIME_PARSE("timestamp") AS __time,
"len",
"min",
"max",
"sum"
FROM "source"
GROUP BY 1, 2, 3, 4, 5
PARTITIONED BY HOUR

View File

@@ -0,0 +1,93 @@
[
{
"description": "timeseries, 1 agg, all",
"query":{
"queryType" : "timeBoundary",
"dataSource": "%%DATASOURCE%%"
},
"expectedResults":[
{
"timestamp" : "2015-09-12T00:00:00.000Z",
"result" : {
"minTime" : "2015-09-12T00:00:00.000Z",
"maxTime" : "2015-09-12T00:00:00.000Z"
}
}
]
},
{
"description": "scan, all",
"query": {
"queryType": "scan",
"dataSource": "%%DATASOURCE%%",
"intervals": [
"2013-01-01/2020-01-02"
],
"resultFormat":"compactedList"
},
"expectedResults": [{
"segmentId":"dstsparse_column_msq.json_2015-09-12T00:00:00.000Z_2015-09-12T01:00:00.000Z_2022-11-17T12:32:11.247Z",
"columns":["__time","dimB","dimA","dimC","dimD","dimE","dimF","count","sum_metA"],
"events":[
[1442016000000,"F","C",null,null,null,null,1,1],
[1442016000000,"J","C",null,null,null,null,1,1],
[1442016000000,"R","J",null,null,null,null,1,1],
[1442016000000,"S","Z",null,null,null,null,1,1],
[1442016000000,"T","H",null,null,null,null,1,1],
[1442016000000,"X",null,"A",null,null,null,1,1],
[1442016000000,"X","H",null,null,null,null,3,3],
[1442016000000,"Z","H",null,null,null,null,1,1]
]
}],
"fieldsToTest": ["events"]
},
{
"description": "roll up ratio",
"query": {
"queryType":"timeseries",
"dataSource":{
"type":"table",
"name":"%%DATASOURCE%%"
},
"intervals":{
"type":"intervals",
"intervals":[
"2013-01-01/2020-01-02"
]
},
"granularity":{
"type":"all"
},
"aggregations":[
{
"type":"count",
"name":"a0"
},
{
"type":"longSum",
"name":"a1",
"fieldName":"count",
"expression":null
}
],
"postAggregations":[
{
"type":"expression",
"name":"p0",
"expression":"((\"a0\" * 1.00) / \"a1\")",
"ordering":null
}
]
},
"expectedResults": [
{
"timestamp" : "2015-09-12T00:00:00.000Z",
"result" : {
"a1" : 10,
"p0" : 0.8,
"a0" : 8
}
}
]
}
]

View File

@@ -0,0 +1,21 @@
REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
WITH "source" AS (SELECT * FROM TABLE(
EXTERN(
'{"type":"inline","data":"{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"F\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"J\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"Z\",\"dimB\":\"S\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"Z\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"J\",\"dimB\":\"R\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"T\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimC\":\"A\",\"dimB\":\"X\",\"metA\":1}\n"}',
'{"type":"json"}',
'[{"name":"time","type":"string"},{"name":"dimB","type":"string"},{"name":"dimA","type":"string"},{"name":"dimC","type":"string"},{"name":"dimD","type":"string"},{"name":"dimE","type":"string"},{"name":"dimF","type":"string"},{"name":"metA","type":"long"}]'
)
))
SELECT
TIME_FLOOR(TIME_PARSE("time"), 'PT1H') AS __time,
"dimB",
"dimA",
"dimC",
"dimD",
"dimE",
"dimF",
COUNT(*) AS "count",
SUM("metA") AS "sum_metA"
FROM "source"
GROUP BY 1, 2, 3, 4, 5, 6, 7
PARTITIONED BY HOUR

View File

@@ -0,0 +1,29 @@
REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
WITH "source" AS (SELECT * FROM TABLE(
EXTERN(
'{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz","https://druid.apache.org/data/wikipedia.json.gz"]}',
'{"type":"json"}',
'[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]'
)
))
SELECT
TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
COUNT(*) AS "count",
SUM("added") AS "added",
SUM("deleted") AS "deleted",
SUM("delta") AS "delta"
FROM "source"
GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13
PARTITIONED BY DAY

View File

@@ -0,0 +1,47 @@
[
{
"description": "timeseries, 1 agg, all",
"query":{
"queryType" : "timeBoundary",
"dataSource": "%%DATASOURCE%%"
},
"expectedResults":[
{
"timestamp" : "2016-06-27T00:00:11.000Z",
"result" : {
"minTime" : "2016-06-27T00:00:11.000Z",
"maxTime" : "2016-06-27T21:31:02.000Z"
}
}
]
},
{
"description": "simple aggr",
"query":{
"queryType" : "topN",
"dataSource" : "%%DATASOURCE%%",
"intervals" : ["2016-06-27/2016-06-28"],
"granularity" : "all",
"dimension" : "page",
"metric" : "count",
"threshold" : 3,
"aggregations" : [
{
"type" : "count",
"name" : "count"
}
]
},
"expectedResults":[
{
"timestamp" : "2016-06-27T00:00:11.000Z",
"result" :
[
{"count":29,"page":"Copa América Centenario"},
{"count":16,"page":"User:Cyde/List of candidates for speedy deletion/Subpage"},
{"count":16,"page":"Wikipedia:Administrators' noticeboard/Incidents"}
]
}
]
}
]

View File

@@ -0,0 +1,32 @@
REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
WITH "source" as (SELECT * FROM TABLE(
EXTERN(
'{"type":"local","files":["/resources/data/batch_index/json/wikipedia_index_data1.json","/resources/data/batch_index/json/wikipedia_index_data2.json","/resources/data/batch_index/json/wikipedia_index_data3.json"]}',
'{"type":"json"}',
'[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]'
)
))
SELECT
TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
COUNT(*) AS "count",
SUM("added") AS "added",
SUM("deleted") AS "deleted",
SUM("delta") AS "delta",
APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
FROM "source"
GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13
PARTITIONED BY DAY

View File

@@ -0,0 +1,150 @@
[
{
"description": "timeseries, 1 agg, all",
"query":{
"queryType" : "timeBoundary",
"dataSource": "%%DATASOURCE%%"
},
"expectedResults":[
{
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-09-01T12:41:27.000Z"
}
}
]
},
{
"description": "timeseries, datasketch aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "HLLSketchMerge",
"name": "approxCountHLL",
"fieldName": "HLLSketchBuild",
"lgK": 12,
"tgtHllType": "HLL_4",
"round": true
},
{
"type":"thetaSketch",
"name":"approxCountTheta",
"fieldName":"thetaSketch",
"size":16384,
"shouldFinalize":true,
"isInputThetaSketch":false,
"errorBoundsStdDev":null
},
{
"type":"quantilesDoublesSketch",
"name":"quantilesSketch",
"fieldName":"quantilesDoublesSketch",
"k":128
}
]
},
"expectedResults":[
{
"timestamp" : "2013-08-31T00:00:00.000Z",
"result" : {
"quantilesSketch":5,
"approxCountTheta":5.0,
"approxCountHLL":5
}
}
]
},
{
"description":"having spec on post aggregation",
"query":{
"queryType":"groupBy",
"dataSource":"%%DATASOURCE%%",
"granularity":"day",
"dimensions":[
"page"
],
"filter":{
"type":"selector",
"dimension":"language",
"value":"zh"
},
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"longSum",
"fieldName":"added",
"name":"added_count"
}
],
"postAggregations": [
{
"type":"arithmetic",
"name":"added_count_times_ten",
"fn":"*",
"fields":[
{"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
{"type":"constant", "name":"const", "value":10}
]
}
],
"having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
]
},
"expectedResults":[ {
"version" : "v1",
"timestamp" : "2013-08-31T00:00:00.000Z",
"event" : {
"added_count_times_ten" : 9050.0,
"page" : "Crimson Typhoon",
"added_count" : 905,
"rows" : 1
}
} ]
},
{
"description": "timeseries, stringFirst/stringLast aggs, all",
"query":{
"queryType" : "timeseries",
"dataSource": "%%DATASOURCE%%",
"granularity":"day",
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
],
"filter":null,
"aggregations":[
{
"type": "stringFirst",
"name": "first_user",
"fieldName": "user"
},
{
"type":"stringLast",
"name":"last_user",
"fieldName":"user"
}
]
},
"expectedResults":[
{
"timestamp" : "2013-08-31T00:00:00.000Z",
"result" : {
"first_user":"nuclear",
"last_user":"stringer"
}
}
]
}
]

View File

@@ -0,0 +1,62 @@
[
{
"description":"having spec on post aggregation",
"query":{
"queryType":"groupBy",
"dataSource":"%%DATASOURCE%%",
"granularity":"day",
"dimensions":[
"page",
"city"
],
"filter":{
"type":"selector",
"dimension":"language",
"value":"language-zh"
},
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"longSum",
"fieldName":"triple-added",
"name":"added_count"
},
{
"type":"longSum",
"fieldName":"delta",
"name":"delta_sum"
}
],
"postAggregations": [
{
"type":"arithmetic",
"name":"added_count_times_ten",
"fn":"*",
"fields":[
{"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
{"type":"constant", "name":"const", "value":10}
]
}
],
"having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
"intervals":[
"2013-08-31T00:00/2013-09-01T00:00"
]
},
"expectedResults":[ {
"version" : "v1",
"timestamp" : "2013-08-31T00:00:00.000Z",
"event" : {
"added_count_times_ten" : 27150.0,
"page" : "Crimson Typhoon",
"city" : "Taiyuan",
"added_count" : 2715,
"delta_sum" : 900,
"rows" : 1
}
} ]
}
]

View File

@@ -0,0 +1,32 @@
REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
WITH "source" AS (SELECT * FROM TABLE(
EXTERN(
'{"type":"local","baseDir":"/resources/data/batch_index/json","filter":"wikipedia_index_data*"}',
'{"type":"json"}',
'[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"triple-added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]'
)
))
SELECT
TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
"page",
concat('language-', "language") AS "language",
"user",
"unpatrolled",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
COUNT(*) AS "count",
SUM("added") AS "added",
SUM("added")*3 AS "triple-added",
SUM("deleted") AS "deleted",
SUM("delta") AS "delta",
APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
FROM "source"
GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
PARTITIONED BY DAY

View File

@@ -0,0 +1,33 @@
REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
WITH "source" AS (SELECT * FROM TABLE(
EXTERN(
'{"type":"local","baseDir":"/resources/data/batch_index/json","filter":"wikipedia_index_data*"}',
'{"type":"json"}',
'[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]'
)
))
SELECT
TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
"timestamp",
COUNT(*) AS "count",
SUM("added") AS "added",
SUM("deleted") AS "deleted",
SUM("delta") AS "delta",
APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
FROM "source"
GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
PARTITIONED BY DAY

View File

@@ -43,6 +43,7 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.SqlResourceTestClient;
import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.Assert;
import java.util.ArrayList;
import java.util.Collections;
@@ -95,7 +96,15 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
*/
public SqlTaskStatus submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
{
return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
return submitMsqTask(sqlQueryString, ImmutableMap.of());
}
/**
* Submits a task to the MSQ API with the given query string, default headers, and the given context parameters
*/
public SqlTaskStatus submitMsqTask(String sqlQueryString, Map<String, Object> context) throws ExecutionException, InterruptedException
{
return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, context, null));
}
// Run the task, wait for it to complete, fetch the reports, verify the results,
@@ -154,6 +163,7 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
throw new TaskStillRunningException();
},
(Throwable t) -> t instanceof TaskStillRunningException,
99,
100
);
}
@@ -250,6 +260,25 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
}
}
/**
* Submits a SQL query string with the given context to the MSQ API and waits for the created task to complete.
*/
public void submitMsqTaskAndWaitForCompletion(String sqlQueryString, Map<String, Object> context)
throws Exception
{
SqlTaskStatus sqlTaskStatus = submitMsqTask(sqlQueryString, context);
LOG.info("Sql Task submitted with task Id - %s", sqlTaskStatus.getTaskId());
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
pollTaskIdForCompletion(sqlTaskStatus.getTaskId());
}
private static class TaskStillRunningException extends Exception
{
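
For context, the new overloads compose as follows. A minimal sketch of a caller, assuming the helper is injected as in the tests above (the SQL string and context values are illustrative only):

// Submit an MSQ ingestion task with custom context parameters, fail fast
// if the task cannot be started, then poll until the task completes.
Map<String, Object> context = ImmutableMap.of(
    "finalizeAggregations", false,
    "maxNumTasks", 5
);
msqHelper.submitMsqTaskAndWaitForCompletion(
    "REPLACE INTO \"wiki\" OVERWRITE ALL SELECT ... PARTITIONED BY DAY",
    context
);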