Add integration tests for query retry on missing segments (#10171)

* Add integration tests for query retry on missing segments

* add missing dependencies; fix travis conf

* address comments

* Integration tests extension

* remove unused dependency

* remove druid_main

* fix java agent port
Jihoon Son 2020-07-22 22:30:35 -07:00 committed by GitHub
parent 26d099f39b
commit 6fdce36e41
25 changed files with 838 additions and 56 deletions


@ -246,7 +246,7 @@ jobs:
<<: *test_processing_module
name: "(openjdk8) other modules test"
env:
- MAVEN_PROJECTS='!processing,!indexing-hadoop,!indexing-service,!extensions-core/kafka-indexing-service,!extensions-core/kinesis-indexing-service,!server,!web-console'
- MAVEN_PROJECTS='!processing,!indexing-hadoop,!indexing-service,!extensions-core/kafka-indexing-service,!extensions-core/kinesis-indexing-service,!server,!web-console,!integration-tests'
- <<: *test_other_modules
name: "(openjdk11) other modules test"
@ -385,6 +385,14 @@ jobs:
script: *run_integration_test
after_failure: *integration_test_diags
- &integration_query_retry
name: "(Compile=openjdk8, Run=openjdk8) query retry integration test for missing segments"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags
- &integration_security
name: "(Compile=openjdk8, Run=openjdk8) security integration test"
jdk: openjdk8
@ -413,7 +421,7 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) other integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
script: *run_integration_test
after_failure: *integration_test_diags
# END - Integration tests for Compile with Java 8 and Run with Java 8
@ -444,6 +452,11 @@ jobs:
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=11'
- <<: *integration_query_retry
name: "(Compile=openjdk8, Run=openjdk11) query retry integration test for missing segments"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=11'
- <<: *integration_security
name: "(Compile=openjdk8, Run=openjdk11) security integration test"
jdk: openjdk8
@ -462,7 +475,7 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
# END - Integration tests for Compile with Java 8 and Run with Java 11
- name: "security vulnerabilities"


@ -23,6 +23,11 @@ import io.airlift.airline.Cli;
import org.apache.druid.guice.annotations.ExtensionPoint;
/**
* An extension point to create a custom Druid service. Druid can understand and execute custom commands
* to run services loaded via Druid's extension system (see {@code Initialization#getFromExtensions}). See
* the {@code Main} class for details of groups and commands.
*
* Implementations should be registered in the {@code META-INF/services/org.apache.druid.cli.CliCommandCreator} file.
*/
@ExtensionPoint
public interface CliCommandCreator
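
Since implementations are discovered through the standard META-INF/services mechanism described above, wiring a custom command into Druid's CLI only takes an implementation class plus a one-line services entry. A minimal sketch, assuming a hypothetical MyCommandCreator and CliMyService (both names are illustrative; the test-only QueryRetryTestCommandCreator added further down in this commit follows the same pattern):

// MyCommandCreator.java (hypothetical extension module)
package org.example.druid.extension;

import io.airlift.airline.Cli.CliBuilder;
import org.apache.druid.cli.CliCommandCreator;

public class MyCommandCreator implements CliCommandCreator
{
  @Override
  public void addCommands(CliBuilder builder)
  {
    // Attach the command to the existing "server" group so it can be launched as
    // `org.apache.druid.cli.Main server my-service`.
    builder.withGroup("server").withCommands(CliMyService.class);
  }
}

// CliMyService.java (hypothetical command; a real service would typically extend one of
// Druid's ServerRunnable subclasses, as CliHistoricalForQueryRetryTest extends CliHistorical)
package org.example.druid.extension;

import io.airlift.airline.Command;

@Command(name = "my-service", description = "Illustrative custom command")
public class CliMyService implements Runnable
{
  @Override
  public void run()
  {
    System.out.println("running a custom Druid command");
  }
}

The accompanying META-INF/services/org.apache.druid.cli.CliCommandCreator file would then contain the single line org.example.druid.extension.MyCommandCreator.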


@ -51,7 +51,7 @@ public class Tasks
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
/**
*This context is used in auto compaction. When it is set in the context, the segments created by the task
* This context is used in auto compaction. When it is set in the context, the segments created by the task
* will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
* See {@link org.apache.druid.timeline.DataSegment} and {@link
* org.apache.druid.server.coordinator.duty.NewestSegmentFirstIterator} for more details.


@ -0,0 +1,128 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "2.2"
services:
druid-zookeeper-kafka:
extends:
file: docker-compose.base.yml
service: druid-zookeeper-kafka
druid-metadata-storage:
extends:
file: docker-compose.base.yml
service: druid-metadata-storage
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
depends_on:
- druid-zookeeper-kafka
druid-overlord:
extends:
file: docker-compose.base.yml
service: druid-overlord
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-metadata-storage
- druid-zookeeper-kafka
druid-coordinator:
extends:
file: docker-compose.base.yml
service: druid-coordinator
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-overlord:druid-overlord
- druid-metadata-storage:druid-metadata-storage
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-overlord
- druid-metadata-storage
- druid-zookeeper-kafka
druid-historical:
extends:
file: docker-compose.base.yml
service: druid-historical
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-zookeeper-kafka
druid-broker:
extends:
file: docker-compose.base.yml
service: druid-broker
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-historical:druid-historical
depends_on:
- druid-zookeeper-kafka
- druid-historical
druid-router:
extends:
file: docker-compose.base.yml
service: druid-router
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
- druid-coordinator:druid-coordinator
- druid-broker:druid-broker
depends_on:
- druid-zookeeper-kafka
- druid-coordinator
- druid-broker
druid-historical-for-query-retry-test:
image: druid/cluster
container_name: druid-historical-for-query-retry-test
networks:
druid-it-net:
ipv4_address: 172.172.172.13
ports:
- 8084:8083
- 8284:8283
- 5010:5007
privileged: true
volumes:
- ${HOME}/shared:/shared
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
env_file:
- ./environment-configs/common
- ./environment-configs/historical-for-query-retry-test
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
links:
- druid-zookeeper-kafka:druid-zookeeper-kafka
depends_on:
- druid-zookeeper-kafka
networks:
druid-it-net:
name: druid-it-net
ipam:
config:
- subnet: 172.172.172.0/24


@ -23,6 +23,7 @@ getConfPath()
case "$1" in
_common) echo $cluster_conf_base/_common ;;
historical) echo $cluster_conf_base/data/historical ;;
historical-for-query-retry-test) echo $cluster_conf_base/data/historical ;;
middleManager) echo $cluster_conf_base/data/middleManager ;;
coordinator) echo $cluster_conf_base/master/coordinator ;;
broker) echo $cluster_conf_base/query/broker ;;
@ -82,14 +83,18 @@ setupData()
# The "query", "query-retry", and "security" test groups require data to be set up before running the tests.
# In particular, they require segments to be downloaded from a pre-existing s3 bucket.
# This is done by using the loadSpec put into the metadata store and the s3 credentials set below.
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]; then
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]; then
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
&& cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid && /etc/init.d/mysql stop
# The s3 credentials below are needed to access the pre-existing s3 bucket
setKey $DRUID_SERVICE druid.s3.accessKey AKIAJI7DG7CDECGBQ6NA
setKey $DRUID_SERVICE druid.s3.secretKey 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
setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\"]
if [[ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]]; then
setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\",\"druid-integration-tests\"]
else
setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\"]
fi
# The region of the sample data s3 blobs needed for these test groups
export AWS_REGION=us-east-1
fi


@ -0,0 +1,33 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
DRUID_SERVICE=historical-for-query-retry-test
DRUID_LOG_PATH=/shared/logs/historical-for-query-retry-test.log
# JAVA OPTS
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010
# Druid configs
druid_processing_buffer_sizeBytes=25000000
druid_processing_numThreads=2
druid_query_groupBy_maxOnDiskStorage=300000000
druid_segmentCache_locations=[{"path":"/shared/druid/indexCache-query-retry-test","maxSize":5000000000}]
druid_server_maxSize=5000000000
druid_auth_basic_common_cacheDirectory=/tmp/authCache/historical-query-retry-test
druid_server_https_crlPath=/tls/revocations.crl


@ -0,0 +1,20 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9','twitterstream','2013-05-13T01:08:18.192Z','2013-01-01T00:00:00.000Z','2013-01-02T00:00:00.000Z',0,'2013-01-02T04:13:41.980Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z\",\"version\":\"2013-01-02T04:13:41.980Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/2013-01-02T04:13:41.980Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":445235220,\"identifier\":\"twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9\"}');
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9','twitterstream','2013-05-13T00:03:28.640Z','2013-01-02T00:00:00.000Z','2013-01-03T00:00:00.000Z',0,'2013-01-03T03:44:58.791Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z\",\"version\":\"2013-01-03T03:44:58.791Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z/2013-01-03T03:44:58.791Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":435325540,\"identifier\":\"twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9\"}');
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9','twitterstream','2013-05-13T00:03:48.807Z','2013-01-03T00:00:00.000Z','2013-01-04T00:00:00.000Z',0,'2013-01-04T04:09:13.590Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-03T00:00:00.000Z/2013-01-04T00:00:00.000Z\",\"version\":\"2013-01-04T04:09:13.590Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z/2013-01-04T04:09:13.590Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":411651320,\"identifier\":\"twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9\"}');
INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','wikipedia_editstream','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"wikipedia_editstream\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/wikipedia_editstream/2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z/2013-01-10T08:13:47.830Z_v9/0/index.zip\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"}');
INSERT INTO druid_segments (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES ('wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z', 'wikipedia', '2013-08-08T21:26:23.799Z', '2013-08-01T00:00:00.000Z', '2013-08-02T00:00:00.000Z', '0', '2013-08-08T21:22:48.989Z', '1', '{\"dataSource\":\"wikipedia\",\"interval\":\"2013-08-01T00:00:00.000Z/2013-08-02T00:00:00.000Z\",\"version\":\"2013-08-08T21:22:48.989Z\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/wikipedia/20130801T000000.000Z_20130802T000000.000Z/2013-08-08T21_22_48.989Z/0/index.zip\"},\"dimensions\":\"dma_code,continent_code,geo,area_code,robot,country_name,network,city,namespace,anonymous,unpatrolled,page,postal_code,language,newpage,user,region_lookup\",\"metrics\":\"count,delta,variation,added,deleted\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":24664730,\"identifier\":\"wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z\"}');


@ -45,6 +45,14 @@
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
@ -195,7 +203,6 @@
<groupId>org.apache.druid</groupId>
<artifactId>druid-services</artifactId>
<version>${project.parent.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>


@ -32,7 +32,17 @@ rm -rf $SHARED_DIR/docker
cp -R docker $SHARED_DIR/docker
mvn -B dependency:copy-dependencies -DoutputDirectory=$SHARED_DIR/docker/lib
# install logging config
cp src/main/resources/log4j2.xml $SHARED_DIR/docker/lib/log4j2.xml
# copy the integration test jar; it provides test-only extension implementations
cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib
# move extensions into a separate extension folder
# For druid-integration-tests
mkdir -p $SHARED_DIR/docker/extensions/druid-integration-tests
# We don't want to copy the tests jar.
cp $SHARED_DIR/docker/lib/druid-integration-tests-*[^s].jar $SHARED_DIR/docker/extensions/druid-integration-tests
# For druid-s3-extensions
mkdir -p $SHARED_DIR/docker/extensions/druid-s3-extensions
mv $SHARED_DIR/docker/lib/druid-s3-extensions-* $SHARED_DIR/docker/extensions/druid-s3-extensions
@ -64,12 +74,6 @@ then
curl https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar --output $SHARED_DIR/docker/lib/gcs-connector-hadoop2-latest.jar
fi
# install logging config
cp src/main/resources/log4j2.xml $SHARED_DIR/docker/lib/log4j2.xml
# copy the integration test jar, it provides test-only extension implementations
cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib
# one of the integration tests needs the wikiticker sample data
mkdir -p $SHARED_DIR/wikiticker-it
cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz


@ -49,11 +49,16 @@ fi
then
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
then
# Start default Druid services and additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls)
docker-compose -f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.security.yml up -d
# Start default Druid services and additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls)
docker-compose -f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.security.yml up -d
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]
then
# Start default Druid services with an additional historical modified for the query retry test
# See CliHistoricalForQueryRetryTest.
docker-compose -f ${DOCKERDIR}/docker-compose.query-retry-test.yml up -d
else
# Start default Druid services
docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d
# Start default Druid services
docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d
fi
else
# run druid cluster with override config


@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.google.inject.Binder;
import com.google.inject.Inject;
import io.airlift.airline.Command;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.server.coordination.ServerManagerForQueryRetryTest;
import java.util.Properties;
@Command(
name = "historical-for-query-retry-test",
description = "Runs a Historical node modified for query retry test"
)
public class CliHistoricalForQueryRetryTest extends CliHistorical
{
private static final Logger log = new Logger(CliHistoricalForQueryRetryTest.class);
public CliHistoricalForQueryRetryTest()
{
super();
}
@Inject
public void configure(Properties properties)
{
log.info("Historical is configured for testing query retry on missing segments");
}
@Override
public void bindQuerySegmentWalker(Binder binder)
{
binder.bind(QuerySegmentWalker.class).to(ServerManagerForQueryRetryTest.class).in(LazySingleton.class);
}
}


@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import io.airlift.airline.Cli.CliBuilder;
public class QueryRetryTestCommandCreator implements CliCommandCreator
{
@Override
public void addCommands(CliBuilder builder)
{
builder.withGroup("server").withCommands(CliHistoricalForQueryRetryTest.class);
}
}


@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* This server manager is designed to test query retry on missing segments. A segment can be missing during a query
* if a historical drops the segment after the broker issues the query to the historical. To mimic this situation,
* the historical running this server manager announces all segments assigned to it, but falsely reports the
* first 3 segments specified in the query as missing.
*
* @see org.apache.druid.query.RetryQueryRunner
*/
public class ServerManagerForQueryRetryTest extends ServerManager
{
// Query context key that indicates this query is for query retry testing.
public static final String QUERY_RETRY_TEST_CONTEXT_KEY = "query-retry-test";
private static final Logger LOG = new Logger(ServerManagerForQueryRetryTest.class);
private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 3;
private final ConcurrentHashMap<String, Set<SegmentDescriptor>> queryToIgnoredSegments = new ConcurrentHashMap<>();
@Inject
public ServerManagerForQueryRetryTest(
QueryRunnerFactoryConglomerate conglomerate,
ServiceEmitter emitter,
@Processing ExecutorService exec,
CachePopulator cachePopulator,
@Smile ObjectMapper objectMapper,
Cache cache,
CacheConfig cacheConfig,
SegmentManager segmentManager,
JoinableFactory joinableFactory,
ServerConfig serverConfig
)
{
super(
conglomerate,
emitter,
exec,
cachePopulator,
objectMapper,
cache,
cacheConfig,
segmentManager,
joinableFactory,
serverConfig
);
}
@Override
<T> QueryRunner<T> buildQueryRunnerForSegment(
Query<T> query,
SegmentDescriptor descriptor,
QueryRunnerFactory<T, Query<T>> factory,
QueryToolChest<T, Query<T>> toolChest,
VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
Function<SegmentReference, SegmentReference> segmentMapFn,
AtomicLong cpuTimeAccumulator
)
{
if (query.getContextBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) {
final MutableBoolean isIgnoreSegment = new MutableBoolean(false);
queryToIgnoredSegments.compute(
query.getMostSpecificId(),
(queryId, ignoredSegments) -> {
if (ignoredSegments == null) {
ignoredSegments = new HashSet<>();
}
if (ignoredSegments.size() < MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
ignoredSegments.add(descriptor);
isIgnoreSegment.setTrue();
}
return ignoredSegments;
}
);
if (isIgnoreSegment.isTrue()) {
LOG.info("Pretending I don't have segment[%s]", descriptor);
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
}
return super.buildQueryRunnerForSegment(
query,
descriptor,
factory,
toolChest,
timeline,
segmentMapFn,
cpuTimeAccumulator
);
}
}


@ -20,6 +20,7 @@
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.Query;
import java.util.List;
@ -27,14 +28,12 @@ import java.util.Map;
public class QueryWithResults extends AbstractQueryWithResults<Query>
{
@JsonCreator
public QueryWithResults(
Query query,
List<Map<String, Object>> expectedResults
@JsonProperty("query") Query query,
@JsonProperty("expectedResults") List<Map<String, Object>> expectedResults
)
{
super(query, expectedResults);
}
}


@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.druid.cli.QueryRetryTestCommandCreator


@ -52,6 +52,8 @@ public class TestNGGroup
*/
public static final String QUERY = "query";
public static final String QUERY_RETRY = "query-retry";
public static final String REALTIME_INDEX = "realtime-index";
/**


@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.coordination.ServerManagerForQueryRetryTest;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.QueryResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.QueryResultVerifier;
import org.apache.druid.testing.utils.QueryWithResults;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This class tests query retry on missing segments. A segment can be missing in a historical during a query if
* the historical drops the segment after the broker issues the query to the historical. To mimic this case, this
* test spawns two historicals, a normal historical and a historical modified for testing. The latter historical
* announces all segments assigned to it, but doesn't serve all of them. Instead, it can report some segments as
* missing. See {@link ServerManagerForQueryRetryTest} for more details.
*
* To run this test properly, the test group must be specified as {@link TestNGGroup#QUERY_RETRY}.
*/
@Test(groups = TestNGGroup.QUERY_RETRY)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITQueryRetryTestOnMissingSegments
{
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
private static final String QUERIES_RESOURCE = "/queries/wikipedia_editstream_queries_query_retry_test.json";
private static final int TIMES_TO_RUN = 50;
/**
* This test runs the same query multiple times. This enumeration represents the expected outcome after all
* runs have finished.
*/
private enum Expectation
{
/**
* Expect that all runs succeed.
*/
ALL_SUCCESS,
/**
* Expect that all runs return the 200 HTTP response, but some of them can return incorrect results.
*/
INCORRECT_RESULT,
/**
* Expect that some runs can return the 500 HTTP response. For the runs that returned the 200 HTTP response, the
* query result must be correct.
*/
QUERY_FAILURE
}
@Inject
private CoordinatorResourceTestClient coordinatorClient;
@Inject
private TestQueryHelper queryHelper;
@Inject
private QueryResourceTestClient queryClient;
@Inject
private IntegrationTestingConfig config;
@Inject
private ObjectMapper jsonMapper;
@BeforeMethod
public void before()
{
// ensure that wikipedia segments are loaded completely
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
);
}
@Test
public void testWithRetriesDisabledPartialResultDisallowed() throws Exception
{
// Since retry is disabled and partial results are not allowed, some queries are expected to fail.
// If a query succeeds, its result must be correct.
testQueries(buildQuery(0, false), Expectation.QUERY_FAILURE);
}
@Test
public void testWithRetriesDisabledPartialResultAllowed() throws Exception
{
// Since retry is disabled but partial results are allowed, all queries must succeed.
// However, some queries can return incorrect results.
testQueries(buildQuery(0, true), Expectation.INCORRECT_RESULT);
}
@Test
public void testWithRetriesEnabledPartialResultDisallowed() throws Exception
{
// Since retry is enabled, all queries must succeed even though partial results are disallowed.
// All queries must return correct results.
testQueries(buildQuery(30, false), Expectation.ALL_SUCCESS);
}
private void testQueries(String queryWithResultsStr, Expectation expectation) throws Exception
{
final List<QueryWithResults> queries = jsonMapper.readValue(
queryWithResultsStr,
new TypeReference<List<QueryWithResults>>() {}
);
testQueries(queries, expectation);
}
private void testQueries(List<QueryWithResults> queries, Expectation expectation) throws Exception
{
int querySuccess = 0;
int queryFailure = 0;
int resultMatches = 0;
int resultMismatches = 0;
for (int i = 0; i < TIMES_TO_RUN; i++) {
for (QueryWithResults queryWithResult : queries) {
final StatusResponseHolder responseHolder = queryClient
.queryAsync(queryHelper.getQueryURL(config.getBrokerUrl()), queryWithResult.getQuery())
.get();
if (responseHolder.getStatus().getCode() == HttpResponseStatus.OK.getCode()) {
querySuccess++;
List<Map<String, Object>> result = jsonMapper.readValue(
responseHolder.getContent(),
new TypeReference<List<Map<String, Object>>>() {}
);
if (!QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults())) {
if (expectation != Expectation.INCORRECT_RESULT) {
throw new ISE(
"Incorrect query results for query %s \n expectedResults: %s \n actualResults : %s",
queryWithResult.getQuery(),
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
jsonMapper.writeValueAsString(result)
);
} else {
resultMismatches++;
}
} else {
resultMatches++;
}
} else if (responseHolder.getStatus().getCode() == HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode() &&
expectation == Expectation.QUERY_FAILURE) {
final Map<String, Object> response = jsonMapper.readValue(responseHolder.getContent(), Map.class);
final String errorMessage = (String) response.get("errorMessage");
Assert.assertNotNull(errorMessage, "errorMessage");
Assert.assertTrue(errorMessage.contains("No results found for segments"));
queryFailure++;
} else {
throw new ISE(
"Unexpected failure, code: [%s], content: [%s]",
responseHolder.getStatus(),
responseHolder.getContent()
);
}
}
}
switch (expectation) {
case ALL_SUCCESS:
Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN, querySuccess);
Assert.assertEquals(0, queryFailure);
Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN, resultMatches);
Assert.assertEquals(0, resultMismatches);
break;
case QUERY_FAILURE:
Assert.assertTrue(querySuccess > 0, "At least one query is expected to succeed.");
Assert.assertTrue(queryFailure > 0, "At least one query is expected to fail.");
Assert.assertEquals(querySuccess, resultMatches);
Assert.assertEquals(0, resultMismatches);
break;
case INCORRECT_RESULT:
Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN, querySuccess);
Assert.assertEquals(0, queryFailure);
Assert.assertTrue(resultMatches > 0, "At least one query is expected to return correct results.");
Assert.assertTrue(resultMismatches > 0, "At least one query is expected to return incorrect results.");
break;
default:
throw new ISE("Unknown expectation[%s]", expectation);
}
}
private String buildQuery(int numRetriesOnMissingSegments, boolean allowPartialResults) throws IOException
{
return StringUtils.replace(
AbstractIndexerTest.getResourceAsString(QUERIES_RESOURCE),
"%%CONTEXT%%",
jsonMapper.writeValueAsString(buildContext(numRetriesOnMissingSegments, allowPartialResults))
);
}
private static Map<String, Object> buildContext(int numRetriesOnMissingSegments, boolean allowPartialResults)
{
final Map<String, Object> context = new HashMap<>();
// Disable cache so that each run hits historical.
context.put(QueryContexts.USE_CACHE_KEY, false);
context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, numRetriesOnMissingSegments);
context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
context.put(ServerManagerForQueryRetryTest.QUERY_RETRY_TEST_CONTEXT_KEY, true);
return context;
}
}


@ -64,7 +64,6 @@ public class ITWikipediaQueryTest
@BeforeMethod
public void before() throws Exception
{
// ensure that wikipedia segments are loaded completely
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"


@ -0,0 +1,26 @@
[
{
"description": "timeseries, 1 agg, all",
"query": {
"queryType": "timeseries",
"dataSource": "wikipedia_editstream",
"intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
"granularity": "all",
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"context": %%CONTEXT%%
},
"expectedResults": [
{
"timestamp": "2013-01-01T00:00:00.000Z",
"result": {
"rows": 2390950
}
}
]
}
]


@ -20,7 +20,7 @@ if [ -n "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" ] && [ "$DRUID_INTEGRATION_TES
exit 0
fi
for node in druid-historical druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
for node in druid-historical druid-historical-for-query-retry-test druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
do
docker stop $node
docker rm $node


@ -53,6 +53,9 @@ public class QueryContexts
public static final String JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = "enableJoinFilterRewriteValueColumnFilters";
public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = "joinFilterRewriteMaxSize";
public static final String USE_FILTER_CNF_KEY = "useFilterCNF";
public static final String NUM_RETRIES_ON_MISSING_SEGMENTS_KEY = "numRetriesOnMissingSegments";
public static final String RETURN_PARTIAL_RESULTS_KEY = "returnPartialResults";
public static final String USE_CACHE_KEY = "useCache";
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
@ -143,7 +146,7 @@ public class QueryContexts
public static <T> boolean isUseCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "useCache", defaultValue);
return parseBoolean(query, USE_CACHE_KEY, defaultValue);
}
public static <T> boolean isPopulateResultLevelCache(Query<T> query)
@ -344,6 +347,16 @@ public class QueryContexts
return defaultTimeout;
}
public static <T> int getNumRetriesOnMissingSegments(Query<T> query, int defaultValue)
{
return query.getContextValue(NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, defaultValue);
}
public static <T> boolean allowReturnPartialResults(Query<T> query, boolean defaultValue)
{
return query.getContextBoolean(RETURN_PARTIAL_RESULTS_KEY, defaultValue);
}
static <T> long parseLong(Query<T> query, String key, long defaultValue)
{
final Object val = query.getContextValue(key);


@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
@ -138,8 +139,17 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
final Sequence<Result<TimeseriesResultValue>> finalSequence;
if (query.getGranularity().equals(Granularities.ALL) && !query.isSkipEmptyBuckets()) {
//Usally it is NOT Okay to materialize results via toList(), but Granularity is ALL thus we have only one record
// When granularity = ALL, there is no grouping key for this query.
// To be more SQL-compliant, we should return something (e.g., 0 for count queries) even when
// the sequence is empty.
if (query.getGranularity().equals(Granularities.ALL) &&
// Returns empty sequence if this query allows skipping empty buckets
!query.isSkipEmptyBuckets() &&
// Returns empty sequence if bySegment is set because bySegment results are mostly used for
// caching in historicals or debugging where the exact results are preferred.
!QueryContexts.isBySegment(query)) {
// Usually it is NOT okay to materialize results via toList(), but granularity is ALL, thus
// we have only one record.
final List<Result<TimeseriesResultValue>> val = baseResults.toList();
finalSequence = val.isEmpty() ? Sequences.simple(Collections.singletonList(
getNullTimeseriesResultValue(query))) : Sequences.simple(val);


@ -214,10 +214,14 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
return true;
} else {
final List<SegmentDescriptor> missingSegments = getMissingSegments(queryPlus, context);
final int maxNumRetries = QueryContexts.getNumRetriesOnMissingSegments(
queryPlus.getQuery(),
config.getNumTries()
);
if (missingSegments.isEmpty()) {
return false;
} else if (retryCount >= config.getNumTries()) {
if (!config.isReturnPartialResults()) {
} else if (retryCount >= maxNumRetries) {
if (!QueryContexts.allowReturnPartialResults(queryPlus.getQuery(), config.isReturnPartialResults())) {
throw new SegmentMissingException("No results found for segments[%s]", missingSegments);
} else {
return false;
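
With this change, the retry budget and the partial-results behavior can be overridden per query through the context keys added to QueryContexts above, falling back to RetryQueryRunnerConfig when they are absent. A minimal client-side sketch (the class and the chosen values are illustrative; only the context key names come from this patch):

import java.util.HashMap;
import java.util.Map;

public class RetryContextExample
{
  public static Map<String, Object> buildRetryContext()
  {
    final Map<String, Object> context = new HashMap<>();
    // Retry up to 3 times when a historical reports missing segments,
    // overriding RetryQueryRunnerConfig.getNumTries() for this query only.
    context.put("numRetriesOnMissingSegments", 3);
    // Throw SegmentMissingException instead of silently returning partial
    // results once the retries are exhausted.
    context.put("returnPartialResults", false);
    return context;
  }
}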


@ -204,35 +204,20 @@ public class ServerManager implements QuerySegmentWalker
analysis.getBaseQuery().orElse(query)
);
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
final FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(specs)
.transformCat(
descriptor -> {
final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
descriptor.getInterval(),
descriptor.getVersion()
);
if (entry == null) {
return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor));
}
final PartitionChunk<ReferenceCountingSegment> chunk = entry.getChunk(descriptor.getPartitionNumber());
if (chunk == null) {
return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor));
}
final ReferenceCountingSegment segment = chunk.getObject();
return Collections.singletonList(
buildAndDecorateQueryRunner(
factory,
toolChest,
segmentMapFn.apply(segment),
descriptor,
cpuTimeAccumulator
)
);
}
descriptor -> Collections.singletonList(
buildQueryRunnerForSegment(
query,
descriptor,
factory,
toolChest,
timeline,
segmentMapFn,
cpuTimeAccumulator
)
)
);
return CPUTimeMetricQueryRunner.safeBuild(
@ -247,6 +232,40 @@ public class ServerManager implements QuerySegmentWalker
);
}
<T> QueryRunner<T> buildQueryRunnerForSegment(
final Query<T> query,
final SegmentDescriptor descriptor,
final QueryRunnerFactory<T, Query<T>> factory,
final QueryToolChest<T, Query<T>> toolChest,
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
final Function<SegmentReference, SegmentReference> segmentMapFn,
final AtomicLong cpuTimeAccumulator
)
{
final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
descriptor.getInterval(),
descriptor.getVersion()
);
if (entry == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final PartitionChunk<ReferenceCountingSegment> chunk = entry.getChunk(descriptor.getPartitionNumber());
if (chunk == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final ReferenceCountingSegment segment = chunk.getObject();
return buildAndDecorateQueryRunner(
factory,
toolChest,
segmentMapFn.apply(segment),
descriptor,
cpuTimeAccumulator
);
}
private <T> QueryRunner<T> buildAndDecorateQueryRunner(
final QueryRunnerFactory<T, Query<T>> factory,
final QueryToolChest<T, Query<T>> toolChest,


@ -19,7 +19,9 @@
package org.apache.druid.cli;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
@ -91,7 +93,7 @@ public class CliHistorical extends ServerRunnable
binder.bind(ServerManager.class).in(LazySingleton.class);
binder.bind(SegmentManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
bindQuerySegmentWalker(binder);
binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.HISTORICAL));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
@ -119,4 +121,13 @@ public class CliHistorical extends ServerRunnable
new LookupModule()
);
}
/**
* This method is visible for testing query retry on missing segments. See {@link CliHistoricalForQueryRetryTest}.
*/
@VisibleForTesting
public void bindQuerySegmentWalker(Binder binder)
{
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
}
}