mirror of https://github.com/apache/druid.git
More unit tests for JsonParserIterator; Integration tests for query errors (#11091)
* unit tests for timeout exception in init * integration tests * run integraion test on travis * fix inspection
This commit is contained in:
parent
8432d82c48
commit
a6a2758095
20
.travis.yml
20
.travis.yml
|
@ -472,6 +472,15 @@ jobs:
|
||||||
script: *run_integration_test
|
script: *run_integration_test
|
||||||
after_failure: *integration_test_diags
|
after_failure: *integration_test_diags
|
||||||
|
|
||||||
|
- &integration_query_error
|
||||||
|
name: "(Compile=openjdk8, Run=openjdk8) query error integration test"
|
||||||
|
stage: Tests - phase 2
|
||||||
|
jdk: openjdk8
|
||||||
|
services: *integration_test_services
|
||||||
|
env: TESTNG_GROUPS='-Dgroups=query-error' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
|
||||||
|
script: *run_integration_test
|
||||||
|
after_failure: *integration_test_diags
|
||||||
|
|
||||||
- &integration_security
|
- &integration_security
|
||||||
name: "(Compile=openjdk8, Run=openjdk8) security integration test"
|
name: "(Compile=openjdk8, Run=openjdk8) security integration test"
|
||||||
stage: Tests - phase 2
|
stage: Tests - phase 2
|
||||||
|
@ -530,13 +539,13 @@ jobs:
|
||||||
stage: Tests - phase 2
|
stage: Tests - phase 2
|
||||||
jdk: openjdk8
|
jdk: openjdk8
|
||||||
services: *integration_test_services
|
services: *integration_test_services
|
||||||
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
|
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
|
||||||
script: *run_integration_test
|
script: *run_integration_test
|
||||||
after_failure: *integration_test_diags
|
after_failure: *integration_test_diags
|
||||||
|
|
||||||
- <<: *integration_tests
|
- <<: *integration_tests
|
||||||
name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer"
|
name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer"
|
||||||
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
|
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
|
||||||
|
|
||||||
- <<: *integration_tests
|
- <<: *integration_tests
|
||||||
name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests"
|
name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests"
|
||||||
|
@ -586,6 +595,11 @@ jobs:
|
||||||
jdk: openjdk8
|
jdk: openjdk8
|
||||||
env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
|
env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
|
||||||
|
|
||||||
|
- <<: *integration_query_error
|
||||||
|
name: "(Compile=openjdk8, Run=openjdk11) query error integration test for missing segments"
|
||||||
|
jdk: openjdk8
|
||||||
|
env: TESTNG_GROUPS='-Dgroups=query-error' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
|
||||||
|
|
||||||
- <<: *integration_security
|
- <<: *integration_security
|
||||||
name: "(Compile=openjdk8, Run=openjdk11) security integration test"
|
name: "(Compile=openjdk8, Run=openjdk11) security integration test"
|
||||||
jdk: openjdk8
|
jdk: openjdk8
|
||||||
|
@ -614,7 +628,7 @@ jobs:
|
||||||
- <<: *integration_tests
|
- <<: *integration_tests
|
||||||
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
|
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
|
||||||
jdk: openjdk8
|
jdk: openjdk8
|
||||||
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
|
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
|
||||||
|
|
||||||
- <<: *integration_tests
|
- <<: *integration_tests
|
||||||
name: "(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests"
|
name: "(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests"
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
version: "2.2"
|
||||||
|
services:
|
||||||
|
druid-zookeeper-kafka:
|
||||||
|
extends:
|
||||||
|
file: docker-compose.base.yml
|
||||||
|
service: druid-zookeeper-kafka
|
||||||
|
|
||||||
|
druid-metadata-storage:
|
||||||
|
extends:
|
||||||
|
file: docker-compose.base.yml
|
||||||
|
service: druid-metadata-storage
|
||||||
|
environment:
|
||||||
|
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
||||||
|
depends_on:
|
||||||
|
- druid-zookeeper-kafka
|
||||||
|
|
||||||
|
druid-overlord:
|
||||||
|
extends:
|
||||||
|
file: docker-compose.base.yml
|
||||||
|
service: druid-overlord
|
||||||
|
environment:
|
||||||
|
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
||||||
|
depends_on:
|
||||||
|
- druid-metadata-storage
|
||||||
|
- druid-zookeeper-kafka
|
||||||
|
|
||||||
|
druid-coordinator:
|
||||||
|
extends:
|
||||||
|
file: docker-compose.base.yml
|
||||||
|
service: druid-coordinator
|
||||||
|
environment:
|
||||||
|
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
||||||
|
depends_on:
|
||||||
|
- druid-overlord
|
||||||
|
- druid-metadata-storage
|
||||||
|
- druid-zookeeper-kafka
|
||||||
|
|
||||||
|
druid-broker:
|
||||||
|
extends:
|
||||||
|
file: docker-compose.base.yml
|
||||||
|
service: druid-broker
|
||||||
|
environment:
|
||||||
|
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
||||||
|
depends_on:
|
||||||
|
- druid-zookeeper-kafka
|
||||||
|
- druid-historical-for-query-error-test
|
||||||
|
|
||||||
|
druid-router:
|
||||||
|
extends:
|
||||||
|
file: docker-compose.base.yml
|
||||||
|
service: druid-router
|
||||||
|
environment:
|
||||||
|
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
||||||
|
depends_on:
|
||||||
|
- druid-zookeeper-kafka
|
||||||
|
- druid-coordinator
|
||||||
|
- druid-broker
|
||||||
|
|
||||||
|
druid-historical-for-query-error-test:
|
||||||
|
image: druid/cluster
|
||||||
|
container_name: druid-historical-for-query-error-test
|
||||||
|
networks:
|
||||||
|
druid-it-net:
|
||||||
|
ipv4_address: 172.172.172.14
|
||||||
|
ports:
|
||||||
|
- 8084:8083
|
||||||
|
- 8284:8283
|
||||||
|
- 5010:5007
|
||||||
|
privileged: true
|
||||||
|
volumes:
|
||||||
|
- ${HOME}/shared:/shared
|
||||||
|
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
|
||||||
|
env_file:
|
||||||
|
- ./environment-configs/common
|
||||||
|
- ./environment-configs/historical-for-query-error-test
|
||||||
|
environment:
|
||||||
|
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
||||||
|
depends_on:
|
||||||
|
- druid-zookeeper-kafka
|
||||||
|
|
||||||
|
networks:
|
||||||
|
druid-it-net:
|
||||||
|
name: druid-it-net
|
||||||
|
ipam:
|
||||||
|
config:
|
||||||
|
- subnet: 172.172.172.0/24
|
|
@ -96,7 +96,7 @@ services:
|
||||||
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
|
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
|
||||||
env_file:
|
env_file:
|
||||||
- ./environment-configs/common
|
- ./environment-configs/common
|
||||||
- ./environment-configs/historical-for-query-retry-test
|
- ./environment-configs/historical-for-query-error-test
|
||||||
environment:
|
environment:
|
||||||
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
||||||
depends_on:
|
depends_on:
|
||||||
|
|
|
@ -23,7 +23,7 @@ getConfPath()
|
||||||
case "$1" in
|
case "$1" in
|
||||||
_common) echo $cluster_conf_base/_common ;;
|
_common) echo $cluster_conf_base/_common ;;
|
||||||
historical) echo $cluster_conf_base/data/historical ;;
|
historical) echo $cluster_conf_base/data/historical ;;
|
||||||
historical-for-query-retry-test) echo $cluster_conf_base/data/historical ;;
|
historical-for-query-error-test) echo $cluster_conf_base/data/historical ;;
|
||||||
middleManager) echo $cluster_conf_base/data/middleManager ;;
|
middleManager) echo $cluster_conf_base/data/middleManager ;;
|
||||||
indexer) echo $cluster_conf_base/data/indexer ;;
|
indexer) echo $cluster_conf_base/data/indexer ;;
|
||||||
coordinator) echo $cluster_conf_base/master/coordinator ;;
|
coordinator) echo $cluster_conf_base/master/coordinator ;;
|
||||||
|
@ -85,14 +85,14 @@ setupData()
|
||||||
# The "query" and "security" test groups require data to be setup before running the tests.
|
# The "query" and "security" test groups require data to be setup before running the tests.
|
||||||
# In particular, they requires segments to be download from a pre-existing s3 bucket.
|
# In particular, they requires segments to be download from a pre-existing s3 bucket.
|
||||||
# This is done by using the loadSpec put into metadatastore and s3 credientials set below.
|
# This is done by using the loadSpec put into metadatastore and s3 credientials set below.
|
||||||
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ]; then
|
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ]; then
|
||||||
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
|
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
|
||||||
find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
|
find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
|
||||||
&& cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid && /etc/init.d/mysql stop
|
&& cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid && /etc/init.d/mysql stop
|
||||||
# below s3 credentials needed to access the pre-existing s3 bucket
|
# below s3 credentials needed to access the pre-existing s3 bucket
|
||||||
setKey $DRUID_SERVICE druid.s3.accessKey AKIAT2GGLKKJQCMG64V4
|
setKey $DRUID_SERVICE druid.s3.accessKey AKIAT2GGLKKJQCMG64V4
|
||||||
setKey $DRUID_SERVICE druid.s3.secretKey HwcqHFaxC7bXMO7K6NdCwAdvq0tcPtHJP3snZ2tR
|
setKey $DRUID_SERVICE druid.s3.secretKey HwcqHFaxC7bXMO7K6NdCwAdvq0tcPtHJP3snZ2tR
|
||||||
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]; then
|
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]; then
|
||||||
setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\",\"druid-integration-tests\"]
|
setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\",\"druid-integration-tests\"]
|
||||||
else
|
else
|
||||||
setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\"]
|
setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\"]
|
||||||
|
|
|
@ -17,8 +17,8 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
|
|
||||||
DRUID_SERVICE=historical-for-query-retry-test
|
DRUID_SERVICE=historical-for-query-error-test
|
||||||
DRUID_LOG_PATH=/shared/logs/historical-for-query-retry-test.log
|
DRUID_LOG_PATH=/shared/logs/historical-for-query-error-test.log
|
||||||
|
|
||||||
# JAVA OPTS
|
# JAVA OPTS
|
||||||
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010
|
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010
|
||||||
|
@ -27,6 +27,6 @@ SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=tr
|
||||||
druid_processing_buffer_sizeBytes=25000000
|
druid_processing_buffer_sizeBytes=25000000
|
||||||
druid_processing_numThreads=2
|
druid_processing_numThreads=2
|
||||||
druid_query_groupBy_maxOnDiskStorage=300000000
|
druid_query_groupBy_maxOnDiskStorage=300000000
|
||||||
druid_segmentCache_locations=[{"path":"/shared/druid/indexCache-query-retry-test","maxSize":5000000000}]
|
druid_segmentCache_locations=[{"path":"/shared/druid/indexCache-query-error-test","maxSize":5000000000}]
|
||||||
druid_auth_basic_common_cacheDirectory=/tmp/authCache/historical-query-retry-test
|
druid_auth_basic_common_cacheDirectory=/tmp/authCache/historical-query-error-test
|
||||||
druid_server_https_crlPath=/tls/revocations.crl
|
druid_server_https_crlPath=/tls/revocations.crl
|
|
@ -0,0 +1,20 @@
|
||||||
|
-- Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
-- contributor license agreements. See the NOTICE file distributed with
|
||||||
|
-- this work for additional information regarding copyright ownership.
|
||||||
|
-- The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
-- (the "License"); you may not use this file except in compliance with
|
||||||
|
-- the License. You may obtain a copy of the License at
|
||||||
|
--
|
||||||
|
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
--
|
||||||
|
-- Unless required by applicable law or agreed to in writing, software
|
||||||
|
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
-- See the License for the specific language governing permissions and
|
||||||
|
-- limitations under the License.
|
||||||
|
|
||||||
|
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9','twitterstream','2013-05-13T01:08:18.192Z','2013-01-01T00:00:00.000Z','2013-01-02T00:00:00.000Z',0,'2013-01-02T04:13:41.980Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z\",\"version\":\"2013-01-02T04:13:41.980Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/2013-01-02T04:13:41.980Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":445235220,\"identifier\":\"twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9\"}');
|
||||||
|
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9','twitterstream','2013-05-13T00:03:28.640Z','2013-01-02T00:00:00.000Z','2013-01-03T00:00:00.000Z',0,'2013-01-03T03:44:58.791Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z\",\"version\":\"2013-01-03T03:44:58.791Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z/2013-01-03T03:44:58.791Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":435325540,\"identifier\":\"twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9\"}');
|
||||||
|
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9','twitterstream','2013-05-13T00:03:48.807Z','2013-01-03T00:00:00.000Z','2013-01-04T00:00:00.000Z',0,'2013-01-04T04:09:13.590Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-03T00:00:00.000Z/2013-01-04T00:00:00.000Z\",\"version\":\"2013-01-04T04:09:13.590Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z/2013-01-04T04:09:13.590Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":411651320,\"identifier\":\"twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9\"}');
|
||||||
|
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','wikipedia_editstream','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"wikipedia_editstream\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/wikipedia_editstream/2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z/2013-01-10T08:13:47.830Z_v9/0/index.zip\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"}');
|
||||||
|
INSERT INTO druid_segments (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES ('wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z', 'wikipedia', '2013-08-08T21:26:23.799Z', '2013-08-01T00:00:00.000Z', '2013-08-02T00:00:00.000Z', '0', '2013-08-08T21:22:48.989Z', '1', '{\"dataSource\":\"wikipedia\",\"interval\":\"2013-08-01T00:00:00.000Z/2013-08-02T00:00:00.000Z\",\"version\":\"2013-08-08T21:22:48.989Z\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/wikipedia/20130801T000000.000Z_20130802T000000.000Z/2013-08-08T21_22_48.989Z/0/index.zip\"},\"dimensions\":\"dma_code,continent_code,geo,area_code,robot,country_name,network,city,namespace,anonymous,unpatrolled,page,postal_code,language,newpage,user,region_lookup\",\"metrics\":\"count,delta,variation,added,deleted\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":24664730,\"identifier\":\"wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z\"}');
|
|
@ -31,7 +31,7 @@ getComposeArgs()
|
||||||
if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ]
|
if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ]
|
||||||
then
|
then
|
||||||
# Sanity check: cannot combine CliIndexer tests with security, query-retry tests
|
# Sanity check: cannot combine CliIndexer tests with security, query-retry tests
|
||||||
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]
|
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]
|
||||||
then
|
then
|
||||||
echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer"
|
echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer"
|
||||||
exit 1
|
exit 1
|
||||||
|
@ -56,6 +56,11 @@ getComposeArgs()
|
||||||
# default + additional historical modified for query retry test
|
# default + additional historical modified for query retry test
|
||||||
# See CliHistoricalForQueryRetryTest.
|
# See CliHistoricalForQueryRetryTest.
|
||||||
echo "-f ${DOCKERDIR}/docker-compose.query-retry-test.yml"
|
echo "-f ${DOCKERDIR}/docker-compose.query-retry-test.yml"
|
||||||
|
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ]
|
||||||
|
then
|
||||||
|
# default + additional historical modified for query error test
|
||||||
|
# See CliHistoricalForQueryRetryTest.
|
||||||
|
echo "-f ${DOCKERDIR}/docker-compose.query-error-test.yml"
|
||||||
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]
|
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]
|
||||||
then
|
then
|
||||||
# the 'high availability' test cluster with multiple coordinators and overlords
|
# the 'high availability' test cluster with multiple coordinators and overlords
|
||||||
|
|
|
@ -25,19 +25,19 @@ import io.airlift.airline.Command;
|
||||||
import org.apache.druid.guice.LazySingleton;
|
import org.apache.druid.guice.LazySingleton;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.druid.query.QuerySegmentWalker;
|
import org.apache.druid.query.QuerySegmentWalker;
|
||||||
import org.apache.druid.server.coordination.ServerManagerForQueryRetryTest;
|
import org.apache.druid.server.coordination.ServerManagerForQueryErrorTest;
|
||||||
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
@Command(
|
@Command(
|
||||||
name = "historical-for-query-retry-test",
|
name = "historical-for-query-error-test",
|
||||||
description = "Runs a Historical node modified for query retry test"
|
description = "Runs a Historical node modified for query error test"
|
||||||
)
|
)
|
||||||
public class CliHistoricalForQueryRetryTest extends CliHistorical
|
public class CliHistoricalForQueryErrorTest extends CliHistorical
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(CliHistoricalForQueryRetryTest.class);
|
private static final Logger log = new Logger(CliHistoricalForQueryErrorTest.class);
|
||||||
|
|
||||||
public CliHistoricalForQueryRetryTest()
|
public CliHistoricalForQueryErrorTest()
|
||||||
{
|
{
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
@ -46,12 +46,12 @@ public class CliHistoricalForQueryRetryTest extends CliHistorical
|
||||||
@Override
|
@Override
|
||||||
public void configure(Properties properties)
|
public void configure(Properties properties)
|
||||||
{
|
{
|
||||||
log.info("Historical is configured for testing query retry on missing segments");
|
log.info("Historical is configured for testing query error on missing segments");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void bindQuerySegmentWalker(Binder binder)
|
public void bindQuerySegmentWalker(Binder binder)
|
||||||
{
|
{
|
||||||
binder.bind(QuerySegmentWalker.class).to(ServerManagerForQueryRetryTest.class).in(LazySingleton.class);
|
binder.bind(QuerySegmentWalker.class).to(ServerManagerForQueryErrorTest.class).in(LazySingleton.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -26,6 +26,6 @@ public class QueryRetryTestCommandCreator implements CliCommandCreator
|
||||||
@Override
|
@Override
|
||||||
public void addCommands(CliBuilder builder)
|
public void addCommands(CliBuilder builder)
|
||||||
{
|
{
|
||||||
builder.withGroup("server").withCommands(CliHistoricalForQueryRetryTest.class);
|
builder.withGroup("server").withCommands(CliHistoricalForQueryErrorTest.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,238 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.coordination;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import org.apache.commons.lang3.mutable.MutableBoolean;
|
||||||
|
import org.apache.druid.client.cache.Cache;
|
||||||
|
import org.apache.druid.client.cache.CacheConfig;
|
||||||
|
import org.apache.druid.client.cache.CachePopulator;
|
||||||
|
import org.apache.druid.guice.annotations.Processing;
|
||||||
|
import org.apache.druid.guice.annotations.Smile;
|
||||||
|
import org.apache.druid.java.util.common.guava.Accumulator;
|
||||||
|
import org.apache.druid.java.util.common.guava.Sequence;
|
||||||
|
import org.apache.druid.java.util.common.guava.Yielder;
|
||||||
|
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
|
||||||
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||||
|
import org.apache.druid.query.Query;
|
||||||
|
import org.apache.druid.query.QueryCapacityExceededException;
|
||||||
|
import org.apache.druid.query.QueryRunner;
|
||||||
|
import org.apache.druid.query.QueryRunnerFactory;
|
||||||
|
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
|
||||||
|
import org.apache.druid.query.QueryTimeoutException;
|
||||||
|
import org.apache.druid.query.QueryToolChest;
|
||||||
|
import org.apache.druid.query.QueryUnsupportedException;
|
||||||
|
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
|
||||||
|
import org.apache.druid.query.ResourceLimitExceededException;
|
||||||
|
import org.apache.druid.query.SegmentDescriptor;
|
||||||
|
import org.apache.druid.segment.ReferenceCountingSegment;
|
||||||
|
import org.apache.druid.segment.SegmentReference;
|
||||||
|
import org.apache.druid.segment.join.JoinableFactory;
|
||||||
|
import org.apache.druid.server.SegmentManager;
|
||||||
|
import org.apache.druid.server.initialization.ServerConfig;
|
||||||
|
import org.apache.druid.timeline.VersionedIntervalTimeline;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This server manager is designed to test various query failures.
|
||||||
|
*
|
||||||
|
* - Missing segments. A segment can be missing during a query if a historical drops the segment
|
||||||
|
* after the broker issues the query to the historical. To mimic this situation, the historical
|
||||||
|
* with this server manager announces all segments assigned, but reports missing segments for the
|
||||||
|
* first 3 segments specified in the query. See ITQueryRetryTestOnMissingSegments.
|
||||||
|
* - Other query errors. This server manager returns a sequence that always throws an exception
|
||||||
|
* based on a given query context value. See ITQueryErrorTest.
|
||||||
|
*
|
||||||
|
* @see org.apache.druid.query.RetryQueryRunner for query retrying.
|
||||||
|
* @see org.apache.druid.client.JsonParserIterator for handling query errors from historicals.
|
||||||
|
*/
|
||||||
|
public class ServerManagerForQueryErrorTest extends ServerManager
|
||||||
|
{
|
||||||
|
// Query context key that indicates this query is for query retry testing.
|
||||||
|
public static final String QUERY_RETRY_TEST_CONTEXT_KEY = "query-retry-test";
|
||||||
|
public static final String QUERY_TIMEOUT_TEST_CONTEXT_KEY = "query-timeout-test";
|
||||||
|
public static final String QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY = "query-capacity-exceeded-test";
|
||||||
|
public static final String QUERY_UNSUPPORTED_TEST_CONTEXT_KEY = "query-unsupported-test";
|
||||||
|
public static final String RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY = "resource-limit-exceeded-test";
|
||||||
|
public static final String QUERY_FAILURE_TEST_CONTEXT_KEY = "query-failure-test";
|
||||||
|
|
||||||
|
private static final Logger LOG = new Logger(ServerManagerForQueryErrorTest.class);
|
||||||
|
private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 3;
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<String, Set<SegmentDescriptor>> queryToIgnoredSegments = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public ServerManagerForQueryErrorTest(
|
||||||
|
QueryRunnerFactoryConglomerate conglomerate,
|
||||||
|
ServiceEmitter emitter,
|
||||||
|
@Processing ExecutorService exec,
|
||||||
|
CachePopulator cachePopulator,
|
||||||
|
@Smile ObjectMapper objectMapper,
|
||||||
|
Cache cache,
|
||||||
|
CacheConfig cacheConfig,
|
||||||
|
SegmentManager segmentManager,
|
||||||
|
JoinableFactory joinableFactory,
|
||||||
|
ServerConfig serverConfig
|
||||||
|
)
|
||||||
|
{
|
||||||
|
super(
|
||||||
|
conglomerate,
|
||||||
|
emitter,
|
||||||
|
exec,
|
||||||
|
cachePopulator,
|
||||||
|
objectMapper,
|
||||||
|
cache,
|
||||||
|
cacheConfig,
|
||||||
|
segmentManager,
|
||||||
|
joinableFactory,
|
||||||
|
serverConfig
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
<T> QueryRunner<T> buildQueryRunnerForSegment(
|
||||||
|
Query<T> query,
|
||||||
|
SegmentDescriptor descriptor,
|
||||||
|
QueryRunnerFactory<T, Query<T>> factory,
|
||||||
|
QueryToolChest<T, Query<T>> toolChest,
|
||||||
|
VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
|
||||||
|
Function<SegmentReference, SegmentReference> segmentMapFn,
|
||||||
|
AtomicLong cpuTimeAccumulator,
|
||||||
|
Optional<byte[]> cacheKeyPrefix
|
||||||
|
)
|
||||||
|
{
|
||||||
|
if (query.getContextBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) {
|
||||||
|
final MutableBoolean isIgnoreSegment = new MutableBoolean(false);
|
||||||
|
queryToIgnoredSegments.compute(
|
||||||
|
query.getMostSpecificId(),
|
||||||
|
(queryId, ignoredSegments) -> {
|
||||||
|
if (ignoredSegments == null) {
|
||||||
|
ignoredSegments = new HashSet<>();
|
||||||
|
}
|
||||||
|
if (ignoredSegments.size() < MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
|
||||||
|
ignoredSegments.add(descriptor);
|
||||||
|
isIgnoreSegment.setTrue();
|
||||||
|
}
|
||||||
|
return ignoredSegments;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
if (isIgnoreSegment.isTrue()) {
|
||||||
|
LOG.info("Pretending I don't have segment[%s]", descriptor);
|
||||||
|
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
|
||||||
|
}
|
||||||
|
} else if (query.getContextBoolean(QUERY_TIMEOUT_TEST_CONTEXT_KEY, false)) {
|
||||||
|
return (queryPlus, responseContext) -> new Sequence<T>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
|
||||||
|
{
|
||||||
|
throw new QueryTimeoutException("query timeout test");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
|
||||||
|
{
|
||||||
|
throw new QueryTimeoutException("query timeout test");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else if (query.getContextBoolean(QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY, false)) {
|
||||||
|
return (queryPlus, responseContext) -> new Sequence<T>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
|
||||||
|
{
|
||||||
|
throw QueryCapacityExceededException.withErrorMessageAndResolvedHost("query capacity exceeded test");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
|
||||||
|
{
|
||||||
|
throw QueryCapacityExceededException.withErrorMessageAndResolvedHost("query capacity exceeded test");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else if (query.getContextBoolean(QUERY_UNSUPPORTED_TEST_CONTEXT_KEY, false)) {
|
||||||
|
return (queryPlus, responseContext) -> new Sequence<T>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
|
||||||
|
{
|
||||||
|
throw new QueryUnsupportedException("query unsupported test");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
|
||||||
|
{
|
||||||
|
throw new QueryUnsupportedException("query unsupported test");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else if (query.getContextBoolean(RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY, false)) {
|
||||||
|
return (queryPlus, responseContext) -> new Sequence<T>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
|
||||||
|
{
|
||||||
|
throw new ResourceLimitExceededException("resource limit exceeded test");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
|
||||||
|
{
|
||||||
|
throw new ResourceLimitExceededException("resource limit exceeded test");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else if (query.getContextBoolean(QUERY_FAILURE_TEST_CONTEXT_KEY, false)) {
|
||||||
|
return (queryPlus, responseContext) -> new Sequence<T>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
|
||||||
|
{
|
||||||
|
throw new RuntimeException("query failure test");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
|
||||||
|
{
|
||||||
|
throw new RuntimeException("query failure test");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return super.buildQueryRunnerForSegment(
|
||||||
|
query,
|
||||||
|
descriptor,
|
||||||
|
factory,
|
||||||
|
toolChest,
|
||||||
|
timeline,
|
||||||
|
segmentMapFn,
|
||||||
|
cpuTimeAccumulator,
|
||||||
|
cacheKeyPrefix
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,144 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.druid.server.coordination;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.google.inject.Inject;
|
|
||||||
import org.apache.commons.lang3.mutable.MutableBoolean;
|
|
||||||
import org.apache.druid.client.cache.Cache;
|
|
||||||
import org.apache.druid.client.cache.CacheConfig;
|
|
||||||
import org.apache.druid.client.cache.CachePopulator;
|
|
||||||
import org.apache.druid.guice.annotations.Processing;
|
|
||||||
import org.apache.druid.guice.annotations.Smile;
|
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
|
||||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
|
||||||
import org.apache.druid.query.Query;
|
|
||||||
import org.apache.druid.query.QueryRunner;
|
|
||||||
import org.apache.druid.query.QueryRunnerFactory;
|
|
||||||
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
|
|
||||||
import org.apache.druid.query.QueryToolChest;
|
|
||||||
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
|
|
||||||
import org.apache.druid.query.SegmentDescriptor;
|
|
||||||
import org.apache.druid.segment.ReferenceCountingSegment;
|
|
||||||
import org.apache.druid.segment.SegmentReference;
|
|
||||||
import org.apache.druid.segment.join.JoinableFactory;
|
|
||||||
import org.apache.druid.server.SegmentManager;
|
|
||||||
import org.apache.druid.server.initialization.ServerConfig;
|
|
||||||
import org.apache.druid.timeline.VersionedIntervalTimeline;
|
|
||||||
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.function.Function;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This server manager is designed to test query retry on missing segments. A segment can be missing during a query
|
|
||||||
* if a historical drops the segment after the broker issues the query to the historical. To mimic this situation,
|
|
||||||
* the historical with this server manager announces all segments assigned, but reports missing segments for the
|
|
||||||
* first 3 segments specified in the query.
|
|
||||||
*
|
|
||||||
* @see org.apache.druid.query.RetryQueryRunner
|
|
||||||
*/
|
|
||||||
public class ServerManagerForQueryRetryTest extends ServerManager
|
|
||||||
{
|
|
||||||
// Query context key that indicates this query is for query retry testing.
|
|
||||||
public static final String QUERY_RETRY_TEST_CONTEXT_KEY = "query-retry-test";
|
|
||||||
|
|
||||||
private static final Logger LOG = new Logger(ServerManagerForQueryRetryTest.class);
|
|
||||||
private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 3;
|
|
||||||
|
|
||||||
private final ConcurrentHashMap<String, Set<SegmentDescriptor>> queryToIgnoredSegments = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
public ServerManagerForQueryRetryTest(
|
|
||||||
QueryRunnerFactoryConglomerate conglomerate,
|
|
||||||
ServiceEmitter emitter,
|
|
||||||
@Processing ExecutorService exec,
|
|
||||||
CachePopulator cachePopulator,
|
|
||||||
@Smile ObjectMapper objectMapper,
|
|
||||||
Cache cache,
|
|
||||||
CacheConfig cacheConfig,
|
|
||||||
SegmentManager segmentManager,
|
|
||||||
JoinableFactory joinableFactory,
|
|
||||||
ServerConfig serverConfig
|
|
||||||
)
|
|
||||||
{
|
|
||||||
super(
|
|
||||||
conglomerate,
|
|
||||||
emitter,
|
|
||||||
exec,
|
|
||||||
cachePopulator,
|
|
||||||
objectMapper,
|
|
||||||
cache,
|
|
||||||
cacheConfig,
|
|
||||||
segmentManager,
|
|
||||||
joinableFactory,
|
|
||||||
serverConfig
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
<T> QueryRunner<T> buildQueryRunnerForSegment(
|
|
||||||
Query<T> query,
|
|
||||||
SegmentDescriptor descriptor,
|
|
||||||
QueryRunnerFactory<T, Query<T>> factory,
|
|
||||||
QueryToolChest<T, Query<T>> toolChest,
|
|
||||||
VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
|
|
||||||
Function<SegmentReference, SegmentReference> segmentMapFn,
|
|
||||||
AtomicLong cpuTimeAccumulator,
|
|
||||||
Optional<byte[]> cacheKeyPrefix
|
|
||||||
)
|
|
||||||
{
|
|
||||||
if (query.getContextBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) {
|
|
||||||
final MutableBoolean isIgnoreSegment = new MutableBoolean(false);
|
|
||||||
queryToIgnoredSegments.compute(
|
|
||||||
query.getMostSpecificId(),
|
|
||||||
(queryId, ignoredSegments) -> {
|
|
||||||
if (ignoredSegments == null) {
|
|
||||||
ignoredSegments = new HashSet<>();
|
|
||||||
}
|
|
||||||
if (ignoredSegments.size() < MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
|
|
||||||
ignoredSegments.add(descriptor);
|
|
||||||
isIgnoreSegment.setTrue();
|
|
||||||
}
|
|
||||||
return ignoredSegments;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
if (isIgnoreSegment.isTrue()) {
|
|
||||||
LOG.info("Pretending I don't have segment[%s]", descriptor);
|
|
||||||
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return super.buildQueryRunnerForSegment(
|
|
||||||
query,
|
|
||||||
descriptor,
|
|
||||||
factory,
|
|
||||||
toolChest,
|
|
||||||
timeline,
|
|
||||||
segmentMapFn,
|
|
||||||
cpuTimeAccumulator,
|
|
||||||
cacheKeyPrefix
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -54,6 +54,8 @@ public class TestNGGroup
|
||||||
|
|
||||||
public static final String QUERY_RETRY = "query-retry";
|
public static final String QUERY_RETRY = "query-retry";
|
||||||
|
|
||||||
|
public static final String QUERY_ERROR = "query-error";
|
||||||
|
|
||||||
public static final String CLI_INDEXER = "cli-indexer";
|
public static final String CLI_INDEXER = "cli-indexer";
|
||||||
|
|
||||||
public static final String REALTIME_INDEX = "realtime-index";
|
public static final String REALTIME_INDEX = "realtime-index";
|
||||||
|
|
|
@ -0,0 +1,220 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.tests.query;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
|
import org.apache.druid.query.QueryContexts;
|
||||||
|
import org.apache.druid.server.coordination.ServerManagerForQueryErrorTest;
|
||||||
|
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
|
||||||
|
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||||
|
import org.apache.druid.testing.utils.ITRetryUtil;
|
||||||
|
import org.apache.druid.testing.utils.SqlTestQueryHelper;
|
||||||
|
import org.apache.druid.testing.utils.TestQueryHelper;
|
||||||
|
import org.apache.druid.tests.TestNGGroup;
|
||||||
|
import org.apache.druid.tests.indexer.AbstractIndexerTest;
|
||||||
|
import org.testng.annotations.BeforeMethod;
|
||||||
|
import org.testng.annotations.Guice;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class tests various query failures.
|
||||||
|
*
|
||||||
|
* - SQL planning failures. Both {@link org.apache.calcite.sql.parser.SqlParseException}
|
||||||
|
* and {@link org.apache.calcite.tools.ValidationException} are tested using SQLs that must fail.
|
||||||
|
* - Various query errors from historicals. These tests use {@link ServerManagerForQueryErrorTest} to make
|
||||||
|
* the query to always throw an exception. They verify the error code returned by
|
||||||
|
* {@link org.apache.druid.sql.http.SqlResource} and {@link org.apache.druid.server.QueryResource}.
|
||||||
|
*/
|
||||||
|
@Test(groups = TestNGGroup.QUERY_ERROR)
|
||||||
|
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||||
|
public class ITQueryErrorTest
|
||||||
|
{
|
||||||
|
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
|
||||||
|
/**
|
||||||
|
* A simple query used for error tests from historicals. What query is does not matter because the query is always
|
||||||
|
* expected to fail.
|
||||||
|
*
|
||||||
|
* @see ServerManagerForQueryErrorTest#buildQueryRunnerForSegment
|
||||||
|
*/
|
||||||
|
private static final String NATIVE_QUERY_RESOURCE =
|
||||||
|
"/queries/native_query_error_from_historicals_test.json";
|
||||||
|
private static final String SQL_QUERY_RESOURCE =
|
||||||
|
"/queries/sql_error_from_historicals_test.json";
|
||||||
|
/**
|
||||||
|
* A simple sql query template used for plan failure tests.
|
||||||
|
*/
|
||||||
|
private static final String SQL_PLAN_FAILURE_RESOURCE = "/queries/sql_plan_failure_query.json";
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private CoordinatorResourceTestClient coordinatorClient;
|
||||||
|
@Inject
|
||||||
|
private TestQueryHelper queryHelper;
|
||||||
|
@Inject
|
||||||
|
private SqlTestQueryHelper sqlHelper;
|
||||||
|
@Inject
|
||||||
|
private ObjectMapper jsonMapper;
|
||||||
|
|
||||||
|
@BeforeMethod
|
||||||
|
public void before()
|
||||||
|
{
|
||||||
|
// ensure that wikipedia segments are loaded completely
|
||||||
|
ITRetryUtil.retryUntilTrue(
|
||||||
|
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*400.*")
|
||||||
|
public void testSqlParseException() throws Exception
|
||||||
|
{
|
||||||
|
// test a sql without SELECT
|
||||||
|
sqlHelper.testQueriesFromString(buildSqlPlanFailureQuery("FROM t WHERE col = 'a'"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*400.*")
|
||||||
|
public void testSqlValidationException() throws Exception
|
||||||
|
{
|
||||||
|
// test a sql that selects unknown column
|
||||||
|
sqlHelper.testQueriesFromString(
|
||||||
|
buildSqlPlanFailureQuery(StringUtils.format("SELECT unknown_col FROM %s LIMIT 1", WIKIPEDIA_DATA_SOURCE))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*504.*")
|
||||||
|
public void testSqlTimeout() throws Exception
|
||||||
|
{
|
||||||
|
sqlHelper.testQueriesFromString(
|
||||||
|
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_TIMEOUT_TEST_CONTEXT_KEY)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*429.*")
|
||||||
|
public void testSqlCapacityExceeded() throws Exception
|
||||||
|
{
|
||||||
|
sqlHelper.testQueriesFromString(
|
||||||
|
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*501.*")
|
||||||
|
public void testSqlUnsupported() throws Exception
|
||||||
|
{
|
||||||
|
sqlHelper.testQueriesFromString(
|
||||||
|
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_UNSUPPORTED_TEST_CONTEXT_KEY)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*400.*")
|
||||||
|
public void testSqlResourceLimitExceeded() throws Exception
|
||||||
|
{
|
||||||
|
sqlHelper.testQueriesFromString(
|
||||||
|
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*500.*")
|
||||||
|
public void testSqlFailure() throws Exception
|
||||||
|
{
|
||||||
|
sqlHelper.testQueriesFromString(
|
||||||
|
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_FAILURE_TEST_CONTEXT_KEY)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*504.*")
|
||||||
|
public void testQueryTimeout() throws Exception
|
||||||
|
{
|
||||||
|
queryHelper.testQueriesFromString(
|
||||||
|
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_TIMEOUT_TEST_CONTEXT_KEY)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*429.*")
|
||||||
|
public void testQueryCapacityExceeded() throws Exception
|
||||||
|
{
|
||||||
|
queryHelper.testQueriesFromString(
|
||||||
|
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*501.*")
|
||||||
|
public void testQueryUnsupported() throws Exception
|
||||||
|
{
|
||||||
|
queryHelper.testQueriesFromString(
|
||||||
|
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_UNSUPPORTED_TEST_CONTEXT_KEY)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*400.*")
|
||||||
|
public void testResourceLimitExceeded() throws Exception
|
||||||
|
{
|
||||||
|
queryHelper.testQueriesFromString(
|
||||||
|
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*500.*")
|
||||||
|
public void testQueryFailure() throws Exception
|
||||||
|
{
|
||||||
|
queryHelper.testQueriesFromString(
|
||||||
|
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_FAILURE_TEST_CONTEXT_KEY)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String buildSqlPlanFailureQuery(String sql) throws IOException
|
||||||
|
{
|
||||||
|
return StringUtils.replace(
|
||||||
|
AbstractIndexerTest.getResourceAsString(SQL_PLAN_FAILURE_RESOURCE),
|
||||||
|
"%%QUERY%%",
|
||||||
|
sql
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String buildHistoricalErrorSqlQuery(String contextKey) throws IOException
|
||||||
|
{
|
||||||
|
return StringUtils.replace(
|
||||||
|
AbstractIndexerTest.getResourceAsString(SQL_QUERY_RESOURCE),
|
||||||
|
"%%CONTEXT%%",
|
||||||
|
jsonMapper.writeValueAsString(buildTestContext(contextKey))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String buildHistoricalErrorTestQuery(String contextKey) throws IOException
|
||||||
|
{
|
||||||
|
return StringUtils.replace(
|
||||||
|
AbstractIndexerTest.getResourceAsString(NATIVE_QUERY_RESOURCE),
|
||||||
|
"%%CONTEXT%%",
|
||||||
|
jsonMapper.writeValueAsString(buildTestContext(contextKey))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<String, Object> buildTestContext(String key)
|
||||||
|
{
|
||||||
|
final Map<String, Object> context = new HashMap<>();
|
||||||
|
// Disable cache so that each run hits historical.
|
||||||
|
context.put(QueryContexts.USE_CACHE_KEY, false);
|
||||||
|
context.put(key, true);
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.ISE;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
|
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
|
||||||
import org.apache.druid.query.QueryContexts;
|
import org.apache.druid.query.QueryContexts;
|
||||||
import org.apache.druid.server.coordination.ServerManagerForQueryRetryTest;
|
import org.apache.druid.server.coordination.ServerManagerForQueryErrorTest;
|
||||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||||
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
|
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
|
||||||
import org.apache.druid.testing.clients.QueryResourceTestClient;
|
import org.apache.druid.testing.clients.QueryResourceTestClient;
|
||||||
|
@ -53,7 +53,7 @@ import java.util.Map;
|
||||||
* the historical drops the segment after the broker issues the query to the historical. To mimic this case, this
|
* the historical drops the segment after the broker issues the query to the historical. To mimic this case, this
|
||||||
* test spawns two historicals, a normal historical and a historical modified for testing. The later historical
|
* test spawns two historicals, a normal historical and a historical modified for testing. The later historical
|
||||||
* announces all segments assigned, but doesn't serve all of them. Instead, it can report missing segments for some
|
* announces all segments assigned, but doesn't serve all of them. Instead, it can report missing segments for some
|
||||||
* segments. See {@link ServerManagerForQueryRetryTest} for more details.
|
* segments. See {@link ServerManagerForQueryErrorTest} for more details.
|
||||||
* <p>
|
* <p>
|
||||||
* To run this test properly, the test group must be specified as {@link TestNGGroup#QUERY_RETRY}.
|
* To run this test properly, the test group must be specified as {@link TestNGGroup#QUERY_RETRY}.
|
||||||
*/
|
*/
|
||||||
|
@ -237,7 +237,7 @@ public class ITQueryRetryTestOnMissingSegments
|
||||||
context.put(QueryContexts.USE_CACHE_KEY, false);
|
context.put(QueryContexts.USE_CACHE_KEY, false);
|
||||||
context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, numRetriesOnMissingSegments);
|
context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, numRetriesOnMissingSegments);
|
||||||
context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
|
context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
|
||||||
context.put(ServerManagerForQueryRetryTest.QUERY_RETRY_TEST_CONTEXT_KEY, true);
|
context.put(ServerManagerForQueryErrorTest.QUERY_RETRY_TEST_CONTEXT_KEY, true);
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"description": "timeseries, 1 agg, all",
|
||||||
|
"query": {
|
||||||
|
"queryType": "timeseries",
|
||||||
|
"dataSource": "wikipedia_editstream",
|
||||||
|
"intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
|
||||||
|
"granularity": "all",
|
||||||
|
"aggregations": [
|
||||||
|
{
|
||||||
|
"type": "count",
|
||||||
|
"name": "rows"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"context": %%CONTEXT%%
|
||||||
|
},
|
||||||
|
"expectedResults": []
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,9 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"query": {
|
||||||
|
"query": "SELECT count(*) from wikipedia_editstream",
|
||||||
|
"context": %%CONTEXT%%
|
||||||
|
},
|
||||||
|
"expectedResults": []
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,8 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"query": {
|
||||||
|
"query": "%%QUERY%%"
|
||||||
|
},
|
||||||
|
"expectedResults": []
|
||||||
|
}
|
||||||
|
]
|
|
@ -23,8 +23,12 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.JavaType;
|
import com.fasterxml.jackson.databind.JavaType;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.util.concurrent.AbstractFuture;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||||
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
|
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||||
|
import org.apache.druid.query.Query;
|
||||||
import org.apache.druid.query.QueryCapacityExceededException;
|
import org.apache.druid.query.QueryCapacityExceededException;
|
||||||
import org.apache.druid.query.QueryException;
|
import org.apache.druid.query.QueryException;
|
||||||
import org.apache.druid.query.QueryInterruptedException;
|
import org.apache.druid.query.QueryInterruptedException;
|
||||||
|
@ -44,6 +48,9 @@ import org.mockito.Mockito;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@RunWith(Enclosed.class)
|
@RunWith(Enclosed.class)
|
||||||
public class JsonParserIteratorTest
|
public class JsonParserIteratorTest
|
||||||
|
@ -188,7 +195,7 @@ public class JsonParserIteratorTest
|
||||||
public ExpectedException expectedException = ExpectedException.none();
|
public ExpectedException expectedException = ExpectedException.none();
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConvertQueryExceptionToQueryInterruptedException() throws JsonProcessingException
|
public void testConvertQueryExceptionWithNullErrorCodeToQueryInterruptedException() throws JsonProcessingException
|
||||||
{
|
{
|
||||||
JsonParserIterator<Object> iterator = new JsonParserIterator<>(
|
JsonParserIterator<Object> iterator = new JsonParserIterator<>(
|
||||||
JAVA_TYPE,
|
JAVA_TYPE,
|
||||||
|
@ -202,6 +209,108 @@ public class JsonParserIteratorTest
|
||||||
expectedException.expectMessage("query exception test");
|
expectedException.expectMessage("query exception test");
|
||||||
iterator.hasNext();
|
iterator.hasNext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertQueryExceptionWithNonNullErrorCodeToQueryInterruptedException()
|
||||||
|
throws JsonProcessingException
|
||||||
|
{
|
||||||
|
JsonParserIterator<Object> iterator = new JsonParserIterator<>(
|
||||||
|
JAVA_TYPE,
|
||||||
|
Futures.immediateFuture(
|
||||||
|
mockErrorResponse(new QueryException("test error", "query exception test", null, null))
|
||||||
|
),
|
||||||
|
URL,
|
||||||
|
null,
|
||||||
|
HOST,
|
||||||
|
OBJECT_MAPPER
|
||||||
|
);
|
||||||
|
expectedException.expect(QueryInterruptedException.class);
|
||||||
|
expectedException.expectMessage("query exception test");
|
||||||
|
iterator.hasNext();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TimeoutExceptionConversionTest
|
||||||
|
{
|
||||||
|
@Rule
|
||||||
|
public ExpectedException expectedException = ExpectedException.none();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeoutBeforeCallingFuture()
|
||||||
|
{
|
||||||
|
JsonParserIterator<?> iterator = new JsonParserIterator<>(
|
||||||
|
JAVA_TYPE,
|
||||||
|
Mockito.mock(Future.class),
|
||||||
|
URL,
|
||||||
|
mockQuery("qid", 0L), // should always timeout
|
||||||
|
HOST,
|
||||||
|
OBJECT_MAPPER
|
||||||
|
);
|
||||||
|
expectedException.expect(QueryTimeoutException.class);
|
||||||
|
expectedException.expectMessage(StringUtils.format("url[%s] timed out", URL));
|
||||||
|
iterator.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeoutWhileCallingFuture()
|
||||||
|
{
|
||||||
|
Future<InputStream> future = new AbstractFuture<InputStream>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public InputStream get(long timeout, TimeUnit unit)
|
||||||
|
throws InterruptedException
|
||||||
|
{
|
||||||
|
Thread.sleep(2000); // Sleep longer than timeout
|
||||||
|
return null; // should return null so that JsonParserIterator checks timeout
|
||||||
|
}
|
||||||
|
};
|
||||||
|
JsonParserIterator<?> iterator = new JsonParserIterator<>(
|
||||||
|
JAVA_TYPE,
|
||||||
|
future,
|
||||||
|
URL,
|
||||||
|
mockQuery("qid", System.currentTimeMillis() + 500L), // timeout in 500 ms
|
||||||
|
HOST,
|
||||||
|
OBJECT_MAPPER
|
||||||
|
);
|
||||||
|
expectedException.expect(QueryTimeoutException.class);
|
||||||
|
expectedException.expectMessage(StringUtils.format("url[%s] timed out", URL));
|
||||||
|
iterator.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeoutAfterCallingFuture()
|
||||||
|
{
|
||||||
|
ExecutorService service = Execs.singleThreaded("timeout-test");
|
||||||
|
try {
|
||||||
|
JsonParserIterator<?> iterator = new JsonParserIterator<>(
|
||||||
|
JAVA_TYPE,
|
||||||
|
service.submit(() -> {
|
||||||
|
Thread.sleep(2000); // Sleep longer than timeout
|
||||||
|
return null;
|
||||||
|
}),
|
||||||
|
URL,
|
||||||
|
mockQuery("qid", System.currentTimeMillis() + 500L), // timeout in 500 ms
|
||||||
|
HOST,
|
||||||
|
OBJECT_MAPPER
|
||||||
|
);
|
||||||
|
expectedException.expect(QueryTimeoutException.class);
|
||||||
|
expectedException.expectMessage("Query [qid] timed out");
|
||||||
|
iterator.hasNext();
|
||||||
|
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
service.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Query<?> mockQuery(String queryId, long timeoutAt)
|
||||||
|
{
|
||||||
|
Query<?> query = Mockito.mock(Query.class);
|
||||||
|
Mockito.when(query.getId()).thenReturn(queryId);
|
||||||
|
Mockito.when(query.getContextValue(ArgumentMatchers.eq(DirectDruidClient.QUERY_FAIL_TIME), ArgumentMatchers.eq(-1L)))
|
||||||
|
.thenReturn(timeoutAt);
|
||||||
|
return query;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static InputStream mockErrorResponse(Exception e) throws JsonProcessingException
|
private static InputStream mockErrorResponse(Exception e) throws JsonProcessingException
|
||||||
|
|
|
@ -137,7 +137,7 @@ public class CliHistorical extends ServerRunnable
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method is visible for testing query retry on missing segments. See {@link CliHistoricalForQueryRetryTest}.
|
* This method is visible for testing query retry on missing segments. See {@link CliHistoricalForQueryErrorTest}.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void bindQuerySegmentWalker(Binder binder)
|
public void bindQuerySegmentWalker(Binder binder)
|
||||||
|
|
Loading…
Reference in New Issue