mirror of https://github.com/apache/druid.git
Some fixes and tests for spaces/non-ASCII chars in datasource names (#6761)
* Fixes and tests for spaces/non-ASCII datasource names
* Some unit test fixes
* Fix ITRealtimeIndexTaskTest
* Checkstyle
* TeamCity
* PR comments
parent f72f33f84a
commit 8537a771b0
@@ -21,6 +21,6 @@ set -e

 pushd $TRAVIS_BUILD_DIR/integration-tests

-mvn verify -P integration-tests -Dit.test=ITUnionQueryTest,ITTwitterQueryTest,ITWikipediaQueryTest,ITBasicAuthConfigurationTest,ITTLSTest
+mvn verify -P integration-tests -Dit.test=ITUnionQueryTest,ITNestedQueryPushDownTest,ITTwitterQueryTest,ITWikipediaQueryTest,ITBasicAuthConfigurationTest,ITTLSTest

 popd

@@ -24,6 +24,7 @@ import com.google.common.base.Throwables;

 import javax.annotation.Nullable;
 import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;

@@ -153,10 +154,39 @@ public class StringUtils
     return s.toUpperCase(Locale.ENGLISH);
   }

+  /**
+   * Encodes a String in application/x-www-form-urlencoded format, with one exception:
+   * "+" in the encoded form is replaced with "%20".
+   *
+   * application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form data as well.
+   *
+   * @param s String to be encoded
+   * @return application/x-www-form-urlencoded format encoded String, but with "+" replaced with "%20".
+   */
+  @Nullable
   public static String urlEncode(String s)
   {
+    if (s == null) {
+      return null;
+    }
+
     try {
-      return URLEncoder.encode(s, "UTF-8");
+      return StringUtils.replace(URLEncoder.encode(s, "UTF-8"), "+", "%20");
+    }
+    catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Nullable
+  public static String urlDecode(String s)
+  {
+    if (s == null) {
+      return null;
+    }
+
+    try {
+      return URLDecoder.decode(s, "UTF-8");
     }
     catch (UnsupportedEncodingException e) {
       throw new RuntimeException(e);

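The new urlEncode deliberately post-processes URLEncoder's output. URLEncoder targets application/x-www-form-urlencoded, where a space becomes "+"; inside a URL path, "+" is a literal plus sign, so spaces must be rendered as "%20" instead. A literal "+" still encodes to "%2B", which keeps decoding unambiguous. A minimal standalone sketch of the round trip, using only the JDK:

    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;
    import java.net.URLEncoder;

    public class UrlEncodeDemo
    {
      public static void main(String[] args) throws UnsupportedEncodingException
      {
        // form encoding: space -> "+", literal plus -> "%2B"
        String formEncoded = URLEncoder.encode("foo bar+baz", "UTF-8");  // "foo+bar%2Bbaz"

        // path-safe variant, mirroring the new StringUtils.urlEncode
        String pathEncoded = formEncoded.replace("+", "%20");            // "foo%20bar%2Bbaz"

        // URLDecoder maps both "+" and "%20" back to a space, so the
        // original string is recovered either way
        System.out.println(URLDecoder.decode(pathEncoded, "UTF-8"));     // "foo bar+baz"
      }
    }
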
@@ -148,4 +148,16 @@ public class StringUtilsTest
     Assert.assertEquals("bb", StringUtils.replace("aaaa", "aa", "b"));
     Assert.assertEquals("", StringUtils.replace("aaaa", "aa", ""));
   }
+
+  @Test
+  public void testURLEncodeSpace()
+  {
+    String s1 = StringUtils.urlEncode("aaa bbb");
+    Assert.assertEquals(s1, "aaa%20bbb");
+    Assert.assertEquals("aaa bbb", StringUtils.urlDecode(s1));
+
+    String s2 = StringUtils.urlEncode("fff+ggg");
+    Assert.assertEquals(s2, "fff%2Bggg");
+    Assert.assertEquals("fff+ggg", StringUtils.urlDecode(s2));
+  }
 }

@@ -291,7 +291,7 @@ public abstract class IndexTaskClient implements AutoCloseable

     final Request request = new Request(method, serviceUrl);
     // used to validate that we are talking to the correct worker
-    request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId);
+    request.addHeader(ChatHandlerResource.TASK_ID_HEADER, StringUtils.urlEncode(taskId));
     if (content.length > 0) {
       request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content);
     }

@@ -369,7 +369,9 @@ public abstract class IndexTaskClient implements AutoCloseable

       final Duration delay;
       if (response != null && response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
-        String headerId = response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER);
+        String headerId = StringUtils.urlDecode(
+            response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER)
+        );
         if (headerId != null && !headerId.equals(taskId)) {
           log.warn(
               "Expected worker to have taskId [%s] but has taskId [%s], will retry in [%d]s",

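These two IndexTaskClient hunks are symmetric: the task ID is percent-encoded before it is written into the TASK_ID_HEADER and decoded again when the header is read back for validation. HTTP/1.1 header values are effectively restricted to visible ASCII, and task IDs now embed datasource names that may contain spaces or non-ASCII text. A hedged sketch of the invariant being preserved; the task ID below is made up for illustration:

    import org.apache.druid.java.util.common.StringUtils;

    public class TaskIdHeaderRoundTrip
    {
      public static void main(String[] args)
      {
        // hypothetical task ID containing a space and CJK characters
        String taskId = "index_wikipedia 中国_2018-12-19T00:00:00.000Z";

        // write side: encode so the header value is plain ASCII
        String headerValue = StringUtils.urlEncode(taskId);

        // read side: decode before comparing against the expected task ID
        String roundTripped = StringUtils.urlDecode(headerValue);

        if (!taskId.equals(roundTripped)) {
          throw new IllegalStateException("worker task ID mismatch");
        }
      }
    }
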
@@ -64,7 +64,7 @@ public class TaskResourceFilter extends AbstractResourceFilter
   @Override
   public ContainerRequest filter(ContainerRequest request)
   {
-    final String taskId = Preconditions.checkNotNull(
+    String taskId = Preconditions.checkNotNull(
         request.getPathSegments()
               .get(
                   Iterables.indexOf(

@@ -80,6 +80,7 @@ public class TaskResourceFilter extends AbstractResourceFilter
                   ) + 1
               ).getPath()
     );
+    taskId = StringUtils.urlDecode(taskId);

     Optional<Task> taskOptional = taskStorageQueryAdapter.getTask(taskId);
     if (!taskOptional.isPresent()) {

@@ -123,7 +123,7 @@ $(document).ready(function() {
         '<span style="color:#FF6000">suspended</span>' :
         '<span style="color:#08B157">running</span>';
       data[i] = {
-        "dataSource" : supervisorId,
+        "dataSource" : dataList[i].id,
         "more" :
           '<a href="/druid/indexer/v1/supervisor/' + supervisorId + '">payload</a>' +
           '<a href="/druid/indexer/v1/supervisor/' + supervisorId + '/status">status</a>' +

@@ -35,8 +35,8 @@ public class TaskRunnerUtilsTest
         "/druid/worker/v1/task/%s/log",
         "foo bar&"
     );
-    Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo+bar%26/log", url.toString());
+    Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo%20bar%26/log", url.toString());
     Assert.assertEquals("1.2.3.4:8290", url.getAuthority());
-    Assert.assertEquals("/druid/worker/v1/task/foo+bar%26/log", url.getPath());
+    Assert.assertEquals("/druid/worker/v1/task/foo%20bar%26/log", url.getPath());
   }
 }

@@ -16,10 +16,12 @@
 # Base image is built from integration-tests/docker-base in the Druid repo
 FROM imply/druiditbase

+RUN echo "[mysqld]\ncharacter-set-server=utf8\ncollation-server=utf8_bin\n" >> /etc/mysql/my.cnf
+
 # Setup metadata store
 # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
 RUN find /var/lib/mysql -type f -exec touch {} \; && /etc/init.d/mysql start \
-    && echo "GRANT ALL ON druid.* TO 'druid'@'%' IDENTIFIED BY 'diurd'; CREATE database druid DEFAULT CHARACTER SET utf8;" | mysql -u root \
+    && echo "CREATE USER 'druid'@'%' IDENTIFIED BY 'diurd'; GRANT ALL ON druid.* TO 'druid'@'%'; CREATE database druid DEFAULT CHARACTER SET utf8mb4;" | mysql -u root \
     && /etc/init.d/mysql stop

 # Add Druid jars

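The Dockerfile change makes the metadata store Unicode-clean end to end: the server defaults move to utf8 with the binary utf8_bin collation (byte-wise comparison, so datasource names differing only in case or script stay distinct keys), and the druid database itself is created as utf8mb4. A quick JDBC sketch to verify the effective settings; the host and port are assumptions about how the container is reached, while the credentials come from the Dockerfile above:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class MetadataStoreCharsetCheck
    {
      public static void main(String[] args) throws Exception
      {
        String url = "jdbc:mysql://druid-metadata-storage:3306/druid"
                     + "?useUnicode=true&characterEncoding=UTF-8";
        try (Connection conn = DriverManager.getConnection(url, "druid", "diurd");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT @@character_set_server, @@collation_server, @@character_set_database")) {
          while (rs.next()) {
            // expect: utf8 / utf8_bin / utf8mb4
            System.out.println(rs.getString(1) + " / " + rs.getString(2) + " / " + rs.getString(3));
          }
        }
      }
    }
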
@@ -14,7 +14,7 @@ command=java
   -Ddruid.worker.capacity=3
   -Ddruid.indexer.logs.directory=/shared/tasklogs
   -Ddruid.storage.storageDirectory=/shared/storage
-  -Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml"
+  -Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml"
   -Ddruid.indexer.fork.property.druid.processing.buffer.sizeBytes=25000000
   -Ddruid.indexer.fork.property.druid.processing.numThreads=1
   -Ddruid.indexer.fork.server.http.numThreads=100

@@ -231,6 +231,7 @@
     -Dfile.encoding=UTF-8
     -Ddruid.test.config.dockerIp=${env.DOCKER_IP}
     -Ddruid.test.config.hadoopDir=${env.HADOOP_DIR}
+    -Ddruid.test.config.extraDatasourceNameSuffix=\ Россия\ 한국\ 中国!?
     -Ddruid.zk.service.host=${env.DOCKER_IP}
     -Ddruid.client.https.trustStorePath=client_tls/truststore.jks
     -Ddruid.client.https.trustStorePassword=druid123

@@ -53,40 +53,44 @@ cp src/main/resources/log4j2.xml $SHARED_DIR/docker/lib/log4j2.xml
 # copy the integration test jar, it provides test-only extension implementations
 cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib

+# one of the integration tests needs the wikiticker sample data
+mkdir -p $SHARED_DIR/wikiticker-it
+cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
+
 docker network create --subnet=172.172.172.0/24 druid-it-net

 # Build Druid Cluster Image
 docker build -t druid/cluster $SHARED_DIR/docker

 # Start zookeeper and kafka
-docker run -d --privileged --net druid-it-net --ip 172.172.172.2 --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -p 9093:9093 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.2 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -p 9093:9093 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster

 # Start MYSQL
-docker run -d --privileged --net druid-it-net --ip 172.172.172.3 --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.3 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster

 # Start Overlord
-docker run -d --privileged --net druid-it-net --ip 172.172.172.4 --name druid-overlord -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.4 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-overlord -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster

 # Start Coordinator
-docker run -d --privileged --net druid-it-net --ip 172.172.172.5 --name druid-coordinator -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.5 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-coordinator -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster

 # Start Historical
-docker run -d --privileged --net druid-it-net --ip 172.172.172.6 --name druid-historical -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.6 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-historical -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster

 # Start Middlemanger
-docker run -d --privileged --net druid-it-net --ip 172.172.172.7 --name druid-middlemanager -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.7 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-middlemanager -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster

 # Start Broker
-docker run -d --privileged --net druid-it-net --ip 172.172.172.8 --name druid-broker -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.8 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-broker -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster

 # Start Router
-docker run -d --privileged --net druid-it-net --ip 172.172.172.9 --name druid-router -p 8888:8888 -p 9088:9088 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.9 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router -p 8888:8888 -p 9088:9088 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster

 # Start Router with permissive TLS settings (client auth enabled, no hostname verification, no revocation check)
-docker run -d --privileged --net druid-it-net --ip 172.172.172.10 --name druid-router-permissive-tls -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-permissive-tls.conf:$SUPERVISORDIR/router-permissive-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.10 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router-permissive-tls -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-permissive-tls.conf:$SUPERVISORDIR/router-permissive-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster

 # Start Router with TLS but no client auth
-docker run -d --privileged --net druid-it-net --ip 172.172.172.11 --name druid-router-no-client-auth-tls -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-no-client-auth-tls.conf:$SUPERVISORDIR/router-no-client-auth-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.11 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router-no-client-auth-tls -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-no-client-auth-tls.conf:$SUPERVISORDIR/router-no-client-auth-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster

 # Start Router with custom TLS cert checkers
-docker run -d --privileged --net druid-it-net --ip 172.172.172.12 --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-custom-check-tls.conf:$SUPERVISORDIR/router-custom-check-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+docker run -d --privileged --net druid-it-net --ip 172.172.172.12 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-custom-check-tls.conf:$SUPERVISORDIR/router-custom-check-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster

@@ -348,6 +348,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvider
       {
         return Boolean.valueOf(props.getOrDefault("manageKafkaTopic", "true"));
       }
+
+      @Override
+      public String getExtraDatasourceNameSuffix()
+      {
+        return "";
+      }
     };
   }
 }

@@ -37,6 +37,9 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
   @NotNull
   private String hadoopDir;

+  @JsonProperty
+  private String extraDatasourceNameSuffix = "";
+
   @Override
   public IntegrationTestingConfig get()
   {

@@ -202,6 +205,12 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
       {
         return true;
       }
+
+      @Override
+      public String getExtraDatasourceNameSuffix()
+      {
+        return extraDatasourceNameSuffix;
+      }
     };
   }
 }

@@ -82,4 +82,6 @@ public interface IntegrationTestingConfig
   Map<String, String> getProperties();

   boolean manageKafkaTopic();
+
+  String getExtraDatasourceNameSuffix();
 }

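getExtraDatasourceNameSuffix() is the hook that drives most of the remaining changes: the Maven property above feeds DockerConfigProvider's @JsonProperty field, and each integration test appends the suffix when it derives the datasource name it loads, queries, and unloads. A tiny runnable sketch of the composition; the suffix literal simply mirrors the pom.xml value, standing in for an injected config:

    public class DatasourceSuffixDemo
    {
      public static void main(String[] args)
      {
        // stand-in for an injected config.getExtraDatasourceNameSuffix()
        String suffix = " Россия 한국 中国!?";

        String fullDatasourceName = "wikipedia_index_test" + suffix;
        System.out.println(fullDatasourceName); // "wikipedia_index_test Россия 한국 中国!?"
      }
    }
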
@@ -72,7 +72,12 @@ public class ClientInfoResourceTestClient
       StatusResponseHolder response = httpClient.go(
           new Request(
               HttpMethod.GET,
-              new URL(StringUtils.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval))
+              new URL(StringUtils.format(
+                  "%s/%s/dimensions?interval=%s",
+                  getBrokerURL(),
+                  StringUtils.urlEncode(dataSource),
+                  interval
+              ))
           ),
           responseHandler
       ).get();

@@ -72,12 +72,12 @@ public class CoordinatorResourceTestClient

   private String getMetadataSegmentsURL(String dataSource)
   {
-    return StringUtils.format("%smetadata/datasources/%s/segments", getCoordinatorURL(), dataSource);
+    return StringUtils.format("%smetadata/datasources/%s/segments", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
   }

   private String getIntervalsURL(String dataSource)
   {
-    return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), dataSource);
+    return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
   }

   private String getLoadStatusURL()

@@ -150,7 +150,7 @@ public class CoordinatorResourceTestClient
   public void unloadSegmentsForDataSource(String dataSource)
   {
     try {
-      makeRequest(HttpMethod.DELETE, StringUtils.format("%sdatasources/%s", getCoordinatorURL(), dataSource));
+      makeRequest(HttpMethod.DELETE, StringUtils.format("%sdatasources/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource)));
     }
     catch (Exception e) {
       throw Throwables.propagate(e);

@@ -165,7 +165,7 @@ public class CoordinatorResourceTestClient
         StringUtils.format(
             "%sdatasources/%s/intervals/%s",
             getCoordinatorURL(),
-            dataSource,
+            StringUtils.urlEncode(dataSource),
             interval.toString().replace('/', '_')
         )
     );

@@ -42,7 +42,6 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;

 import java.net.URL;
-import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;

@@ -121,7 +120,7 @@ public class OverlordResourceTestClient
         StringUtils.format(
             "%stask/%s/status",
             getIndexerURL(),
-            URLEncoder.encode(taskID, "UTF-8")
+            StringUtils.urlEncode(taskID)
         )
     );

@@ -234,7 +233,7 @@ public class OverlordResourceTestClient
   {
     try {
       StatusResponseHolder response = httpClient.go(
-          new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), id))),
+          new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), StringUtils.urlEncode(id)))),
          responseHandler
      ).get();
      if (!response.getStatus().equals(HttpResponseStatus.OK)) {

@@ -45,6 +45,7 @@ public class TestQueryHelper
   private final String brokerTLS;
   private final String router;
   private final String routerTLS;
+  private final IntegrationTestingConfig config;

   @Inject
   TestQueryHelper(

@@ -59,6 +60,7 @@ public class TestQueryHelper
     this.brokerTLS = config.getBrokerTLSUrl();
     this.router = config.getRouterUrl();
     this.routerTLS = config.getRouterTLSUrl();
+    this.config = config;
   }

   public void testQueriesFromFile(String filePath, int timesToRun) throws Exception

@@ -20,6 +20,9 @@
 package org.apache.druid.tests.indexer;

 import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.ClientInfoResourceTestClient;

@@ -27,6 +30,7 @@ import org.apache.druid.testing.utils.RetryUtil;
 import org.junit.Assert;

 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;

 public class AbstractITBatchIndexTest extends AbstractIndexerTest

@@ -45,9 +49,31 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
       String queryFilePath
   ) throws IOException
   {
-    submitTaskAndWait(indexTaskFilePath, dataSource);
+    final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
+    final String taskSpec = StringUtils.replace(
+        getTaskAsString(indexTaskFilePath),
+        "%%DATASOURCE%%",
+        fullDatasourceName
+    );
+
+    submitTaskAndWait(taskSpec, fullDatasourceName);
     try {
-      queryHelper.testQueriesFromFile(queryFilePath, 2);
+
+      String queryResponseTemplate;
+      try {
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath);
+        queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+      }
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", queryFilePath);
+      }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          fullDatasourceName
+      );
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
+
     }
     catch (Exception e) {

@@ -57,17 +83,48 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
   }

   void doReindexTest(
+      String baseDataSource,
       String reindexDataSource,
       String reindexTaskFilePath,
       String queryFilePath
   ) throws IOException
   {
-    submitTaskAndWait(reindexTaskFilePath, reindexDataSource);
+    final String fullBaseDatasourceName = baseDataSource + config.getExtraDatasourceNameSuffix();
+    final String fullReindexDatasourceName = reindexDataSource + config.getExtraDatasourceNameSuffix();
+
+    String taskSpec = StringUtils.replace(
+        getTaskAsString(reindexTaskFilePath),
+        "%%DATASOURCE%%",
+        fullBaseDatasourceName
+    );
+
+    taskSpec = StringUtils.replace(
+        taskSpec,
+        "%%REINDEX_DATASOURCE%%",
+        fullReindexDatasourceName
+    );
+
+    submitTaskAndWait(taskSpec, fullReindexDatasourceName);
     try {
-      queryHelper.testQueriesFromFile(queryFilePath, 2);
+      String queryResponseTemplate;
+      try {
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath);
+        queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+      }
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", queryFilePath);
+      }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          fullBaseDatasourceName
+      );
+
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
       // verify excluded dimension is not reIndexed
       final List<String> dimensions = clientInfoResourceTestClient.getDimensions(
-          reindexDataSource,
+          fullReindexDatasourceName,
           "2013-08-31T00:00:00.000Z/2013-09-10T00:00:00.000Z"
       );
       Assert.assertFalse("dimensions : " + dimensions, dimensions.contains("robot"));

@@ -78,9 +135,9 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
     }
   }

-  private void submitTaskAndWait(String indexTaskFilePath, String dataSourceName) throws IOException
+  private void submitTaskAndWait(String taskSpec, String dataSourceName)
   {
-    final String taskID = indexer.submitTask(getTaskAsString(indexTaskFilePath));
+    final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);

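With this refactor the batch helpers treat both the task spec and the expected query results as templates: every %%DATASOURCE%% placeholder is rewritten to the suffixed name before the spec is submitted and before responses are compared, so one JSON fixture works for any suffix. A self-contained sketch of the substitution step; the JSON fragment is illustrative, not a real fixture:

    import org.apache.druid.java.util.common.StringUtils;

    public class TemplateSubstitutionDemo
    {
      public static void main(String[] args)
      {
        String template = "{\"type\":\"index\",\"spec\":{\"dataSchema\":{\"dataSource\":\"%%DATASOURCE%%\"}}}";
        String fullDatasourceName = "wikipedia_index_test Россия 한국 中国!?";

        // same call the tests use; Druid's StringUtils.replace performs a
        // literal (non-regex) replacement
        String taskSpec = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName);
        System.out.println(taskSpec);
      }
    }
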
@@ -79,15 +79,21 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTest
   @Inject
   IntegrationTestingConfig config;

+  private String fullDatasourceName;
+
   void doTest()
   {
+    fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix();
+
     LOG.info("Starting test: ITRealtimeIndexTaskTest");
-    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
+    try (final Closeable closeable = unloader(fullDatasourceName)) {
       // the task will run for 3 minutes and then shutdown itself
       String task = setShutOffTime(
           getTaskAsString(getTaskResource()),
           DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
       );
+      task = StringUtils.replace(task, "%%DATASOURCE%%", fullDatasourceName);
+
       LOG.info("indexerSpec: [%s]\n", task);
       taskID = indexer.submitTask(task);

@@ -119,6 +125,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTest
       queryStr = StringUtils.replace(queryStr, "%%POST_AG_REQUEST_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)));
       String postAgResponseTimestamp = TIMESTAMP_FMT.print(dtGroupBy.withSecondOfMinute(0));
       queryStr = StringUtils.replace(queryStr, "%%POST_AG_RESPONSE_TIMESTAMP%%", postAgResponseTimestamp);
+      queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);

       // should hit the queries all on realtime task or some on realtime task
       // and some on historical. Which it is depends on where in the minute we were

@@ -140,7 +147,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTest
             @Override
             public Boolean call()
             {
-              return coordinator.areSegmentsLoaded(INDEX_DATASOURCE);
+              return coordinator.areSegmentsLoaded(fullDatasourceName);
             }
           },
           true,

@@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
 import org.apache.druid.testing.clients.OverlordResourceTestClient;
 import org.apache.druid.testing.utils.RetryUtil;

@@ -54,6 +55,9 @@ public abstract class AbstractIndexerTest
   @Inject
   protected TestQueryHelper queryHelper;

+  @Inject
+  private IntegrationTestingConfig config;
+
   protected Closeable unloader(final String dataSource)
   {
     return () -> unloadAndKillData(dataSource);

@@ -19,15 +19,21 @@

 package org.apache.druid.tests.indexer;

+import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.testing.utils.RetryUtil;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;

 import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;

 @Guice(moduleFactory = DruidTestModuleFactory.class)

@@ -39,23 +45,49 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   private static String INDEX_DATASOURCE = "wikipedia_index_test";
   private static String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";

+  @Inject
+  private IntegrationTestingConfig config;
+
+  private String fullDatasourceName;
+
+  @BeforeSuite
+  public void setFullDatasourceName()
+  {
+    fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix();
+  }
+
   @Test
   public void testCompactionWithoutKeepSegmentGranularity() throws Exception
   {
     loadData();
-    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
     intervalsBeforeCompaction.sort(null);
     final String compactedInterval = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
     if (intervalsBeforeCompaction.contains(compactedInterval)) {
       throw new ISE("Containing a segment for the compacted interval[%s] before compaction", compactedInterval);
     }
-    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
-      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+    try (final Closeable closeable = unloader(fullDatasourceName)) {
+      String queryResponseTemplate;
+      try {
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE);
+        queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+      }
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE);
+      }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          fullDatasourceName
+      );
+
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
       compactData(false);

       // 4 segments across 2 days, compacted into 1 new segment (5 total)
       checkCompactionFinished(5);
-      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);

       intervalsBeforeCompaction.add(compactedInterval);
       intervalsBeforeCompaction.sort(null);

@@ -67,15 +99,31 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   public void testCompactionWithKeepSegmentGranularity() throws Exception
   {
     loadData();
-    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
     intervalsBeforeCompaction.sort(null);
-    try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
-      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+    try (final Closeable closeable = unloader(fullDatasourceName)) {
+      String queryResponseTemplate;
+      try {
+        InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE);
+        queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+      }
+      catch (IOException e) {
+        throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE);
+      }
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%DATASOURCE%%",
+          fullDatasourceName
+      );
+
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);
       compactData(true);

       // 4 segments across 2 days, compacted into 2 new segments (6 total)
       checkCompactionFinished(6);
-      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+      queryHelper.testQueriesFromString(queryResponseTemplate, 2);

       checkCompactionIntervals(intervalsBeforeCompaction);
     }

@@ -83,12 +131,14 @@ public class ITCompactionTaskTest extends AbstractIndexerTest

   private void loadData() throws Exception
   {
-    final String taskID = indexer.submitTask(getTaskAsString(INDEX_TASK));
+    String taskSpec = getTaskAsString(INDEX_TASK);
+    taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
+    final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);

     RetryUtil.retryUntilTrue(
-        () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE),
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
         "Segment Load"
     );
   }

@@ -96,14 +146,16 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   private void compactData(boolean keepSegmentGranularity) throws Exception
   {
     final String template = getTaskAsString(COMPACTION_TASK);
-    final String taskSpec =
+    String taskSpec =
         StringUtils.replace(template, "${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
+    taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
+
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for compaction task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);

     RetryUtil.retryUntilTrue(
-        () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE),
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
         "Segment Compaction"
     );
   }

@@ -112,7 +164,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   {
     RetryUtil.retryUntilTrue(
         () -> {
-          int metadataSegmentCount = coordinator.getMetadataSegments(INDEX_DATASOURCE).size();
+          int metadataSegmentCount = coordinator.getMetadataSegments(fullDatasourceName).size();
           LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments);
           return metadataSegmentCount == numExpectedSegments;
         },

@@ -124,7 +176,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   {
     RetryUtil.retryUntilTrue(
         () -> {
-          final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+          final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
           intervalsAfterCompaction.sort(null);
           System.out.println("AFTER: " + intervalsAfterCompaction);
           System.out.println("EXPECTED: " + expectedIntervals);

@@ -31,6 +31,7 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
   private static String INDEX_TASK = "/indexer/wikipedia_index_task.json";
   private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
   private static String INDEX_DATASOURCE = "wikipedia_index_test";
+
   private static String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json";
   private static String REINDEX_DATASOURCE = "wikipedia_reindex_test";

@@ -38,8 +39,8 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
   public void testIndexData() throws Exception
   {
     try (
-        final Closeable indexCloseable = unloader(INDEX_DATASOURCE);
-        final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE)
+        final Closeable indexCloseable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+        final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
     ) {
       doIndexTestTest(
           INDEX_DATASOURCE,

@@ -47,6 +48,7 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
           INDEX_QUERIES_RESOURCE
       );
       doReindexTest(
+          INDEX_DATASOURCE,
           REINDEX_DATASOURCE,
           REINDEX_TASK,
           INDEX_QUERIES_RESOURCE

@@ -44,6 +44,7 @@ import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;

@@ -64,6 +65,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
   private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
   private static final String DATASOURCE = "kafka_indexing_service_test";
   private static final String TOPIC_NAME = "kafka_indexing_service_topic";
+
   private static final int NUM_EVENTS_TO_SEND = 60;
   private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L;
   public static final String testPropertyPrefix = "kafka.test.property.";

@@ -105,6 +107,14 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
   @Inject
   private IntegrationTestingConfig config;

+  private String fullDatasourceName;
+
+  @BeforeSuite
+  public void setFullDatasourceName()
+  {
+    fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix();
+  }
+
   @Test
   public void testKafka()
   {

@@ -143,7 +153,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
     addFilteredProperties(consumerProperties);

     spec = getTaskAsString(INDEXER_FILE);
-    spec = StringUtils.replace(spec, "%%DATASOURCE%%", DATASOURCE);
+    spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
     spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME);
     spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
     LOG.info("supervisorSpec: [%s]\n", spec);

@@ -228,7 +238,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
     }

     String queryStr = query_response_template;
-    queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", DATASOURCE);
+    queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);
     queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
     queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast));
     queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst));

@@ -271,7 +281,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
           @Override
           public Boolean call()
           {
-            return coordinator.areSegmentsLoaded(DATASOURCE);
+            return coordinator.areSegmentsLoaded(fullDatasourceName);
           }
         },
         true,

@@ -306,7 +316,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest

     // remove segments
     if (segmentsExist) {
-      unloadAndKillData(DATASOURCE);
+      unloadAndKillData(fullDatasourceName);
     }
   }

@@ -44,6 +44,7 @@ import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;

|
@ -106,6 +107,13 @@ public class ITKafkaTest extends AbstractIndexerTest
|
||||||
@Inject
|
@Inject
|
||||||
private IntegrationTestingConfig config;
|
private IntegrationTestingConfig config;
|
||||||
|
|
||||||
|
private String fullDatasourceName;
|
||||||
|
|
||||||
|
@BeforeSuite
|
||||||
|
public void setFullDatasourceName()
|
||||||
|
{
|
||||||
|
fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix();
|
||||||
|
}
|
||||||
@Test
|
@Test
|
||||||
public void testKafka()
|
public void testKafka()
|
||||||
{
|
{
|
||||||
|
@@ -204,7 +212,7 @@ public class ITKafkaTest extends AbstractIndexerTest
     addFilteredProperties(consumerProperties);

     indexerSpec = getTaskAsString(INDEXER_FILE);
-    indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", DATASOURCE);
+    indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", fullDatasourceName);
     indexerSpec = StringUtils.replace(indexerSpec, "%%TOPIC%%", TOPIC_NAME);
     indexerSpec = StringUtils.replace(indexerSpec, "%%COUNT%%", Integer.toString(num_events));
     String consumerPropertiesJson = jsonMapper.writeValueAsString(consumerProperties);

@ -233,7 +241,7 @@ public class ITKafkaTest extends AbstractIndexerTest
|
||||||
@Override
|
@Override
|
||||||
public Boolean call()
|
public Boolean call()
|
||||||
{
|
{
|
||||||
return coordinator.areSegmentsLoaded(DATASOURCE);
|
return coordinator.areSegmentsLoaded(fullDatasourceName);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
true,
|
true,
|
||||||
|
@ -263,7 +271,7 @@ public class ITKafkaTest extends AbstractIndexerTest
|
||||||
}
|
}
|
||||||
|
|
||||||
String queryStr = queryResponseTemplate;
|
String queryStr = queryResponseTemplate;
|
||||||
queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", DATASOURCE);
|
queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);
|
||||||
// time boundary
|
// time boundary
|
||||||
queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
|
queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
|
||||||
queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast));
|
queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast));
|
||||||
|
@ -296,7 +304,7 @@ public class ITKafkaTest extends AbstractIndexerTest
|
||||||
|
|
||||||
// remove segments
|
// remove segments
|
||||||
if (segmentsExist) {
|
if (segmentsExist) {
|
||||||
unloadAndKillData(DATASOURCE);
|
unloadAndKillData(fullDatasourceName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
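The pattern above recurs through all of these integration tests: the datasource name is no longer the bare constant but the constant plus config.getExtraDatasourceNameSuffix(), which the harness can set to a value containing spaces or non-ASCII characters. A minimal standalone sketch of how such a name flows through the %%DATASOURCE%% templating (the suffix value and class name here are illustrative, not from the commit):

public class DatasourceSuffixSketch
{
  public static void main(String[] args)
  {
    // Hypothetical suffix exercising the bug this commit fixes: a space plus non-ASCII.
    String extraSuffix = " \u0442\u0435\u0441\u0442";
    String fullDatasourceName = "wikipedia_index_test" + extraSuffix;

    // The task/query specs keep a placeholder, and the test binds it once.
    // String.replace stands in for org.apache.druid.java.util.common.StringUtils.replace.
    String spec = "{\"dataSource\": \"%%DATASOURCE%%\"}";
    spec = spec.replace("%%DATASOURCE%%", fullDatasourceName);

    System.out.println(spec); // {"dataSource": "wikipedia_index_test тест"}
  }
}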
@@ -21,6 +21,9 @@ package org.apache.druid.tests.indexer;

 import com.google.common.base.Throwables;
 import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.ClientInfoResourceTestClient;
@@ -28,9 +31,13 @@ import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.testing.utils.RetryUtil;
 import org.apache.druid.testing.utils.TestQueryHelper;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;

+import java.io.IOException;
+import java.io.InputStream;
+
 @Guice(moduleFactory = DruidTestModuleFactory.class)
 public class ITNestedQueryPushDownTest extends AbstractIndexerTest
 {
@@ -51,12 +58,36 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest
 @Inject
 ClientInfoResourceTestClient clientInfoResourceTestClient;

+private String fullDatasourceName;
+
+@BeforeSuite
+public void setFullDatasourceName()
+{
+fullDatasourceName = WIKITICKER_DATA_SOURCE + config.getExtraDatasourceNameSuffix();
+}
+
 @Test
 public void testIndexData()
 {
 try {
 loadData();
-queryHelper.testQueriesFromFile(WIKITICKER_QUERIES_RESOURCE, 2);
+
+String queryResponseTemplate;
+try {
+InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(WIKITICKER_QUERIES_RESOURCE);
+queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+}
+catch (IOException e) {
+throw new ISE(e, "could not read query file: %s", WIKITICKER_QUERIES_RESOURCE);
+}
+
+queryResponseTemplate = StringUtils.replace(
+queryResponseTemplate,
+"%%DATASOURCE%%",
+fullDatasourceName
+);
+
+queryHelper.testQueriesFromString(queryResponseTemplate, 2);
 }
 catch (Exception e) {
 LOG.error(e, "Error while testing");
@@ -66,11 +97,13 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest

 private void loadData() throws Exception
 {
-final String taskID = indexer.submitTask(getTaskAsString(WIKITICKER_INDEX_TASK));
+String taskSpec = getTaskAsString(WIKITICKER_INDEX_TASK);
+taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
+final String taskID = indexer.submitTask(taskSpec);
 LOG.info("TaskID for loading index task %s", taskID);
 indexer.waitUntilTaskCompletes(taskID);
 RetryUtil.retryUntilTrue(
-() -> coordinator.areSegmentsLoaded(WIKITICKER_DATA_SOURCE), "Segment Load"
+() -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Load"
 );
 }
 }

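The new test body follows a read-template-then-bind pattern: load the queries resource from the classpath, substitute %%DATASOURCE%%, and feed the result to testQueriesFromString instead of testQueriesFromFile. A JDK-only sketch of the same pattern (the test itself uses commons-io's IOUtils and Druid's StringUtils; the class and method names here are illustrative):

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class QueryTemplateSketch
{
  // Load a classpath resource and bind the %%DATASOURCE%% placeholder.
  static String loadAndBind(String resource, String fullDatasourceName) throws IOException
  {
    try (InputStream is = QueryTemplateSketch.class.getResourceAsStream(resource)) {
      if (is == null) {
        throw new IOException("could not read query file: " + resource);
      }
      // readAllBytes needs Java 9+; IOUtils.toString(is, "UTF-8") is the Java 8 equivalent.
      String template = new String(is.readAllBytes(), StandardCharsets.UTF_8);
      return template.replace("%%DATASOURCE%%", fullDatasourceName);
    }
  }
}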
@@ -35,7 +35,7 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest
 @Test
 public void testIndexData() throws Exception
 {
-try (final Closeable closeable = unloader(INDEX_DATASOURCE)) {
+try (final Closeable closeable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())) {
 doIndexTestTest(
 INDEX_DATASOURCE,
 INDEX_TASK,

@@ -20,9 +20,11 @@
 package org.apache.druid.tests.indexer;

 import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
 import org.apache.druid.curator.discovery.ServerDiscoverySelector;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -39,10 +41,12 @@ import org.apache.druid.testing.utils.ServerDiscoveryUtil;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;

 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -70,13 +74,21 @@ public class ITUnionQueryTest extends AbstractIndexerTest
 @Inject
 IntegrationTestingConfig config;

+private String fullDatasourceName;
+
+@BeforeSuite
+public void setFullDatasourceName()
+{
+fullDatasourceName = UNION_DATASOURCE + config.getExtraDatasourceNameSuffix();
+}
+
 @Test
 public void testUnionQuery() throws IOException
 {
 final int numTasks = 3;
 final Closer closer = Closer.create();
 for (int i = 0; i < numTasks; i++) {
-closer.register(unloader(UNION_DATASOURCE + i));
+closer.register(unloader(fullDatasourceName + i));
 }
 try {
 // Load 4 datasources with same dimensions
@@ -89,7 +101,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
 taskIDs.add(
 indexer.submitTask(
 withServiceName(
-withDataSource(task, UNION_DATASOURCE + i),
+withDataSource(task, fullDatasourceName + i),
 EVENT_RECEIVER_SERVICE_PREFIX + i
 )
 )
@@ -103,9 +115,9 @@ public class ITUnionQueryTest extends AbstractIndexerTest
 RetryUtil.retryUntil(
 () -> {
 for (int i = 0; i < numTasks; i++) {
-final int countRows = queryHelper.countRows(UNION_DATASOURCE + i, "2013-08-31/2013-09-01");
+final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01");
 if (countRows < 5) {
-LOG.warn("%d events have been ingested to %s so far", countRows, UNION_DATASOURCE + i);
+LOG.warn("%d events have been ingested to %s so far", countRows, fullDatasourceName + i);
 return false;
 }
 }
@@ -119,7 +131,23 @@ public class ITUnionQueryTest extends AbstractIndexerTest

 // should hit the queries on realtime task
 LOG.info("Running Union Queries..");
-this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2);
+
+String queryResponseTemplate;
+try {
+InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(UNION_QUERIES_RESOURCE);
+queryResponseTemplate = IOUtils.toString(is, "UTF-8");
+}
+catch (IOException e) {
+throw new ISE(e, "could not read query file: %s", UNION_QUERIES_RESOURCE);
+}
+
+queryResponseTemplate = StringUtils.replace(
+queryResponseTemplate,
+"%%DATASOURCE%%",
+fullDatasourceName
+);
+
+this.queryHelper.testQueriesFromString(queryResponseTemplate, 2);
+
 // wait for the task to complete
 for (int i = 0; i < numTasks; i++) {
@@ -134,7 +162,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
 @Override
 public Boolean call()
 {
-return coordinator.areSegmentsLoaded(UNION_DATASOURCE + taskNum);
+return coordinator.areSegmentsLoaded(fullDatasourceName + taskNum);
 }
 },
 true,
@@ -144,7 +172,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
 );
 }
 // run queries on historical nodes
-this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2);
+this.queryHelper.testQueriesFromString(queryResponseTemplate, 2);

 }
 catch (Throwable e) {
@@ -162,7 +190,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest

 private String withDataSource(String taskAsString, String dataSource)
 {
-return StringUtils.replace(taskAsString, UNION_DATASOURCE, dataSource);
+return StringUtils.replace(taskAsString, "%%DATASOURCE%%", dataSource);
 }

 private String withServiceName(String taskAsString, String serviceName)

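Note the design choice in the union test's query file (next hunks): the placeholder sits in front of the numeric suffix, so a single StringUtils.replace of %%DATASOURCE%% yields exactly the fullDatasourceName + i names the test passes to countRows and areSegmentsLoaded. A small sketch, with illustrative values:

public class UnionPlaceholderSketch
{
  public static void main(String[] args)
  {
    String fullDatasourceName = "wikipedia_index_test suffix"; // hypothetical, contains a space
    String template = "[\"%%DATASOURCE%%1\", \"%%DATASOURCE%%2\", \"%%DATASOURCE%%3\", \"%%DATASOURCE%%0\"]";

    // One substitution covers every numbered union member:
    System.out.println(template.replace("%%DATASOURCE%%", fullDatasourceName));
    // ["wikipedia_index_test suffix1", "wikipedia_index_test suffix2", ...]
  }
}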
@@ -6,8 +6,8 @@
 "dataSource": {
 "type": "union",
 "dataSources": [
-"wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-"wikipedia_index_test0"
+"%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+"%%DATASOURCE%%0"
 ]
 },
 "intervals": ["2013-08-31/2013-09-01"],
@@ -69,8 +69,8 @@
 "dataSource": {
 "type": "union",
 "dataSources": [
-"wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-"wikipedia_index_test0"
+"%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+"%%DATASOURCE%%0"
 ]
 },
 "intervals": ["2013-08-31/2013-09-01"],
@@ -149,8 +149,8 @@
 "dataSource": {
 "type": "union",
 "dataSources": [
-"wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-"wikipedia_index_test0"
+"%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+"%%DATASOURCE%%0"
 ]
 },
 "intervals": ["2013-08-31/2013-09-01"],
@@ -263,8 +263,8 @@
 "dataSource": {
 "type": "union",
 "dataSources": [
-"wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-"wikipedia_index_test0"
+"%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+"%%DATASOURCE%%0"
 ]
 },
 "intervals": ["2013-08-31/2013-09-01"],
@@ -344,8 +344,8 @@
 "dataSource": {
 "type": "union",
 "dataSources": [
-"wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-"wikipedia_index_test0"
+"%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+"%%DATASOURCE%%0"
 ]
 },
 "intervals": ["2013-08-31/2013-09-01"],
@@ -417,8 +417,8 @@
 "dataSource": {
 "type": "union",
 "dataSources": [
-"wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-"wikipedia_index_test0"
+"%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+"%%DATASOURCE%%0"
 ]
 },
 "intervals": ["2013-08-31/2013-09-01"],
@@ -508,8 +508,8 @@
 "dataSource": {
 "type": "union",
 "dataSources": [
-"wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-"wikipedia_index_test0"
+"%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+"%%DATASOURCE%%0"
 ]
 },
 "granularity": "all",
@@ -548,8 +548,8 @@
 "dataSource": {
 "type": "union",
 "dataSources": [
-"wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3",
-"wikipedia_index_test0"
+"%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3",
+"%%DATASOURCE%%0"
 ]
 }
 },

@@ -1,6 +1,6 @@
 {
 "type" : "compact",
-"dataSource" : "wikipedia_index_test",
+"dataSource" : "%%DATASOURCE%%",
 "interval" : "2013-08-31/2013-09-02",
 "keepSegmentGranularity" : ${KEEP_SEGMENT_GRANULARITY}
 }

@@ -3,7 +3,7 @@
 "description": "timeseries, 1 agg, all",
 "query":{
 "queryType" : "timeBoundary",
-"dataSource": "wikipedia_index_test"
+"dataSource": "%%DATASOURCE%%"
 },
 "expectedResults":[
 {
@@ -20,7 +20,7 @@
 "description":"having spec on post aggregation",
 "query":{
 "queryType":"groupBy",
-"dataSource":"wikipedia_index_test",
+"dataSource":"%%DATASOURCE%%",
 "granularity":"day",
 "dimensions":[
 "page"

@@ -2,7 +2,7 @@
 "type": "index",
 "spec": {
 "dataSchema": {
-"dataSource": "wikipedia_index_test",
+"dataSource": "%%DATASOURCE%%",
 "metricsSpec": [
 {
 "type": "count",

@@ -3,7 +3,7 @@
 "description": "timeseries, 1 agg, all",
 "query":{
 "queryType" : "timeBoundary",
-"dataSource": "wikipedia_parallel_index_test"
+"dataSource": "%%DATASOURCE%%"
 },
 "expectedResults":[
 {
@@ -20,7 +20,7 @@
 "description":"having spec on post aggregation",
 "query":{
 "queryType":"groupBy",
-"dataSource":"wikipedia_parallel_index_test",
+"dataSource":"%%DATASOURCE%%",
 "granularity":"day",
 "dimensions":[
 "page"

@@ -2,7 +2,7 @@
 "type": "index_parallel",
 "spec": {
 "dataSchema": {
-"dataSource": "wikipedia_parallel_index_test",
+"dataSource": "%%DATASOURCE%%",
 "metricsSpec": [
 {
 "type": "count",

@@ -3,7 +3,7 @@
 "description": "timeBoundary",
 "query": {
 "queryType":"timeBoundary",
-"dataSource":"wikipedia_index_test"
+"dataSource":"%%DATASOURCE%%"
 },
 "expectedResults":[
 {
@@ -19,7 +19,7 @@
 "description": "timeseries",
 "query": {
 "queryType": "timeseries",
-"dataSource": "wikipedia_index_test",
+"dataSource": "%%DATASOURCE%%",
 "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
 "granularity": "all",
 "aggregations": [
@@ -41,7 +41,7 @@
 "description":"having spec on post aggregation",
 "query":{
 "queryType":"groupBy",
-"dataSource":"wikipedia_index_test",
+"dataSource":"%%DATASOURCE%%",
 "granularity":"minute",
 "dimensions":[
 "page"

@@ -2,7 +2,7 @@
 "type": "index_realtime_appenderator",
 "spec": {
 "dataSchema": {
-"dataSource": "wikipedia_index_test",
+"dataSource": "%%DATASOURCE%%",
 "metricsSpec": [
 {
 "type": "count",

@@ -3,7 +3,7 @@
 "description": "timeBoundary",
 "query": {
 "queryType":"timeBoundary",
-"dataSource":"wikipedia_index_test"
+"dataSource":"%%DATASOURCE%%"
 },
 "expectedResults":[
 {
@@ -19,7 +19,7 @@
 "description": "timeseries",
 "query": {
 "queryType": "timeseries",
-"dataSource": "wikipedia_index_test",
+"dataSource": "%%DATASOURCE%%",
 "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
 "granularity": "all",
 "aggregations": [
@@ -41,7 +41,7 @@
 "description":"having spec on post aggregation",
 "query":{
 "queryType":"groupBy",
-"dataSource":"wikipedia_index_test",
+"dataSource":"%%DATASOURCE%%",
 "granularity":"minute",
 "dimensions":[
 "page"

@@ -2,7 +2,7 @@
 "type": "index_realtime",
 "spec": {
 "dataSchema": {
-"dataSource": "wikipedia_index_test",
+"dataSource": "%%DATASOURCE%%",
 "metricsSpec": [
 {
 "type": "count",

@@ -2,7 +2,7 @@
 "type": "index",
 "spec": {
 "dataSchema": {
-"dataSource": "wikipedia_reindex_test",
+"dataSource": "%%REINDEX_DATASOURCE%%",
 "metricsSpec": [
 {
 "type": "doubleSum",
@@ -42,7 +42,7 @@
 "type": "index",
 "firehose": {
 "type": "ingestSegment",
-"dataSource": "wikipedia_index_test",
+"dataSource": "%%DATASOURCE%%",
 "interval": "2013-08-31/2013-09-01"
 }
 },

@@ -2,7 +2,7 @@
 "type": "index_realtime",
 "spec": {
 "dataSchema": {
-"dataSource": "wikipedia_index_test",
+"dataSource": "%%DATASOURCE%%",
 "metricsSpec": [
 {
 "type": "count",

@@ -2,7 +2,7 @@
 "type": "index",
 "spec": {
 "dataSchema": {
-"dataSource": "wikiticker",
+"dataSource": "%%DATASOURCE%%",
 "granularitySpec": {
 "type": "uniform",
 "segmentGranularity": "day",
@@ -18,20 +18,7 @@
 "dimensionsSpec": {
 "dimensions": [
 "channel",
-"cityName",
-"comment",
-"countryIsoCode",
-"countryName",
-"isAnonymous",
-"isMinor",
-"isNew",
-"isRobot",
-"isUnpatrolled",
-"metroCode",
-"namespace",
 "page",
-"regionIsoCode",
-"regionName",
 "user"
 ]
 },
@@ -60,11 +47,6 @@
 "name": "delta",
 "type": "longSum",
 "fieldName": "delta"
-},
-{
-"name": "user_unique",
-"type": "hyperUnique",
-"fieldName": "user"
 }
 ]
 },
@@ -72,7 +54,7 @@
 "type": "index",
 "firehose": {
 "type": "local",
-"baseDir": "/examples/quickstart/tutorial",
+"baseDir": "/shared/wikiticker-it",
 "filter": "wikiticker-2015-09-12-sampled.json.gz"
 }
 },

@@ -7,7 +7,7 @@
 "type": "query",
 "query": {
 "queryType": "groupBy",
-"dataSource": "wikiticker",
+"dataSource": "%%DATASOURCE%%",
 "intervals": [
 "2015-09-12/2015-09-13"
 ],
@@ -60,7 +60,7 @@
 "type": "query",
 "query": {
 "queryType": "groupBy",
-"dataSource": "wikiticker",
+"dataSource": "%%DATASOURCE%%",
 "intervals": [
 "2015-09-12/2015-09-13"
 ],
@@ -113,7 +113,7 @@
 "type": "query",
 "query": {
 "queryType": "groupBy",
-"dataSource": "wikiticker",
+"dataSource": "%%DATASOURCE%%",
 "intervals": [
 "2015-09-12/2015-09-13"
 ],
@@ -191,7 +191,7 @@
 "type": "query",
 "query": {
 "queryType": "groupBy",
-"dataSource": "wikiticker",
+"dataSource": "%%DATASOURCE%%",
 "intervals": [
 "2015-09-12/2015-09-13"
 ],
@@ -253,7 +253,7 @@
 "type": "query",
 "query": {
 "queryType": "groupBy",
-"dataSource": "wikiticker",
+"dataSource": "%%DATASOURCE%%",
 "intervals": [
 "2015-09-12/2015-09-13"
 ],

@@ -54,7 +54,7 @@ public class CoordinatorClient
 HttpMethod.GET,
 StringUtils.format(
 "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s",
-dataSource,
+StringUtils.urlEncode(dataSource),
 descriptor.getInterval(),
 descriptor.getPartitionNumber(),
 descriptor.getVersion()

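The encoding this change relies on is the commit's StringUtils.urlEncode: form-encode with URLEncoder, then swap "+" for "%20" so the value is valid inside a URL path segment rather than a form body. A standalone sketch of that behavior (the datasource value is illustrative):

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class UrlEncodeSketch
{
  // Mirrors StringUtils.urlEncode: form-encoding emits "+" for spaces, which
  // does not mean "space" in a path segment, so replace it with "%20".
  static String urlEncode(String s) throws UnsupportedEncodingException
  {
    return URLEncoder.encode(s, "UTF-8").replace("+", "%20");
  }

  public static void main(String[] args) throws UnsupportedEncodingException
  {
    String dataSource = "my datasource \u00fc"; // hypothetical name with a space and non-ASCII
    System.out.println("/druid/coordinator/v1/datasources/" + urlEncode(dataSource));
    // -> /druid/coordinator/v1/datasources/my%20datasource%20%C3%BC
  }
}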
@@ -154,7 +154,10 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
 final FullResponseHolder response = druidLeaderClient.go(
 druidLeaderClient.makeRequest(
 HttpMethod.POST,
-StringUtils.format("/druid/indexer/v1/task/%s/shutdown", taskId)
+StringUtils.format(
+"/druid/indexer/v1/task/%s/shutdown",
+StringUtils.urlEncode(taskId)
+)
 )
 );

@@ -255,7 +258,10 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
 {
 try {
 final FullResponseHolder responseHolder = druidLeaderClient.go(
-druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s/status", taskId))
+druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format(
+"/druid/indexer/v1/task/%s/status",
+StringUtils.urlEncode(taskId)
+))
 );

 return jsonMapper.readValue(
@@ -303,7 +309,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
 {
 final String endPoint = StringUtils.format(
 "/druid/indexer/v1/pendingSegments/%s?interval=%s",
-dataSource,
+StringUtils.urlEncode(dataSource),
 new Interval(DateTimes.MIN, end)
 );
 try {

@@ -21,6 +21,7 @@ package org.apache.druid.segment.realtime.firehose;

 import com.google.common.base.Optional;
 import com.google.inject.Inject;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.server.metrics.DataSourceTaskIdHolder;

 import javax.ws.rs.Path;
@@ -49,7 +50,7 @@ public class ChatHandlerResource
 {
 if (taskId != null) {
 List<String> requestTaskId = headers.getRequestHeader(TASK_ID_HEADER);
-if (requestTaskId != null && !requestTaskId.contains(taskId)) {
+if (requestTaskId != null && !requestTaskId.contains(StringUtils.urlEncode(taskId))) {
 return null;
 }
 }

@@ -20,6 +20,7 @@
 package org.apache.druid.server.initialization.jetty;

 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerResource;

 public class TaskIdResponseHeaderFilterHolder extends ResponseHeaderFilterHolder
@@ -29,7 +30,7 @@ public class TaskIdResponseHeaderFilterHolder extends ResponseHeaderFilterHolder
 super(path,
 taskId == null
 ? ImmutableMap.of()
-: ImmutableMap.of(ChatHandlerResource.TASK_ID_HEADER, taskId)
+: ImmutableMap.of(ChatHandlerResource.TASK_ID_HEADER, StringUtils.urlEncode(taskId))
 );
 }
 }
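These last two hunks form a pair: the filter holder writes the task ID into the TASK_ID_HEADER response header URL-encoded, and the chat handler compares the incoming header value against the encoded form of its own ID, so task IDs containing spaces or non-ASCII characters round-trip through HTTP headers. A sketch of that agreement (the task ID value is illustrative):

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.List;

public class TaskIdHeaderSketch
{
  static String urlEncode(String s) throws UnsupportedEncodingException
  {
    return URLEncoder.encode(s, "UTF-8").replace("+", "%20");
  }

  public static void main(String[] args) throws UnsupportedEncodingException
  {
    String taskId = "index_realtime_my datasource_2013-08-31"; // hypothetical
    // What TaskIdResponseHeaderFilterHolder now puts in the header:
    List<String> requestTaskId = Collections.singletonList(urlEncode(taskId));
    // What ChatHandlerResource now checks against:
    System.out.println(requestTaskId.contains(urlEncode(taskId))); // true despite the space
  }
}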