mirror of https://github.com/apache/druid.git
some changes for support of our tests
This commit is contained in:
parent
6a7f8fa054
commit
6c9ba72dea
|
@ -22,6 +22,13 @@ RUN wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeep
|
|||
RUN cp /usr/local/zookeeper-3.4.6/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.6/conf/zoo.cfg
|
||||
RUN ln -s /usr/local/zookeeper-3.4.6 /usr/local/zookeeper
|
||||
|
||||
# Kafka
|
||||
RUN wget -q -O - http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz | tar -xzf - -C /usr/local
|
||||
RUN ln -s /usr/local/kafka_2.10-0.8.2.0 /usr/local/kafka
|
||||
# unless advertised.host.name is set to docker ip, publishing data fails
|
||||
ADD docker_ip docker_ip
|
||||
RUN perl -pi -e "s/#advertised.port=.*/advertised.port=9092/; s/#advertised.host.*/advertised.host.name=$(cat docker_ip)/" /usr/local/kafka/config/server.properties
|
||||
|
||||
# git
|
||||
RUN apt-get install -y git
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ command=java
|
|||
-Duser.timezone=UTC
|
||||
-Dfile.encoding=UTF-8
|
||||
-Ddruid.host=%(ENV_HOST_IP)s
|
||||
-Ddruid.zk.service.host=druid-zookeeper
|
||||
-Ddruid.zk.service.host=druid-zookeeper-kafka
|
||||
-Ddruid.processing.buffer.sizeBytes=75000000
|
||||
-Ddruid.server.http.numThreads=100
|
||||
-Ddruid.processing.numThreads=1
|
||||
|
|
|
@ -13,7 +13,7 @@ command=java
|
|||
-Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://druid-metadata-storage/druid
|
||||
-Ddruid.metadata.storage.connector.user=druid
|
||||
-Ddruid.metadata.storage.connector.password=diurd
|
||||
-Ddruid.zk.service.host=druid-zookeeper
|
||||
-Ddruid.zk.service.host=druid-zookeeper-kafka
|
||||
-Ddruid.coordinator.startDelay=PT5S
|
||||
-cp /usr/local/druid/lib/*
|
||||
io.druid.cli.Main server coordinator
|
||||
|
|
|
@ -11,7 +11,7 @@ command=java
|
|||
-Duser.timezone=UTC
|
||||
-Dfile.encoding=UTF-8
|
||||
-Ddruid.host=%(ENV_HOST_IP)s
|
||||
-Ddruid.zk.service.host=druid-zookeeper
|
||||
-Ddruid.zk.service.host=druid-zookeeper-kafka
|
||||
-Ddruid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
|
||||
-Ddruid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||
-Ddruid.processing.buffer.sizeBytes=75000000
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
[program:kafka]
|
||||
command=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
|
||||
user=daemon
|
||||
priority=0
|
||||
stdout_logfile=/shared/logs/kafka.log
|
|
@ -9,7 +9,7 @@ command=java
|
|||
-Duser.timezone=UTC
|
||||
-Dfile.encoding=UTF-8
|
||||
-Ddruid.host=%(ENV_HOST_IP)s
|
||||
-Ddruid.zk.service.host=druid-zookeeper
|
||||
-Ddruid.zk.service.host=druid-zookeeper-kafka
|
||||
-Ddruid.indexer.logs.directory=/shared/tasklogs
|
||||
-Ddruid.storage.storageDirectory=/shared/storage
|
||||
-Ddruid.indexer.runner.javaOpts=-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
|
||||
|
|
|
@ -13,7 +13,7 @@ command=java
|
|||
-Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://druid-metadata-storage/druid
|
||||
-Ddruid.metadata.storage.connector.user=druid
|
||||
-Ddruid.metadata.storage.connector.password=diurd
|
||||
-Ddruid.zk.service.host=druid-zookeeper
|
||||
-Ddruid.zk.service.host=druid-zookeeper-kafka
|
||||
-Ddruid.indexer.storage.type=metadata
|
||||
-Ddruid.indexer.logs.directory=/shared/tasklogs
|
||||
-Ddruid.indexer.runner.type=remote
|
||||
|
|
|
@ -8,7 +8,7 @@ command=java
|
|||
-Duser.timezone=UTC
|
||||
-Dfile.encoding=UTF-8
|
||||
-Ddruid.host=%(ENV_HOST_IP)s
|
||||
-Ddruid.zk.service.host=druid-zookeeper
|
||||
-Ddruid.zk.service.host=druid-zookeeper-kafka
|
||||
-Ddruid.computation.buffer.size=75000000
|
||||
-Ddruid.server.http.numThreads=100
|
||||
-Ddruid.processing.numThreads=1
|
||||
|
|
|
@ -29,6 +29,11 @@
|
|||
<version>0.9.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
<apache.kafka.version>0.8.2.1</apache.kafka.version>
|
||||
<zkclient.version>0.4</zkclient.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
|
@ -40,6 +45,11 @@
|
|||
<artifactId>druid-s3-extensions</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid.extensions</groupId>
|
||||
<artifactId>druid-kafka-eight</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid.extensions</groupId>
|
||||
<artifactId>druid-histogram</artifactId>
|
||||
|
@ -71,6 +81,29 @@
|
|||
<artifactId>easymock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.101tec</groupId>
|
||||
<artifactId>zkclient</artifactId>
|
||||
<version>${zkclient.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.10</artifactId>
|
||||
<version>${apache.kafka.version}</version>
|
||||
<!-- without this exclusion, there's interference that affects the log level -->
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# cleanup
|
||||
for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper druid-metadata-storage;
|
||||
for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage;
|
||||
do
|
||||
docker stop $node
|
||||
docker rm $node
|
||||
|
@ -12,6 +12,9 @@ SHARED_DIR=${HOME}/shared
|
|||
SUPERVISORDIR=/usr/lib/druid/conf
|
||||
RESOURCEDIR=$DIR/src/test/resources
|
||||
|
||||
# so docker IP addr will be known during docker build
|
||||
echo $DOCKER_IP > $DOCKERDIR/docker_ip
|
||||
|
||||
# Make directories if they dont exist
|
||||
mkdir -p $SHARED_DIR/logs
|
||||
mkdir -p $SHARED_DIR/tasklogs
|
||||
|
@ -24,26 +27,26 @@ mvn dependency:copy-dependencies -DoutputDirectory=$SHARED_DIR/docker/lib
|
|||
# Build Druid Cluster Image
|
||||
docker build -t druid/cluster $SHARED_DIR/docker
|
||||
|
||||
# Start zookeeper
|
||||
docker run -d --name druid-zookeeper -p 2181:2181 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf druid/cluster
|
||||
# Start zookeeper and kafka
|
||||
docker run -d --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster
|
||||
|
||||
# Start MYSQL
|
||||
docker run -d --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster
|
||||
|
||||
# Start Overlord
|
||||
docker run -d --name druid-overlord -p 8090:8090 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper:druid-zookeeper druid/cluster
|
||||
docker run -d --name druid-overlord -p 8090:8090 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
|
||||
|
||||
# Start Coordinator
|
||||
docker run -d --name druid-coordinator -p 8081:8081 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper:druid-zookeeper druid/cluster
|
||||
docker run -d --name druid-coordinator -p 8081:8081 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
|
||||
|
||||
# Start Historical
|
||||
docker run -d --name druid-historical -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper:druid-zookeeper druid/cluster
|
||||
docker run -d --name druid-historical -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
|
||||
|
||||
# Start Middlemanger
|
||||
docker run -d --name druid-middlemanager -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper:druid-zookeeper --link druid-overlord:druid-overlord druid/cluster
|
||||
docker run -d --name druid-middlemanager -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster
|
||||
|
||||
# Start Broker
|
||||
docker run -d --name druid-broker -p 8082:8082 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper:druid-zookeeper --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
|
||||
docker run -d --name druid-broker -p 8082:8082 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
|
||||
|
||||
# Start Router
|
||||
docker run -d --name druid-router -p 8888:8888 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper:druid-zookeeper --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
|
||||
docker run -d --name druid-router -p 8888:8888 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
|
||||
|
|
|
@ -42,6 +42,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
|
|||
private String indexerHost = "";
|
||||
private String middleManagerHost = "";
|
||||
private String zookeeperHosts = ""; // comma-separated list of host:port
|
||||
private String kafkaHost = "";
|
||||
private Map<String, String> props = null;
|
||||
|
||||
@JsonCreator
|
||||
|
@ -62,20 +63,26 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
|
|||
catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
routerHost = props.get("router_host") + ":" + props.get("router_port");
|
||||
// there might not be a router; we want routerHost to be null in that case
|
||||
routerHost = props.get("router_host");
|
||||
if (null != routerHost) {
|
||||
routerHost += ":" + props.get("router_port");
|
||||
}
|
||||
brokerHost = props.get("broker_host") + ":" + props.get("broker_port");
|
||||
historicalHost = props.get("historical_host") + ":" + props.get("historical_port");
|
||||
coordinatorHost = props.get("coordinator_host") + ":" + props.get("coordinator_port");
|
||||
indexerHost = props.get("indexer_host") + ":" + props.get("indexer_port");
|
||||
middleManagerHost = props.get("middlemanager_host");
|
||||
zookeeperHosts = props.get("zookeeper_hosts");
|
||||
kafkaHost = props.get("kafka_host") + ":" + props.get ("kafka_port");
|
||||
|
||||
LOG.info ("router: [%s]", routerHost);
|
||||
LOG.info ("broker [%s]: ", brokerHost);
|
||||
LOG.info ("broker: [%s]", brokerHost);
|
||||
LOG.info ("coordinator: [%s]", coordinatorHost);
|
||||
LOG.info ("overlord: [%s]", indexerHost);
|
||||
LOG.info ("middle manager: [%s]", middleManagerHost);
|
||||
LOG.info ("zookeepers: [%s]", zookeeperHosts);
|
||||
LOG.info ("kafka: [%s]", kafkaHost);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -125,6 +132,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
|
|||
return zookeeperHosts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKafkaHost()
|
||||
{
|
||||
return kafkaHost;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProperty(String keyword)
|
||||
{
|
||||
|
|
|
@ -76,6 +76,12 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
|
|||
return dockerIp + ":2181";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKafkaHost()
|
||||
{
|
||||
return dockerIp + ":9092";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProperty(String prop)
|
||||
{
|
||||
|
|
|
@ -35,5 +35,7 @@ public interface IntegrationTestingConfig
|
|||
|
||||
public String getZookeeperHosts();
|
||||
|
||||
public String getKafkaHost();
|
||||
|
||||
public String getProperty(String prop);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.testing;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.curator.CuratorConfig;
|
||||
|
||||
/**
|
||||
* We will use this instead of druid server's CuratorConfig, because CuratorConfig in
|
||||
* a test cluster environment sees zookeeper at localhost even if zookeeper is elsewhere.
|
||||
* We'll take the zookeeper host from the configuration file instead.
|
||||
*/
|
||||
public class IntegrationTestingCuratorConfig extends CuratorConfig
|
||||
{
|
||||
private IntegrationTestingConfig config;
|
||||
|
||||
@Inject
|
||||
public IntegrationTestingCuratorConfig (IntegrationTestingConfig config)
|
||||
{
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getZkHosts()
|
||||
{
|
||||
return config.getZookeeperHosts();
|
||||
}
|
||||
}
|
|
@ -115,7 +115,7 @@ public class OverlordResourceTestClient
|
|||
public TaskStatus.Status getTaskStatus(String taskID)
|
||||
{
|
||||
try {
|
||||
StatusResponseHolder response = makeRequest(
|
||||
StatusResponseHolder response = makeRequest( HttpMethod.GET,
|
||||
String.format(
|
||||
"%stask/%s/status",
|
||||
getIndexerURL(),
|
||||
|
@ -156,7 +156,7 @@ public class OverlordResourceTestClient
|
|||
private List<TaskResponseObject> getTasks(String identifier)
|
||||
{
|
||||
try {
|
||||
StatusResponseHolder response = makeRequest(
|
||||
StatusResponseHolder response = makeRequest( HttpMethod.GET,
|
||||
String.format("%s%s", getIndexerURL(), identifier)
|
||||
);
|
||||
LOG.info("Tasks %s response %s", identifier, response.getContent());
|
||||
|
@ -171,6 +171,26 @@ public class OverlordResourceTestClient
|
|||
}
|
||||
}
|
||||
|
||||
public Map<String, String> shutDownTask(String taskID)
|
||||
{
|
||||
try {
|
||||
StatusResponseHolder response = makeRequest( HttpMethod.POST,
|
||||
String.format("%stask/%s/shutdown", getIndexerURL(),
|
||||
URLEncoder.encode(taskID, "UTF-8")
|
||||
)
|
||||
);
|
||||
LOG.info("Shutdown Task %s response %s", taskID, response.getContent());
|
||||
return jsonMapper.readValue(
|
||||
response.getContent(), new TypeReference<Map<String, String>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void waitUntilTaskCompletes(final String taskID)
|
||||
{
|
||||
RetryUtil.retryUntil(
|
||||
|
@ -193,11 +213,11 @@ public class OverlordResourceTestClient
|
|||
);
|
||||
}
|
||||
|
||||
private StatusResponseHolder makeRequest(String url)
|
||||
private StatusResponseHolder makeRequest(HttpMethod method, String url)
|
||||
{
|
||||
try {
|
||||
StatusResponseHolder response = this.httpClient
|
||||
.go(new Request(HttpMethod.GET, new URL(url)), responseHandler).get();
|
||||
.go(new Request(method, new URL(url)), responseHandler).get();
|
||||
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
|
||||
throw new ISE("Error while making request to indexer [%s %s]", response.getStatus(), response.getContent());
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ import io.druid.guice.LazySingleton;
|
|||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.testing.IntegrationTestingConfig;
|
||||
import io.druid.testing.IntegrationTestingConfigProvider;
|
||||
import io.druid.curator.CuratorConfig;
|
||||
import io.druid.testing.IntegrationTestingCuratorConfig;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -40,6 +42,8 @@ public class DruidTestModule implements Module
|
|||
{
|
||||
binder.bind(IntegrationTestingConfig.class).toProvider(IntegrationTestingConfigProvider.class).in(ManageLifecycle.class);
|
||||
JsonConfigProvider.bind(binder, "druid.test.config", IntegrationTestingConfigProvider.class);
|
||||
|
||||
binder.bind(CuratorConfig.class).to(IntegrationTestingCuratorConfig.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
|
|
|
@ -90,7 +90,11 @@ public class DruidTestRunnerFactory implements ITestRunnerFactory
|
|||
;
|
||||
waitUntilInstanceReady(client, config.getCoordinatorHost());
|
||||
waitUntilInstanceReady(client, config.getIndexerHost());
|
||||
waitUntilInstanceReady(client, config.getRouterHost());
|
||||
waitUntilInstanceReady(client, config.getBrokerHost());
|
||||
String routerHost = config.getRouterHost();
|
||||
if (null != routerHost) {
|
||||
waitUntilInstanceReady(client, config.getRouterHost());
|
||||
}
|
||||
Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
|
||||
try {
|
||||
lifecycle.start();
|
||||
|
|
|
@ -0,0 +1,304 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.tests.indexer;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.testing.IntegrationTestingConfig;
|
||||
import io.druid.testing.guice.DruidTestModuleFactory;
|
||||
import io.druid.testing.utils.RetryUtil;
|
||||
import io.druid.testing.utils.TestQueryHelper;
|
||||
import kafka.admin.AdminUtils;
|
||||
import kafka.common.TopicExistsException;
|
||||
import kafka.javaapi.producer.Producer;
|
||||
import kafka.producer.KeyedMessage;
|
||||
import kafka.producer.ProducerConfig;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/*
|
||||
* This is a test for the kafka firehose.
|
||||
*/
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKafkaTest extends AbstractIndexerTest
|
||||
{
|
||||
private static final Logger LOG = new Logger(ITKafkaTest.class);
|
||||
private static final int DELAY_BETWEEN_EVENTS_SECS = 5;
|
||||
private static final String INDEXER_FILE = "/indexer/kafka_index_task.json";
|
||||
private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
|
||||
private static final String DATASOURCE = "kafka_test";
|
||||
private static final String TOPIC_NAME = "kafkaTopic";
|
||||
private static final int MINUTES_TO_SEND = 2;
|
||||
|
||||
// We'll fill in the current time and numbers for added, deleted and changed
|
||||
// before sending the event.
|
||||
final String event_template =
|
||||
"{\"timestamp\": \"%s\"," +
|
||||
"\"page\": \"Gypsy Danger\"," +
|
||||
"\"language\" : \"en\"," +
|
||||
"\"user\" : \"nuclear\"," +
|
||||
"\"unpatrolled\" : \"true\"," +
|
||||
"\"newPage\" : \"true\"," +
|
||||
"\"robot\": \"false\"," +
|
||||
"\"anonymous\": \"false\"," +
|
||||
"\"namespace\":\"article\"," +
|
||||
"\"continent\":\"North America\"," +
|
||||
"\"country\":\"United States\"," +
|
||||
"\"region\":\"Bay Area\"," +
|
||||
"\"city\":\"San Francisco\"," +
|
||||
"\"added\":%d," +
|
||||
"\"deleted\":%d," +
|
||||
"\"delta\":%d}";
|
||||
|
||||
private String taskID;
|
||||
private ZkClient zkClient;
|
||||
private Boolean segmentsExist; // to tell if we should remove segments during teardown
|
||||
|
||||
// format for the querying interval
|
||||
private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
|
||||
// format for the expected timestamp in a query response
|
||||
private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
|
||||
private DateTime dtFirst; // timestamp of 1st event
|
||||
private DateTime dtLast; // timestamp of last event
|
||||
|
||||
@Inject
|
||||
private TestQueryHelper queryHelper;
|
||||
@Inject
|
||||
private IntegrationTestingConfig config;
|
||||
|
||||
@Test
|
||||
public void testKafka()
|
||||
{
|
||||
LOG.info("Starting test: ITKafkaTest");
|
||||
|
||||
// create topic
|
||||
try {
|
||||
int sessionTimeoutMs = 10000;
|
||||
int connectionTimeoutMs = 10000;
|
||||
String zkHosts = config.getZookeeperHosts();
|
||||
zkClient = new ZkClient(
|
||||
zkHosts, sessionTimeoutMs, connectionTimeoutMs,
|
||||
ZKStringSerializer$.MODULE$
|
||||
);
|
||||
int numPartitions = 1;
|
||||
int replicationFactor = 1;
|
||||
Properties topicConfig = new Properties();
|
||||
AdminUtils.createTopic(zkClient, TOPIC_NAME, numPartitions, replicationFactor, topicConfig);
|
||||
}
|
||||
catch (TopicExistsException e) {
|
||||
// it's ok if the topic already exists
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ISE(e, "could not create kafka topic");
|
||||
}
|
||||
|
||||
String indexerSpec = "";
|
||||
|
||||
// replace temp strings in indexer file
|
||||
try {
|
||||
LOG.info("indexerFile name: [%s]", INDEXER_FILE);
|
||||
indexerSpec = getTaskAsString(INDEXER_FILE);
|
||||
indexerSpec = indexerSpec.replaceAll("%%TOPIC%%", TOPIC_NAME);
|
||||
indexerSpec = indexerSpec.replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts());
|
||||
indexerSpec = indexerSpec.replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()));
|
||||
indexerSpec = indexerSpec.replaceAll(
|
||||
"%%SHUTOFFTIME%%",
|
||||
new DateTime(
|
||||
System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(
|
||||
2
|
||||
* MINUTES_TO_SEND
|
||||
)
|
||||
).toString()
|
||||
);
|
||||
LOG.info("indexerFile: [%s]\n", indexerSpec);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// log here so the message will appear in the console output
|
||||
LOG.error("could not read indexer file [%s]", INDEXER_FILE);
|
||||
throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE);
|
||||
}
|
||||
|
||||
// start indexing task
|
||||
taskID = indexer.submitTask(indexerSpec);
|
||||
LOG.info("-------------SUBMITTED TASK");
|
||||
|
||||
// set up kafka producer
|
||||
Properties properties = new Properties();
|
||||
properties.put("metadata.broker.list", config.getKafkaHost());
|
||||
LOG.info("kafka host: [%s]", config.getKafkaHost());
|
||||
properties.put("serializer.class", "kafka.serializer.StringEncoder");
|
||||
properties.put("request.required.acks", "1");
|
||||
properties.put("producer.type", "async");
|
||||
ProducerConfig producerConfig = new ProducerConfig(properties);
|
||||
Producer<String, String> producer = new Producer<String, String>(producerConfig);
|
||||
|
||||
DateTimeZone zone = DateTimeZone.forID("UTC");
|
||||
// format for putting into events
|
||||
DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
|
||||
|
||||
DateTime dt = new DateTime(zone); // timestamp to put on events
|
||||
dtFirst = dt; // timestamp of 1st event
|
||||
dtLast = dt; // timestamp of last event
|
||||
// stop sending events when time passes this
|
||||
DateTime dtStop = dtFirst.plusMinutes(MINUTES_TO_SEND).plusSeconds(30);
|
||||
|
||||
// these are used to compute the expected aggregations
|
||||
int added = 0;
|
||||
int num_events = 0;
|
||||
|
||||
// send data to kafka
|
||||
while (dt.compareTo(dtStop) < 0) { // as long as we're within the time span
|
||||
LOG.info("sending event at [%s]", event_fmt.print(dt));
|
||||
num_events++;
|
||||
added += num_events;
|
||||
// construct the event to send
|
||||
String event = String.format(
|
||||
event_template,
|
||||
event_fmt.print(dt), num_events, 0, num_events
|
||||
);
|
||||
LOG.debug("event: [%s]", event);
|
||||
try {
|
||||
// Send event to kafka
|
||||
KeyedMessage<String, String> message = new KeyedMessage<String, String>(TOPIC_NAME, event);
|
||||
producer.send(message);
|
||||
}
|
||||
catch (Exception ioe) {
|
||||
Throwables.propagate(ioe);
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
|
||||
}
|
||||
catch (InterruptedException ex) { /* nothing */ }
|
||||
dtLast = dt;
|
||||
dt = new DateTime(zone);
|
||||
}
|
||||
|
||||
producer.close();
|
||||
|
||||
// put the timestamps into the query structure
|
||||
String query_response_template = null;
|
||||
InputStream is = ITKafkaTest.class.getResourceAsStream(QUERIES_FILE);
|
||||
if (null == is) {
|
||||
throw new ISE("could not open query file: %s", QUERIES_FILE);
|
||||
}
|
||||
|
||||
try {
|
||||
query_response_template = IOUtils.toString(is, "UTF-8");
|
||||
} catch (IOException e) {
|
||||
throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
|
||||
}
|
||||
|
||||
String queryStr = query_response_template
|
||||
// time boundary
|
||||
.replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
|
||||
.replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
|
||||
.replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
|
||||
// time series
|
||||
.replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
|
||||
.replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2)))
|
||||
.replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
|
||||
.replace("%%TIMESERIES_ADDED%%", Integer.toString(added))
|
||||
.replace("%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
|
||||
|
||||
// this query will probably be answered from the realtime task
|
||||
try {
|
||||
this.queryHelper.testQueriesFromString(queryStr, 2);
|
||||
} catch (Exception e) {
|
||||
Throwables.propagate(e);
|
||||
}
|
||||
|
||||
// wait for segments to be handed off
|
||||
try {
|
||||
RetryUtil.retryUntil(
|
||||
new Callable<Boolean>()
|
||||
{
|
||||
@Override
|
||||
public Boolean call() throws Exception
|
||||
{
|
||||
return coordinator.areSegmentsLoaded(DATASOURCE);
|
||||
}
|
||||
},
|
||||
true,
|
||||
30000,
|
||||
10,
|
||||
"Real-time generated segments loaded"
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
Throwables.propagate(e);
|
||||
}
|
||||
LOG.info("segments are present");
|
||||
segmentsExist = true;
|
||||
|
||||
// this query will be answered by historical
|
||||
try {
|
||||
this.queryHelper.testQueriesFromString(queryStr, 2);
|
||||
}
|
||||
catch (Exception e) {
|
||||
Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void afterClass()
|
||||
{
|
||||
LOG.info("teardown");
|
||||
|
||||
// wait for the task to complete
|
||||
indexer.waitUntilTaskCompletes(taskID);
|
||||
|
||||
// delete kafka topic
|
||||
AdminUtils.deleteTopic(zkClient, TOPIC_NAME);
|
||||
|
||||
// remove segments
|
||||
if (segmentsExist) {
|
||||
try {
|
||||
String first = DateTimeFormat.forPattern("yyyy-MM-dd'T00:00:00.000Z'").print(dtFirst);
|
||||
String last = DateTimeFormat.forPattern("yyyy-MM-dd'T00:00:00.000Z'").print(dtFirst.plusDays(1));
|
||||
unloadAndKillData(DATASOURCE, first, last);
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.warn("exception while removing segments: [%s]", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
[
|
||||
{
|
||||
"description": "timeBoundary",
|
||||
"query": {
|
||||
"queryType":"timeBoundary",
|
||||
"dataSource":"kafka_test"
|
||||
},
|
||||
"expectedResults":[
|
||||
{
|
||||
"timestamp":"%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
|
||||
"result": {
|
||||
"maxTime" : "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
|
||||
"minTime":"%%TIMEBOUNDARY_RESPONSE_MINTIME%%"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"description": "timeseries",
|
||||
"query": {
|
||||
"queryType": "timeseries",
|
||||
"dataSource": "kafka_test",
|
||||
"intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
|
||||
"granularity": "all",
|
||||
"aggregations": [
|
||||
{"type": "longSum", "fieldName": "count", "name": "edit_count"},
|
||||
{"type": "longSum", "fieldName": "added", "name": "chars_added"}
|
||||
]
|
||||
},
|
||||
"expectedResults": [
|
||||
{
|
||||
"timestamp" : "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
|
||||
"result" : {
|
||||
"chars_added" : %%TIMESERIES_ADDED%%,
|
||||
"edit_count" : %%TIMESERIES_NUMEVENTS%%
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
|
@ -0,0 +1,77 @@
|
|||
{
|
||||
"type" : "index_realtime",
|
||||
"spec" : {
|
||||
"dataSchema": {
|
||||
"dataSource": "kafka_test",
|
||||
"parser" : {
|
||||
"type" : "string",
|
||||
"parseSpec" : {
|
||||
"format" : "json",
|
||||
"timestampSpec" : {
|
||||
"column" : "timestamp",
|
||||
"format" : "auto"
|
||||
},
|
||||
"dimensionsSpec" : {
|
||||
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
|
||||
"dimensionExclusions" : [],
|
||||
"spatialDimensions" : []
|
||||
}
|
||||
}
|
||||
},
|
||||
"metricsSpec": [
|
||||
{
|
||||
"type": "count",
|
||||
"name": "count"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "added",
|
||||
"fieldName": "added"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "deleted",
|
||||
"fieldName": "deleted"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "delta",
|
||||
"fieldName": "delta"
|
||||
}
|
||||
],
|
||||
"granularitySpec": {
|
||||
"type" : "uniform",
|
||||
"segmentGranularity": "MINUTE",
|
||||
"queryGranularity": "NONE"
|
||||
}
|
||||
},
|
||||
"ioConfig" : {
|
||||
"type" : "realtime",
|
||||
"firehose": {
|
||||
"type": "timed",
|
||||
"shutoffTime": "%%SHUTOFFTIME%%",
|
||||
"delegate": {
|
||||
"type": "kafka-0.8",
|
||||
"consumerProps": {
|
||||
"zookeeper.connect": "%%ZOOKEEPER_SERVER%%",
|
||||
"zookeeper.connection.timeout.ms" : "15000",
|
||||
"zookeeper.session.timeout.ms" : "15000",
|
||||
"zookeeper.sync.time.ms" : "5000",
|
||||
"group.id": "%%GROUP_ID%%",
|
||||
"fetch.message.max.bytes" : "1048586",
|
||||
"auto.offset.reset": "smallest",
|
||||
"auto.commit.enable": "false"
|
||||
},
|
||||
"feed": "%%TOPIC%%"
|
||||
}
|
||||
}
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type" : "realtime",
|
||||
"maxRowsInMemory": 500000,
|
||||
"intermediatePersistPeriod": "PT3M",
|
||||
"windowPeriod": "PT1M",
|
||||
"basePersistDirectory": "/home/y/var/druid_state/kafka_test/realtime/basePersist"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -373,7 +373,7 @@
|
|||
"context": {
|
||||
"useCache": "true",
|
||||
"populateCache": "true",
|
||||
"timeout": 180000
|
||||
"timeout": 360000
|
||||
}
|
||||
},
|
||||
"expectedResults": [
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper druid-metadata-storage;
|
||||
for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage;
|
||||
do
|
||||
docker stop $node
|
||||
docker rm $node
|
||||
|
|
Loading…
Reference in New Issue