diff --git a/ci/travis_script_integration_part2.sh b/ci/travis_script_integration_part2.sh index 61b3b9b7b6f..24cd7970ece 100755 --- a/ci/travis_script_integration_part2.sh +++ b/ci/travis_script_integration_part2.sh @@ -21,6 +21,6 @@ set -e pushd $TRAVIS_BUILD_DIR/integration-tests -mvn verify -P integration-tests -Dit.test=ITUnionQueryTest,ITTwitterQueryTest,ITWikipediaQueryTest,ITBasicAuthConfigurationTest,ITTLSTest +mvn verify -P integration-tests -Dit.test=ITUnionQueryTest,ITNestedQueryPushDownTest,ITTwitterQueryTest,ITWikipediaQueryTest,ITBasicAuthConfigurationTest,ITTLSTest popd diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index 85f71d8a7a7..2c72e0e1725 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import javax.annotation.Nullable; import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -153,10 +154,39 @@ public class StringUtils return s.toUpperCase(Locale.ENGLISH); } + /** + * Encodes a String in application/x-www-form-urlencoded format, with one exception: + * "+" in the encoded form is replaced with "%20". + * + * application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form data as well. + * + * @param s String to be encoded + * @return application/x-www-form-urlencoded format encoded String, but with "+" replaced with "%20". + */ + @Nullable public static String urlEncode(String s) { + if (s == null) { + return null; + } + try { - return URLEncoder.encode(s, "UTF-8"); + return StringUtils.replace(URLEncoder.encode(s, "UTF-8"), "+", "%20"); + } + catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + @Nullable + public static String urlDecode(String s) + { + if (s == null) { + return null; + } + + try { + return URLDecoder.decode(s, "UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); diff --git a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java index 53f4942cf24..69d209a7b69 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java @@ -148,4 +148,16 @@ public class StringUtilsTest Assert.assertEquals("bb", StringUtils.replace("aaaa", "aa", "b")); Assert.assertEquals("", StringUtils.replace("aaaa", "aa", "")); } + + @Test + public void testURLEncodeSpace() + { + String s1 = StringUtils.urlEncode("aaa bbb"); + Assert.assertEquals(s1, "aaa%20bbb"); + Assert.assertEquals("aaa bbb", StringUtils.urlDecode(s1)); + + String s2 = StringUtils.urlEncode("fff+ggg"); + Assert.assertEquals(s2, "fff%2Bggg"); + Assert.assertEquals("fff+ggg", StringUtils.urlDecode(s2)); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java index 1b65ad490c2..483795fc777 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java @@ -291,7 +291,7 @@ public abstract class IndexTaskClient implements AutoCloseable final Request request = new Request(method, serviceUrl); // used to validate that we are talking to the correct worker - request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId); + request.addHeader(ChatHandlerResource.TASK_ID_HEADER, StringUtils.urlEncode(taskId)); if (content.length > 0) { request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content); } @@ -369,7 +369,9 @@ public abstract class IndexTaskClient implements AutoCloseable final Duration delay; if (response != null && response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) { - String headerId = response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER); + String headerId = StringUtils.urlDecode( + response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER) + ); if (headerId != null && !headerId.equals(taskId)) { log.warn( "Expected worker to have taskId [%s] but has taskId [%s], will retry in [%d]s", diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java index 8109961558c..af1f8222d1f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java @@ -64,7 +64,7 @@ public class TaskResourceFilter extends AbstractResourceFilter @Override public ContainerRequest filter(ContainerRequest request) { - final String taskId = Preconditions.checkNotNull( + String taskId = Preconditions.checkNotNull( request.getPathSegments() .get( Iterables.indexOf( @@ -80,6 +80,7 @@ public class TaskResourceFilter extends AbstractResourceFilter ) + 1 ).getPath() ); + taskId = StringUtils.urlDecode(taskId); Optional taskOptional = taskStorageQueryAdapter.getTask(taskId); if (!taskOptional.isPresent()) { diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js index bda6094e63f..db64b4f920b 100644 --- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js +++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js @@ -123,7 +123,7 @@ $(document).ready(function() { 'suspended' : 'running'; data[i] = { - "dataSource" : supervisorId, + "dataSource" : dataList[i].id, "more" : 'payload' + 'status' + diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java index 7c02946145e..b21a98fda4c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java @@ -35,8 +35,8 @@ public class TaskRunnerUtilsTest "/druid/worker/v1/task/%s/log", "foo bar&" ); - Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo+bar%26/log", url.toString()); + Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo%20bar%26/log", url.toString()); Assert.assertEquals("1.2.3.4:8290", url.getAuthority()); - Assert.assertEquals("/druid/worker/v1/task/foo+bar%26/log", url.getPath()); + Assert.assertEquals("/druid/worker/v1/task/foo%20bar%26/log", url.getPath()); } } diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile index 2b46de7f06f..e3f3155e5f1 100644 --- a/integration-tests/docker/Dockerfile +++ b/integration-tests/docker/Dockerfile @@ -16,10 +16,12 @@ # Base image is built from integration-tests/docker-base in the Druid repo FROM imply/druiditbase +RUN echo "[mysqld]\ncharacter-set-server=utf8\ncollation-server=utf8_bin\n" >> /etc/mysql/my.cnf + # Setup metadata store # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72. RUN find /var/lib/mysql -type f -exec touch {} \; && /etc/init.d/mysql start \ - && echo "GRANT ALL ON druid.* TO 'druid'@'%' IDENTIFIED BY 'diurd'; CREATE database druid DEFAULT CHARACTER SET utf8;" | mysql -u root \ + && echo "CREATE USER 'druid'@'%' IDENTIFIED BY 'diurd'; GRANT ALL ON druid.* TO 'druid'@'%'; CREATE database druid DEFAULT CHARACTER SET utf8mb4;" | mysql -u root \ && /etc/init.d/mysql stop # Add Druid jars diff --git a/integration-tests/docker/middlemanager.conf b/integration-tests/docker/middlemanager.conf index 173829d135d..40adf19339b 100644 --- a/integration-tests/docker/middlemanager.conf +++ b/integration-tests/docker/middlemanager.conf @@ -14,7 +14,7 @@ command=java -Ddruid.worker.capacity=3 -Ddruid.indexer.logs.directory=/shared/tasklogs -Ddruid.storage.storageDirectory=/shared/storage - -Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml" + -Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml" -Ddruid.indexer.fork.property.druid.processing.buffer.sizeBytes=25000000 -Ddruid.indexer.fork.property.druid.processing.numThreads=1 -Ddruid.indexer.fork.server.http.numThreads=100 diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 653beba5b9c..49a61583d15 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -231,6 +231,7 @@ -Dfile.encoding=UTF-8 -Ddruid.test.config.dockerIp=${env.DOCKER_IP} -Ddruid.test.config.hadoopDir=${env.HADOOP_DIR} + -Ddruid.test.config.extraDatasourceNameSuffix=\ Россия\ 한국\ 中国!? -Ddruid.zk.service.host=${env.DOCKER_IP} -Ddruid.client.https.trustStorePath=client_tls/truststore.jks -Ddruid.client.https.trustStorePassword=druid123 diff --git a/integration-tests/run_cluster.sh b/integration-tests/run_cluster.sh index c08867ab59c..49d046ac6b9 100755 --- a/integration-tests/run_cluster.sh +++ b/integration-tests/run_cluster.sh @@ -53,40 +53,44 @@ cp src/main/resources/log4j2.xml $SHARED_DIR/docker/lib/log4j2.xml # copy the integration test jar, it provides test-only extension implementations cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib +# one of the integration tests needs the wikiticker sample data +mkdir -p $SHARED_DIR/wikiticker-it +cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz + docker network create --subnet=172.172.172.0/24 druid-it-net # Build Druid Cluster Image docker build -t druid/cluster $SHARED_DIR/docker # Start zookeeper and kafka -docker run -d --privileged --net druid-it-net --ip 172.172.172.2 --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -p 9093:9093 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.2 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -p 9093:9093 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster # Start MYSQL -docker run -d --privileged --net druid-it-net --ip 172.172.172.3 --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.3 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster # Start Overlord -docker run -d --privileged --net druid-it-net --ip 172.172.172.4 --name druid-overlord -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.4 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-overlord -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster # Start Coordinator -docker run -d --privileged --net druid-it-net --ip 172.172.172.5 --name druid-coordinator -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.5 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-coordinator -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster # Start Historical -docker run -d --privileged --net druid-it-net --ip 172.172.172.6 --name druid-historical -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.6 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-historical -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster # Start Middlemanger -docker run -d --privileged --net druid-it-net --ip 172.172.172.7 --name druid-middlemanager -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.7 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-middlemanager -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster # Start Broker -docker run -d --privileged --net druid-it-net --ip 172.172.172.8 --name druid-broker -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.8 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-broker -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster # Start Router -docker run -d --privileged --net druid-it-net --ip 172.172.172.9 --name druid-router -p 8888:8888 -p 9088:9088 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.9 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router -p 8888:8888 -p 9088:9088 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster # Start Router with permissive TLS settings (client auth enabled, no hostname verification, no revocation check) -docker run -d --privileged --net druid-it-net --ip 172.172.172.10 --name druid-router-permissive-tls -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-permissive-tls.conf:$SUPERVISORDIR/router-permissive-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.10 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router-permissive-tls -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-permissive-tls.conf:$SUPERVISORDIR/router-permissive-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster # Start Router with TLS but no client auth -docker run -d --privileged --net druid-it-net --ip 172.172.172.11 --name druid-router-no-client-auth-tls -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-no-client-auth-tls.conf:$SUPERVISORDIR/router-no-client-auth-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.11 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router-no-client-auth-tls -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-no-client-auth-tls.conf:$SUPERVISORDIR/router-no-client-auth-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster # Start Router with custom TLS cert checkers -docker run -d --privileged --net druid-it-net --ip 172.172.172.12 --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-custom-check-tls.conf:$SUPERVISORDIR/router-custom-check-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.12 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-custom-check-tls.conf:$SUPERVISORDIR/router-custom-check-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java index 990b9180576..976eb894f99 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java @@ -348,6 +348,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide { return Boolean.valueOf(props.getOrDefault("manageKafkaTopic", "true")); } + + @Override + public String getExtraDatasourceNameSuffix() + { + return ""; + } }; } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java index 04d512a15cd..7cd8d9363b4 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java @@ -37,6 +37,9 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider @NotNull private String hadoopDir; + @JsonProperty + private String extraDatasourceNameSuffix = ""; + @Override public IntegrationTestingConfig get() { @@ -202,6 +205,12 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider { return true; } + + @Override + public String getExtraDatasourceNameSuffix() + { + return extraDatasourceNameSuffix; + } }; } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java index ec321a035f5..f4e745fca94 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java @@ -82,4 +82,6 @@ public interface IntegrationTestingConfig Map getProperties(); boolean manageKafkaTopic(); + + String getExtraDatasourceNameSuffix(); } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java index 63858d3eb35..8c1b5d173a7 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java @@ -72,7 +72,12 @@ public class ClientInfoResourceTestClient StatusResponseHolder response = httpClient.go( new Request( HttpMethod.GET, - new URL(StringUtils.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval)) + new URL(StringUtils.format( + "%s/%s/dimensions?interval=%s", + getBrokerURL(), + StringUtils.urlEncode(dataSource), + interval + )) ), responseHandler ).get(); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java index c3145aae824..df6ad20d34d 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java @@ -72,12 +72,12 @@ public class CoordinatorResourceTestClient private String getMetadataSegmentsURL(String dataSource) { - return StringUtils.format("%smetadata/datasources/%s/segments", getCoordinatorURL(), dataSource); + return StringUtils.format("%smetadata/datasources/%s/segments", getCoordinatorURL(), StringUtils.urlEncode(dataSource)); } private String getIntervalsURL(String dataSource) { - return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), dataSource); + return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), StringUtils.urlEncode(dataSource)); } private String getLoadStatusURL() @@ -150,7 +150,7 @@ public class CoordinatorResourceTestClient public void unloadSegmentsForDataSource(String dataSource) { try { - makeRequest(HttpMethod.DELETE, StringUtils.format("%sdatasources/%s", getCoordinatorURL(), dataSource)); + makeRequest(HttpMethod.DELETE, StringUtils.format("%sdatasources/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource))); } catch (Exception e) { throw Throwables.propagate(e); @@ -165,7 +165,7 @@ public class CoordinatorResourceTestClient StringUtils.format( "%sdatasources/%s/intervals/%s", getCoordinatorURL(), - dataSource, + StringUtils.urlEncode(dataSource), interval.toString().replace('/', '_') ) ); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 45f55d6f165..5cee2fb2886 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -42,7 +42,6 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.net.URL; -import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; @@ -121,7 +120,7 @@ public class OverlordResourceTestClient StringUtils.format( "%stask/%s/status", getIndexerURL(), - URLEncoder.encode(taskID, "UTF-8") + StringUtils.urlEncode(taskID) ) ); @@ -234,7 +233,7 @@ public class OverlordResourceTestClient { try { StatusResponseHolder response = httpClient.go( - new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), id))), + new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), StringUtils.urlEncode(id)))), responseHandler ).get(); if (!response.getStatus().equals(HttpResponseStatus.OK)) { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java index 649557d043a..888979d3dee 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java @@ -45,6 +45,7 @@ public class TestQueryHelper private final String brokerTLS; private final String router; private final String routerTLS; + private final IntegrationTestingConfig config; @Inject TestQueryHelper( @@ -59,6 +60,7 @@ public class TestQueryHelper this.brokerTLS = config.getBrokerTLSUrl(); this.router = config.getRouterUrl(); this.routerTLS = config.getRouterTLSUrl(); + this.config = config; } public void testQueriesFromFile(String filePath, int timesToRun) throws Exception diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index f417c149d06..b9bac8b245c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -20,6 +20,9 @@ package org.apache.druid.tests.indexer; import com.google.inject.Inject; +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.ClientInfoResourceTestClient; @@ -27,6 +30,7 @@ import org.apache.druid.testing.utils.RetryUtil; import org.junit.Assert; import java.io.IOException; +import java.io.InputStream; import java.util.List; public class AbstractITBatchIndexTest extends AbstractIndexerTest @@ -45,9 +49,31 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest String queryFilePath ) throws IOException { - submitTaskAndWait(indexTaskFilePath, dataSource); + final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix(); + final String taskSpec = StringUtils.replace( + getTaskAsString(indexTaskFilePath), + "%%DATASOURCE%%", + fullDatasourceName + ); + + submitTaskAndWait(taskSpec, fullDatasourceName); try { - queryHelper.testQueriesFromFile(queryFilePath, 2); + + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", queryFilePath); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + queryHelper.testQueriesFromString(queryResponseTemplate, 2); } catch (Exception e) { @@ -57,17 +83,48 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest } void doReindexTest( + String baseDataSource, String reindexDataSource, String reindexTaskFilePath, String queryFilePath ) throws IOException { - submitTaskAndWait(reindexTaskFilePath, reindexDataSource); + final String fullBaseDatasourceName = baseDataSource + config.getExtraDatasourceNameSuffix(); + final String fullReindexDatasourceName = reindexDataSource + config.getExtraDatasourceNameSuffix(); + + String taskSpec = StringUtils.replace( + getTaskAsString(reindexTaskFilePath), + "%%DATASOURCE%%", + fullBaseDatasourceName + ); + + taskSpec = StringUtils.replace( + taskSpec, + "%%REINDEX_DATASOURCE%%", + fullReindexDatasourceName + ); + + submitTaskAndWait(taskSpec, fullReindexDatasourceName); try { - queryHelper.testQueriesFromFile(queryFilePath, 2); + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", queryFilePath); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullBaseDatasourceName + ); + + queryHelper.testQueriesFromString(queryResponseTemplate, 2); // verify excluded dimension is not reIndexed final List dimensions = clientInfoResourceTestClient.getDimensions( - reindexDataSource, + fullReindexDatasourceName, "2013-08-31T00:00:00.000Z/2013-09-10T00:00:00.000Z" ); Assert.assertFalse("dimensions : " + dimensions, dimensions.contains("robot")); @@ -78,9 +135,9 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest } } - private void submitTaskAndWait(String indexTaskFilePath, String dataSourceName) throws IOException + private void submitTaskAndWait(String taskSpec, String dataSourceName) { - final String taskID = indexer.submitTask(getTaskAsString(indexTaskFilePath)); + final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java index 176ea112590..cdf61de8c3b 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java @@ -79,15 +79,21 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes @Inject IntegrationTestingConfig config; + private String fullDatasourceName; + void doTest() { + fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix(); + LOG.info("Starting test: ITRealtimeIndexTaskTest"); - try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { + try (final Closeable closeable = unloader(fullDatasourceName)) { // the task will run for 3 minutes and then shutdown itself String task = setShutOffTime( getTaskAsString(getTaskResource()), DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3)) ); + task = StringUtils.replace(task, "%%DATASOURCE%%", fullDatasourceName); + LOG.info("indexerSpec: [%s]\n", task); taskID = indexer.submitTask(task); @@ -119,6 +125,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes queryStr = StringUtils.replace(queryStr, "%%POST_AG_REQUEST_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2))); String postAgResponseTimestamp = TIMESTAMP_FMT.print(dtGroupBy.withSecondOfMinute(0)); queryStr = StringUtils.replace(queryStr, "%%POST_AG_RESPONSE_TIMESTAMP%%", postAgResponseTimestamp); + queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName); // should hit the queries all on realtime task or some on realtime task // and some on historical. Which it is depends on where in the minute we were @@ -140,7 +147,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes @Override public Boolean call() { - return coordinator.areSegmentsLoaded(INDEX_DATASOURCE); + return coordinator.areSegmentsLoaded(fullDatasourceName); } }, true, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java index 7f7819265d5..b739d79dddd 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java @@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.clients.OverlordResourceTestClient; import org.apache.druid.testing.utils.RetryUtil; @@ -54,6 +55,9 @@ public abstract class AbstractIndexerTest @Inject protected TestQueryHelper queryHelper; + @Inject + private IntegrationTestingConfig config; + protected Closeable unloader(final String dataSource) { return () -> unloadAndKillData(dataSource); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index 5ae32fa2b37..db6ebff48ab 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -19,15 +19,21 @@ package org.apache.druid.tests.indexer; +import com.google.inject.Inject; +import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.utils.RetryUtil; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; import java.util.List; @Guice(moduleFactory = DruidTestModuleFactory.class) @@ -39,23 +45,49 @@ public class ITCompactionTaskTest extends AbstractIndexerTest private static String INDEX_DATASOURCE = "wikipedia_index_test"; private static String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; + @Inject + private IntegrationTestingConfig config; + + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix(); + } + @Test public void testCompactionWithoutKeepSegmentGranularity() throws Exception { loadData(); - final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE); + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); intervalsBeforeCompaction.sort(null); final String compactedInterval = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z"; if (intervalsBeforeCompaction.contains(compactedInterval)) { throw new ISE("Containing a segment for the compacted interval[%s] before compaction", compactedInterval); } - try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { - queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2); + try (final Closeable closeable = unloader(fullDatasourceName)) { + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + + queryHelper.testQueriesFromString(queryResponseTemplate, 2); compactData(false); // 4 segments across 2 days, compacted into 1 new segment (5 total) checkCompactionFinished(5); - queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2); + queryHelper.testQueriesFromString(queryResponseTemplate, 2); intervalsBeforeCompaction.add(compactedInterval); intervalsBeforeCompaction.sort(null); @@ -67,15 +99,31 @@ public class ITCompactionTaskTest extends AbstractIndexerTest public void testCompactionWithKeepSegmentGranularity() throws Exception { loadData(); - final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE); + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); intervalsBeforeCompaction.sort(null); - try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { - queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2); + try (final Closeable closeable = unloader(fullDatasourceName)) { + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + + + queryHelper.testQueriesFromString(queryResponseTemplate, 2); compactData(true); // 4 segments across 2 days, compacted into 2 new segments (6 total) checkCompactionFinished(6); - queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2); + queryHelper.testQueriesFromString(queryResponseTemplate, 2); checkCompactionIntervals(intervalsBeforeCompaction); } @@ -83,12 +131,14 @@ public class ITCompactionTaskTest extends AbstractIndexerTest private void loadData() throws Exception { - final String taskID = indexer.submitTask(getTaskAsString(INDEX_TASK)); + String taskSpec = getTaskAsString(INDEX_TASK); + taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName); + final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); RetryUtil.retryUntilTrue( - () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE), + () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Load" ); } @@ -96,14 +146,16 @@ public class ITCompactionTaskTest extends AbstractIndexerTest private void compactData(boolean keepSegmentGranularity) throws Exception { final String template = getTaskAsString(COMPACTION_TASK); - final String taskSpec = + String taskSpec = StringUtils.replace(template, "${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity)); + taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName); + final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for compaction task %s", taskID); indexer.waitUntilTaskCompletes(taskID); RetryUtil.retryUntilTrue( - () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE), + () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Compaction" ); } @@ -112,7 +164,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest { RetryUtil.retryUntilTrue( () -> { - int metadataSegmentCount = coordinator.getMetadataSegments(INDEX_DATASOURCE).size(); + int metadataSegmentCount = coordinator.getMetadataSegments(fullDatasourceName).size(); LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments); return metadataSegmentCount == numExpectedSegments; }, @@ -124,7 +176,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest { RetryUtil.retryUntilTrue( () -> { - final List intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE); + final List intervalsAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName); intervalsAfterCompaction.sort(null); System.out.println("AFTER: " + intervalsAfterCompaction); System.out.println("EXPECTED: " + expectedIntervals); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index 63681e207b6..8412e49c17f 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -31,6 +31,7 @@ public class ITIndexerTest extends AbstractITBatchIndexTest private static String INDEX_TASK = "/indexer/wikipedia_index_task.json"; private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; private static String INDEX_DATASOURCE = "wikipedia_index_test"; + private static String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json"; private static String REINDEX_DATASOURCE = "wikipedia_reindex_test"; @@ -38,8 +39,8 @@ public class ITIndexerTest extends AbstractITBatchIndexTest public void testIndexData() throws Exception { try ( - final Closeable indexCloseable = unloader(INDEX_DATASOURCE); - final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE) + final Closeable indexCloseable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()); + final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()); ) { doIndexTestTest( INDEX_DATASOURCE, @@ -47,6 +48,7 @@ public class ITIndexerTest extends AbstractITBatchIndexTest INDEX_QUERIES_RESOURCE ); doReindexTest( + INDEX_DATASOURCE, REINDEX_DATASOURCE, REINDEX_TASK, INDEX_QUERIES_RESOURCE diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java index 3306da96b33..247fd7e55b2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java @@ -44,6 +44,7 @@ import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -64,6 +65,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json"; private static final String DATASOURCE = "kafka_indexing_service_test"; private static final String TOPIC_NAME = "kafka_indexing_service_topic"; + private static final int NUM_EVENTS_TO_SEND = 60; private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L; public static final String testPropertyPrefix = "kafka.test.property."; @@ -105,6 +107,14 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest @Inject private IntegrationTestingConfig config; + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix(); + } + @Test public void testKafka() { @@ -143,7 +153,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest addFilteredProperties(consumerProperties); spec = getTaskAsString(INDEXER_FILE); - spec = StringUtils.replace(spec, "%%DATASOURCE%%", DATASOURCE); + spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName); spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME); spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties)); LOG.info("supervisorSpec: [%s]\n", spec); @@ -228,7 +238,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest } String queryStr = query_response_template; - queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", DATASOURCE); + queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName); queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast)); queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst)); @@ -271,7 +281,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest @Override public Boolean call() { - return coordinator.areSegmentsLoaded(DATASOURCE); + return coordinator.areSegmentsLoaded(fullDatasourceName); } }, true, @@ -306,7 +316,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest // remove segments if (segmentsExist) { - unloadAndKillData(DATASOURCE); + unloadAndKillData(fullDatasourceName); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java index 3388efe2867..af7e82902f5 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java @@ -44,6 +44,7 @@ import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -106,6 +107,13 @@ public class ITKafkaTest extends AbstractIndexerTest @Inject private IntegrationTestingConfig config; + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix(); + } @Test public void testKafka() { @@ -204,7 +212,7 @@ public class ITKafkaTest extends AbstractIndexerTest addFilteredProperties(consumerProperties); indexerSpec = getTaskAsString(INDEXER_FILE); - indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", DATASOURCE); + indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", fullDatasourceName); indexerSpec = StringUtils.replace(indexerSpec, "%%TOPIC%%", TOPIC_NAME); indexerSpec = StringUtils.replace(indexerSpec, "%%COUNT%%", Integer.toString(num_events)); String consumerPropertiesJson = jsonMapper.writeValueAsString(consumerProperties); @@ -233,7 +241,7 @@ public class ITKafkaTest extends AbstractIndexerTest @Override public Boolean call() { - return coordinator.areSegmentsLoaded(DATASOURCE); + return coordinator.areSegmentsLoaded(fullDatasourceName); } }, true, @@ -263,7 +271,7 @@ public class ITKafkaTest extends AbstractIndexerTest } String queryStr = queryResponseTemplate; - queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", DATASOURCE); + queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName); // time boundary queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast)); @@ -296,7 +304,7 @@ public class ITKafkaTest extends AbstractIndexerTest // remove segments if (segmentsExist) { - unloadAndKillData(DATASOURCE); + unloadAndKillData(fullDatasourceName); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java index 90ba30419c2..350e2ab3441 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java @@ -21,6 +21,9 @@ package org.apache.druid.tests.indexer; import com.google.common.base.Throwables; import com.google.inject.Inject; +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.ClientInfoResourceTestClient; @@ -28,9 +31,13 @@ import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.utils.RetryUtil; import org.apache.druid.testing.utils.TestQueryHelper; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; +import java.io.IOException; +import java.io.InputStream; + @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITNestedQueryPushDownTest extends AbstractIndexerTest { @@ -51,12 +58,36 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest @Inject ClientInfoResourceTestClient clientInfoResourceTestClient; + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = WIKITICKER_DATA_SOURCE + config.getExtraDatasourceNameSuffix(); + } + @Test public void testIndexData() { try { loadData(); - queryHelper.testQueriesFromFile(WIKITICKER_QUERIES_RESOURCE, 2); + + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(WIKITICKER_QUERIES_RESOURCE); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", WIKITICKER_QUERIES_RESOURCE); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + + queryHelper.testQueriesFromString(queryResponseTemplate, 2); } catch (Exception e) { LOG.error(e, "Error while testing"); @@ -66,11 +97,13 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest private void loadData() throws Exception { - final String taskID = indexer.submitTask(getTaskAsString(WIKITICKER_INDEX_TASK)); + String taskSpec = getTaskAsString(WIKITICKER_INDEX_TASK); + taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName); + final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); RetryUtil.retryUntilTrue( - () -> coordinator.areSegmentsLoaded(WIKITICKER_DATA_SOURCE), "Segment Load" + () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Load" ); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java index b844acd3e68..80ca6e10487 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java @@ -35,7 +35,7 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest @Test public void testIndexData() throws Exception { - try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { + try (final Closeable closeable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())) { doIndexTestTest( INDEX_DATASOURCE, INDEX_TASK, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java index 38adae8063d..8b65c40db3d 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java @@ -20,9 +20,11 @@ package org.apache.druid.tests.indexer; import com.google.inject.Inject; +import org.apache.commons.io.IOUtils; import org.apache.druid.curator.discovery.ServerDiscoveryFactory; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; @@ -39,10 +41,12 @@ import org.apache.druid.testing.utils.ServerDiscoveryUtil; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; import java.io.IOException; +import java.io.InputStream; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -70,13 +74,21 @@ public class ITUnionQueryTest extends AbstractIndexerTest @Inject IntegrationTestingConfig config; + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = UNION_DATASOURCE + config.getExtraDatasourceNameSuffix(); + } + @Test public void testUnionQuery() throws IOException { final int numTasks = 3; final Closer closer = Closer.create(); for (int i = 0; i < numTasks; i++) { - closer.register(unloader(UNION_DATASOURCE + i)); + closer.register(unloader(fullDatasourceName + i)); } try { // Load 4 datasources with same dimensions @@ -89,7 +101,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest taskIDs.add( indexer.submitTask( withServiceName( - withDataSource(task, UNION_DATASOURCE + i), + withDataSource(task, fullDatasourceName + i), EVENT_RECEIVER_SERVICE_PREFIX + i ) ) @@ -103,9 +115,9 @@ public class ITUnionQueryTest extends AbstractIndexerTest RetryUtil.retryUntil( () -> { for (int i = 0; i < numTasks; i++) { - final int countRows = queryHelper.countRows(UNION_DATASOURCE + i, "2013-08-31/2013-09-01"); + final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01"); if (countRows < 5) { - LOG.warn("%d events have been ingested to %s so far", countRows, UNION_DATASOURCE + i); + LOG.warn("%d events have been ingested to %s so far", countRows, fullDatasourceName + i); return false; } } @@ -119,7 +131,23 @@ public class ITUnionQueryTest extends AbstractIndexerTest // should hit the queries on realtime task LOG.info("Running Union Queries.."); - this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2); + + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(UNION_QUERIES_RESOURCE); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", UNION_QUERIES_RESOURCE); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + + this.queryHelper.testQueriesFromString(queryResponseTemplate, 2); // wait for the task to complete for (int i = 0; i < numTasks; i++) { @@ -134,7 +162,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest @Override public Boolean call() { - return coordinator.areSegmentsLoaded(UNION_DATASOURCE + taskNum); + return coordinator.areSegmentsLoaded(fullDatasourceName + taskNum); } }, true, @@ -144,7 +172,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest ); } // run queries on historical nodes - this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2); + this.queryHelper.testQueriesFromString(queryResponseTemplate, 2); } catch (Throwable e) { @@ -162,7 +190,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest private String withDataSource(String taskAsString, String dataSource) { - return StringUtils.replace(taskAsString, UNION_DATASOURCE, dataSource); + return StringUtils.replace(taskAsString, "%%DATASOURCE%%", dataSource); } private String withServiceName(String taskAsString, String serviceName) diff --git a/integration-tests/src/test/resources/indexer/union_queries.json b/integration-tests/src/test/resources/indexer/union_queries.json index fa63e8404d4..627af04edc9 100644 --- a/integration-tests/src/test/resources/indexer/union_queries.json +++ b/integration-tests/src/test/resources/indexer/union_queries.json @@ -6,8 +6,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -69,8 +69,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -149,8 +149,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -263,8 +263,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -344,8 +344,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -417,8 +417,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -508,8 +508,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "granularity": "all", @@ -548,8 +548,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] } }, diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json index 3fdad69ff5d..1f7cb4486c9 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json @@ -1,6 +1,6 @@ { "type" : "compact", - "dataSource" : "wikipedia_index_test", + "dataSource" : "%%DATASOURCE%%", "interval" : "2013-08-31/2013-09-02", "keepSegmentGranularity" : ${KEEP_SEGMENT_GRANULARITY} } \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json index 04565bd8386..9618ba9e9b6 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json @@ -3,7 +3,7 @@ "description": "timeseries, 1 agg, all", "query":{ "queryType" : "timeBoundary", - "dataSource": "wikipedia_index_test" + "dataSource": "%%DATASOURCE%%" }, "expectedResults":[ { @@ -20,7 +20,7 @@ "description":"having spec on post aggregation", "query":{ "queryType":"groupBy", - "dataSource":"wikipedia_index_test", + "dataSource":"%%DATASOURCE%%", "granularity":"day", "dimensions":[ "page" diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json index 819caae752a..23532e55942 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json @@ -2,7 +2,7 @@ "type": "index", "spec": { "dataSchema": { - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "metricsSpec": [ { "type": "count", diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json index 76ecb5cd1a3..9618ba9e9b6 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json @@ -3,7 +3,7 @@ "description": "timeseries, 1 agg, all", "query":{ "queryType" : "timeBoundary", - "dataSource": "wikipedia_parallel_index_test" + "dataSource": "%%DATASOURCE%%" }, "expectedResults":[ { @@ -20,7 +20,7 @@ "description":"having spec on post aggregation", "query":{ "queryType":"groupBy", - "dataSource":"wikipedia_parallel_index_test", + "dataSource":"%%DATASOURCE%%", "granularity":"day", "dimensions":[ "page" diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json index 911adbd9f27..f317c538f6b 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json @@ -2,7 +2,7 @@ "type": "index_parallel", "spec": { "dataSchema": { - "dataSource": "wikipedia_parallel_index_test", + "dataSource": "%%DATASOURCE%%", "metricsSpec": [ { "type": "count", diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json index acd88ca893e..46d5ec4395a 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json @@ -3,7 +3,7 @@ "description": "timeBoundary", "query": { "queryType":"timeBoundary", - "dataSource":"wikipedia_index_test" + "dataSource":"%%DATASOURCE%%" }, "expectedResults":[ { @@ -19,7 +19,7 @@ "description": "timeseries", "query": { "queryType": "timeseries", - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ], "granularity": "all", "aggregations": [ @@ -41,7 +41,7 @@ "description":"having spec on post aggregation", "query":{ "queryType":"groupBy", - "dataSource":"wikipedia_index_test", + "dataSource":"%%DATASOURCE%%", "granularity":"minute", "dimensions":[ "page" diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json index 765914b62ad..9e773609cb7 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json @@ -2,7 +2,7 @@ "type": "index_realtime_appenderator", "spec": { "dataSchema": { - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "metricsSpec": [ { "type": "count", diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json index b579b347c34..bb6759595e0 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json @@ -3,7 +3,7 @@ "description": "timeBoundary", "query": { "queryType":"timeBoundary", - "dataSource":"wikipedia_index_test" + "dataSource":"%%DATASOURCE%%" }, "expectedResults":[ { @@ -19,7 +19,7 @@ "description": "timeseries", "query": { "queryType": "timeseries", - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ], "granularity": "all", "aggregations": [ @@ -41,7 +41,7 @@ "description":"having spec on post aggregation", "query":{ "queryType":"groupBy", - "dataSource":"wikipedia_index_test", + "dataSource":"%%DATASOURCE%%", "granularity":"minute", "dimensions":[ "page" diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json index ecfff579716..5f48162c488 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json @@ -2,7 +2,7 @@ "type": "index_realtime", "spec": { "dataSchema": { - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "metricsSpec": [ { "type": "count", diff --git a/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json index b63f9f184cc..e277a9127f4 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json @@ -2,7 +2,7 @@ "type": "index", "spec": { "dataSchema": { - "dataSource": "wikipedia_reindex_test", + "dataSource": "%%REINDEX_DATASOURCE%%", "metricsSpec": [ { "type": "doubleSum", @@ -42,7 +42,7 @@ "type": "index", "firehose": { "type": "ingestSegment", - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "interval": "2013-08-31/2013-09-01" } }, diff --git a/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json index 09af36efe28..75c1281fcd2 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json @@ -2,7 +2,7 @@ "type": "index_realtime", "spec": { "dataSchema": { - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "metricsSpec": [ { "type": "count", diff --git a/integration-tests/src/test/resources/indexer/wikiticker_index_task.json b/integration-tests/src/test/resources/indexer/wikiticker_index_task.json index 5af8ae8cc24..d450c7b9458 100644 --- a/integration-tests/src/test/resources/indexer/wikiticker_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikiticker_index_task.json @@ -2,7 +2,7 @@ "type": "index", "spec": { "dataSchema": { - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "granularitySpec": { "type": "uniform", "segmentGranularity": "day", @@ -18,20 +18,7 @@ "dimensionsSpec": { "dimensions": [ "channel", - "cityName", - "comment", - "countryIsoCode", - "countryName", - "isAnonymous", - "isMinor", - "isNew", - "isRobot", - "isUnpatrolled", - "metroCode", - "namespace", "page", - "regionIsoCode", - "regionName", "user" ] }, @@ -60,11 +47,6 @@ "name": "delta", "type": "longSum", "fieldName": "delta" - }, - { - "name": "user_unique", - "type": "hyperUnique", - "fieldName": "user" } ] }, @@ -72,7 +54,7 @@ "type": "index", "firehose": { "type": "local", - "baseDir": "/examples/quickstart/tutorial", + "baseDir": "/shared/wikiticker-it", "filter": "wikiticker-2015-09-12-sampled.json.gz" } }, diff --git a/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json b/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json index 4c0350c9585..c7a062c02bb 100644 --- a/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json +++ b/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json @@ -7,7 +7,7 @@ "type": "query", "query": { "queryType": "groupBy", - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "intervals": [ "2015-09-12/2015-09-13" ], @@ -60,7 +60,7 @@ "type": "query", "query": { "queryType": "groupBy", - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "intervals": [ "2015-09-12/2015-09-13" ], @@ -113,7 +113,7 @@ "type": "query", "query": { "queryType": "groupBy", - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "intervals": [ "2015-09-12/2015-09-13" ], @@ -191,7 +191,7 @@ "type": "query", "query": { "queryType": "groupBy", - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "intervals": [ "2015-09-12/2015-09-13" ], @@ -253,7 +253,7 @@ "type": "query", "query": { "queryType": "groupBy", - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "intervals": [ "2015-09-12/2015-09-13" ], diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index ef731b31869..631c808695a 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -54,7 +54,7 @@ public class CoordinatorClient HttpMethod.GET, StringUtils.format( "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s", - dataSource, + StringUtils.urlEncode(dataSource), descriptor.getInterval(), descriptor.getPartitionNumber(), descriptor.getVersion() diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index d31ef1ac0c6..702d78b0856 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -154,7 +154,10 @@ public class HttpIndexingServiceClient implements IndexingServiceClient final FullResponseHolder response = druidLeaderClient.go( druidLeaderClient.makeRequest( HttpMethod.POST, - StringUtils.format("/druid/indexer/v1/task/%s/shutdown", taskId) + StringUtils.format( + "/druid/indexer/v1/task/%s/shutdown", + StringUtils.urlEncode(taskId) + ) ) ); @@ -255,7 +258,10 @@ public class HttpIndexingServiceClient implements IndexingServiceClient { try { final FullResponseHolder responseHolder = druidLeaderClient.go( - druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s/status", taskId)) + druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format( + "/druid/indexer/v1/task/%s/status", + StringUtils.urlEncode(taskId) + )) ); return jsonMapper.readValue( @@ -303,7 +309,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient { final String endPoint = StringUtils.format( "/druid/indexer/v1/pendingSegments/%s?interval=%s", - dataSource, + StringUtils.urlEncode(dataSource), new Interval(DateTimes.MIN, end) ); try { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java index 693b302b233..9e64731ec9c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java @@ -21,6 +21,7 @@ package org.apache.druid.segment.realtime.firehose; import com.google.common.base.Optional; import com.google.inject.Inject; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import javax.ws.rs.Path; @@ -49,7 +50,7 @@ public class ChatHandlerResource { if (taskId != null) { List requestTaskId = headers.getRequestHeader(TASK_ID_HEADER); - if (requestTaskId != null && !requestTaskId.contains(taskId)) { + if (requestTaskId != null && !requestTaskId.contains(StringUtils.urlEncode(taskId))) { return null; } } diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java index 6302a39f07e..7dcd3b2c237 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java @@ -20,6 +20,7 @@ package org.apache.druid.server.initialization.jetty; import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.realtime.firehose.ChatHandlerResource; public class TaskIdResponseHeaderFilterHolder extends ResponseHeaderFilterHolder @@ -29,7 +30,7 @@ public class TaskIdResponseHeaderFilterHolder extends ResponseHeaderFilterHolder super(path, taskId == null ? ImmutableMap.of() - : ImmutableMap.of(ChatHandlerResource.TASK_ID_HEADER, taskId) + : ImmutableMap.of(ChatHandlerResource.TASK_ID_HEADER, StringUtils.urlEncode(taskId)) ); } }