Merge pull request #1859 from rasahner/baseOnCommunity

some changes for support of our integration tests
This commit is contained in:
Charles Allen 2015-10-26 14:19:26 -07:00
commit c523733c3f
22 changed files with 587 additions and 24 deletions

View File

@ -22,6 +22,13 @@ RUN wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeep
RUN cp /usr/local/zookeeper-3.4.6/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.6/conf/zoo.cfg
RUN ln -s /usr/local/zookeeper-3.4.6 /usr/local/zookeeper
# Kafka
RUN wget -q -O - http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz | tar -xzf - -C /usr/local
RUN ln -s /usr/local/kafka_2.10-0.8.2.0 /usr/local/kafka
# unless advertised.host.name is set to docker ip, publishing data fails
ADD docker_ip docker_ip
RUN perl -pi -e "s/#advertised.port=.*/advertised.port=9092/; s/#advertised.host.*/advertised.host.name=$(cat docker_ip)/" /usr/local/kafka/config/server.properties
# git
RUN apt-get install -y git

View File

@ -11,7 +11,7 @@ command=java
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Ddruid.host=%(ENV_HOST_IP)s
-Ddruid.zk.service.host=druid-zookeeper
-Ddruid.zk.service.host=druid-zookeeper-kafka
-Ddruid.processing.buffer.sizeBytes=75000000
-Ddruid.server.http.numThreads=100
-Ddruid.processing.numThreads=1

View File

@ -13,7 +13,7 @@ command=java
-Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://druid-metadata-storage/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.zk.service.host=druid-zookeeper
-Ddruid.zk.service.host=druid-zookeeper-kafka
-Ddruid.coordinator.startDelay=PT5S
-cp /usr/local/druid/lib/*
io.druid.cli.Main server coordinator

View File

@ -11,7 +11,7 @@ command=java
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Ddruid.host=%(ENV_HOST_IP)s
-Ddruid.zk.service.host=druid-zookeeper
-Ddruid.zk.service.host=druid-zookeeper-kafka
-Ddruid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
-Ddruid.s3.secretKey=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
-Ddruid.processing.buffer.sizeBytes=75000000

View File

@ -0,0 +1,5 @@
[program:kafka]
command=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
user=daemon
priority=0
stdout_logfile=/shared/logs/kafka.log

View File

@ -9,7 +9,7 @@ command=java
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Ddruid.host=%(ENV_HOST_IP)s
-Ddruid.zk.service.host=druid-zookeeper
-Ddruid.zk.service.host=druid-zookeeper-kafka
-Ddruid.indexer.logs.directory=/shared/tasklogs
-Ddruid.storage.storageDirectory=/shared/storage
-Ddruid.indexer.runner.javaOpts=-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

View File

@ -13,7 +13,7 @@ command=java
-Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://druid-metadata-storage/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.zk.service.host=druid-zookeeper
-Ddruid.zk.service.host=druid-zookeeper-kafka
-Ddruid.indexer.storage.type=metadata
-Ddruid.indexer.logs.directory=/shared/tasklogs
-Ddruid.indexer.runner.type=remote

View File

@ -8,7 +8,7 @@ command=java
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Ddruid.host=%(ENV_HOST_IP)s
-Ddruid.zk.service.host=druid-zookeeper
-Ddruid.zk.service.host=druid-zookeeper-kafka
-Ddruid.computation.buffer.size=75000000
-Ddruid.server.http.numThreads=100
-Ddruid.processing.numThreads=1

View File

@ -29,6 +29,11 @@
<version>0.9.0-SNAPSHOT</version>
</parent>
<properties>
<apache.kafka.version>0.8.2.1</apache.kafka.version>
<zkclient.version>0.4</zkclient.version>
</properties>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
@ -40,6 +45,11 @@
<artifactId>druid-s3-extensions</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-kafka-eight</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-histogram</artifactId>
@ -71,6 +81,29 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${apache.kafka.version}</version>
<!-- without this exclusion, there's interference that affects the log level -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>

View File

@ -1,5 +1,5 @@
# cleanup
for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper druid-metadata-storage;
for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage;
do
docker stop $node
docker rm $node
@ -12,6 +12,9 @@ SHARED_DIR=${HOME}/shared
SUPERVISORDIR=/usr/lib/druid/conf
RESOURCEDIR=$DIR/src/test/resources
# so docker IP addr will be known during docker build
echo $DOCKER_IP > $DOCKERDIR/docker_ip
# Make directories if they dont exist
mkdir -p $SHARED_DIR/logs
mkdir -p $SHARED_DIR/tasklogs
@ -24,26 +27,26 @@ mvn dependency:copy-dependencies -DoutputDirectory=$SHARED_DIR/docker/lib
# Build Druid Cluster Image
docker build -t druid/cluster $SHARED_DIR/docker
# Start zookeeper
docker run -d --name druid-zookeeper -p 2181:2181 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf druid/cluster
# Start zookeeper and kafka
docker run -d --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster
# Start MYSQL
docker run -d --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster
# Start Overlord
docker run -d --name druid-overlord -p 8090:8090 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper:druid-zookeeper druid/cluster
docker run -d --name druid-overlord -p 8090:8090 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
# Start Coordinator
docker run -d --name druid-coordinator -p 8081:8081 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper:druid-zookeeper druid/cluster
docker run -d --name druid-coordinator -p 8081:8081 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
# Start Historical
docker run -d --name druid-historical -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper:druid-zookeeper druid/cluster
docker run -d --name druid-historical -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
# Start Middlemanger
docker run -d --name druid-middlemanager -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper:druid-zookeeper --link druid-overlord:druid-overlord druid/cluster
docker run -d --name druid-middlemanager -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster
# Start Broker
docker run -d --name druid-broker -p 8082:8082 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper:druid-zookeeper --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
docker run -d --name druid-broker -p 8082:8082 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
# Start Router
docker run -d --name druid-router -p 8888:8888 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper:druid-zookeeper --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
docker run -d --name druid-router -p 8888:8888 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster

View File

@ -42,6 +42,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
private String indexerHost = "";
private String middleManagerHost = "";
private String zookeeperHosts = ""; // comma-separated list of host:port
private String kafkaHost = "";
private Map<String, String> props = null;
@JsonCreator
@ -62,20 +63,26 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
catch (IOException ex) {
throw new RuntimeException(ex);
}
routerHost = props.get("router_host") + ":" + props.get("router_port");
// there might not be a router; we want routerHost to be null in that case
routerHost = props.get("router_host");
if (null != routerHost) {
routerHost += ":" + props.get("router_port");
}
brokerHost = props.get("broker_host") + ":" + props.get("broker_port");
historicalHost = props.get("historical_host") + ":" + props.get("historical_port");
coordinatorHost = props.get("coordinator_host") + ":" + props.get("coordinator_port");
indexerHost = props.get("indexer_host") + ":" + props.get("indexer_port");
middleManagerHost = props.get("middlemanager_host");
zookeeperHosts = props.get("zookeeper_hosts");
kafkaHost = props.get("kafka_host") + ":" + props.get ("kafka_port");
LOG.info ("router: [%s]", routerHost);
LOG.info ("broker [%s]: ", brokerHost);
LOG.info ("broker: [%s]", brokerHost);
LOG.info ("coordinator: [%s]", coordinatorHost);
LOG.info ("overlord: [%s]", indexerHost);
LOG.info ("middle manager: [%s]", middleManagerHost);
LOG.info ("zookeepers: [%s]", zookeeperHosts);
LOG.info ("kafka: [%s]", kafkaHost);
}
@Override
@ -125,6 +132,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
return zookeeperHosts;
}
@Override
public String getKafkaHost()
{
return kafkaHost;
}
@Override
public String getProperty(String keyword)
{

View File

@ -76,6 +76,12 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
return dockerIp + ":2181";
}
@Override
public String getKafkaHost()
{
return dockerIp + ":9092";
}
@Override
public String getProperty(String prop)
{

View File

@ -35,5 +35,7 @@ public interface IntegrationTestingConfig
public String getZookeeperHosts();
public String getKafkaHost();
public String getProperty(String prop);
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.testing;
import com.google.inject.Inject;
import io.druid.curator.CuratorConfig;
/**
* We will use this instead of druid server's CuratorConfig, because CuratorConfig in
* a test cluster environment sees zookeeper at localhost even if zookeeper is elsewhere.
* We'll take the zookeeper host from the configuration file instead.
*/
public class IntegrationTestingCuratorConfig extends CuratorConfig
{
private IntegrationTestingConfig config;
@Inject
public IntegrationTestingCuratorConfig (IntegrationTestingConfig config)
{
this.config = config;
}
@Override
public String getZkHosts()
{
return config.getZookeeperHosts();
}
}

View File

@ -115,7 +115,7 @@ public class OverlordResourceTestClient
public TaskStatus.Status getTaskStatus(String taskID)
{
try {
StatusResponseHolder response = makeRequest(
StatusResponseHolder response = makeRequest( HttpMethod.GET,
String.format(
"%stask/%s/status",
getIndexerURL(),
@ -156,7 +156,7 @@ public class OverlordResourceTestClient
private List<TaskResponseObject> getTasks(String identifier)
{
try {
StatusResponseHolder response = makeRequest(
StatusResponseHolder response = makeRequest( HttpMethod.GET,
String.format("%s%s", getIndexerURL(), identifier)
);
LOG.info("Tasks %s response %s", identifier, response.getContent());
@ -171,6 +171,26 @@ public class OverlordResourceTestClient
}
}
public Map<String, String> shutDownTask(String taskID)
{
try {
StatusResponseHolder response = makeRequest( HttpMethod.POST,
String.format("%stask/%s/shutdown", getIndexerURL(),
URLEncoder.encode(taskID, "UTF-8")
)
);
LOG.info("Shutdown Task %s response %s", taskID, response.getContent());
return jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, String>>()
{
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public void waitUntilTaskCompletes(final String taskID)
{
RetryUtil.retryUntil(
@ -193,11 +213,11 @@ public class OverlordResourceTestClient
);
}
private StatusResponseHolder makeRequest(String url)
private StatusResponseHolder makeRequest(HttpMethod method, String url)
{
try {
StatusResponseHolder response = this.httpClient
.go(new Request(HttpMethod.GET, new URL(url)), responseHandler).get();
.go(new Request(method, new URL(url)), responseHandler).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE("Error while making request to indexer [%s %s]", response.getStatus(), response.getContent());
}

View File

@ -30,6 +30,8 @@ import io.druid.guice.LazySingleton;
import io.druid.guice.ManageLifecycle;
import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.IntegrationTestingConfigProvider;
import io.druid.curator.CuratorConfig;
import io.druid.testing.IntegrationTestingCuratorConfig;
/**
*/
@ -40,6 +42,8 @@ public class DruidTestModule implements Module
{
binder.bind(IntegrationTestingConfig.class).toProvider(IntegrationTestingConfigProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.test.config", IntegrationTestingConfigProvider.class);
binder.bind(CuratorConfig.class).to(IntegrationTestingCuratorConfig.class);
}
@Provides

View File

@ -90,7 +90,11 @@ public class DruidTestRunnerFactory implements ITestRunnerFactory
;
waitUntilInstanceReady(client, config.getCoordinatorHost());
waitUntilInstanceReady(client, config.getIndexerHost());
waitUntilInstanceReady(client, config.getBrokerHost());
String routerHost = config.getRouterHost();
if (null != routerHost) {
waitUntilInstanceReady(client, config.getRouterHost());
}
Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
try {
lifecycle.start();

View File

@ -0,0 +1,304 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.tests.indexer;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.RetryUtil;
import io.druid.testing.utils.TestQueryHelper;
import kafka.admin.AdminUtils;
import kafka.common.TopicExistsException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import java.util.concurrent.TimeUnit;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import org.apache.commons.io.IOUtils;
import java.io.InputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Callable;
/*
* This is a test for the kafka firehose.
*/
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITKafkaTest.class);
private static final int DELAY_BETWEEN_EVENTS_SECS = 5;
private static final String INDEXER_FILE = "/indexer/kafka_index_task.json";
private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
private static final String DATASOURCE = "kafka_test";
private static final String TOPIC_NAME = "kafkaTopic";
private static final int MINUTES_TO_SEND = 2;
// We'll fill in the current time and numbers for added, deleted and changed
// before sending the event.
final String event_template =
"{\"timestamp\": \"%s\"," +
"\"page\": \"Gypsy Danger\"," +
"\"language\" : \"en\"," +
"\"user\" : \"nuclear\"," +
"\"unpatrolled\" : \"true\"," +
"\"newPage\" : \"true\"," +
"\"robot\": \"false\"," +
"\"anonymous\": \"false\"," +
"\"namespace\":\"article\"," +
"\"continent\":\"North America\"," +
"\"country\":\"United States\"," +
"\"region\":\"Bay Area\"," +
"\"city\":\"San Francisco\"," +
"\"added\":%d," +
"\"deleted\":%d," +
"\"delta\":%d}";
private String taskID;
private ZkClient zkClient;
private Boolean segmentsExist; // to tell if we should remove segments during teardown
// format for the querying interval
private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
// format for the expected timestamp in a query response
private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
private DateTime dtFirst; // timestamp of 1st event
private DateTime dtLast; // timestamp of last event
@Inject
private TestQueryHelper queryHelper;
@Inject
private IntegrationTestingConfig config;
@Test
public void testKafka()
{
LOG.info("Starting test: ITKafkaTest");
// create topic
try {
int sessionTimeoutMs = 10000;
int connectionTimeoutMs = 10000;
String zkHosts = config.getZookeeperHosts();
zkClient = new ZkClient(
zkHosts, sessionTimeoutMs, connectionTimeoutMs,
ZKStringSerializer$.MODULE$
);
int numPartitions = 1;
int replicationFactor = 1;
Properties topicConfig = new Properties();
AdminUtils.createTopic(zkClient, TOPIC_NAME, numPartitions, replicationFactor, topicConfig);
}
catch (TopicExistsException e) {
// it's ok if the topic already exists
}
catch (Exception e) {
throw new ISE(e, "could not create kafka topic");
}
String indexerSpec = "";
// replace temp strings in indexer file
try {
LOG.info("indexerFile name: [%s]", INDEXER_FILE);
indexerSpec = getTaskAsString(INDEXER_FILE);
indexerSpec = indexerSpec.replaceAll("%%TOPIC%%", TOPIC_NAME);
indexerSpec = indexerSpec.replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts());
indexerSpec = indexerSpec.replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()));
indexerSpec = indexerSpec.replaceAll(
"%%SHUTOFFTIME%%",
new DateTime(
System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(
2
* MINUTES_TO_SEND
)
).toString()
);
LOG.info("indexerFile: [%s]\n", indexerSpec);
}
catch (Exception e) {
// log here so the message will appear in the console output
LOG.error("could not read indexer file [%s]", INDEXER_FILE);
throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE);
}
// start indexing task
taskID = indexer.submitTask(indexerSpec);
LOG.info("-------------SUBMITTED TASK");
// set up kafka producer
Properties properties = new Properties();
properties.put("metadata.broker.list", config.getKafkaHost());
LOG.info("kafka host: [%s]", config.getKafkaHost());
properties.put("serializer.class", "kafka.serializer.StringEncoder");
properties.put("request.required.acks", "1");
properties.put("producer.type", "async");
ProducerConfig producerConfig = new ProducerConfig(properties);
Producer<String, String> producer = new Producer<String, String>(producerConfig);
DateTimeZone zone = DateTimeZone.forID("UTC");
// format for putting into events
DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
DateTime dt = new DateTime(zone); // timestamp to put on events
dtFirst = dt; // timestamp of 1st event
dtLast = dt; // timestamp of last event
// stop sending events when time passes this
DateTime dtStop = dtFirst.plusMinutes(MINUTES_TO_SEND).plusSeconds(30);
// these are used to compute the expected aggregations
int added = 0;
int num_events = 0;
// send data to kafka
while (dt.compareTo(dtStop) < 0) { // as long as we're within the time span
LOG.info("sending event at [%s]", event_fmt.print(dt));
num_events++;
added += num_events;
// construct the event to send
String event = String.format(
event_template,
event_fmt.print(dt), num_events, 0, num_events
);
LOG.debug("event: [%s]", event);
try {
// Send event to kafka
KeyedMessage<String, String> message = new KeyedMessage<String, String>(TOPIC_NAME, event);
producer.send(message);
}
catch (Exception ioe) {
Throwables.propagate(ioe);
}
try {
Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
}
catch (InterruptedException ex) { /* nothing */ }
dtLast = dt;
dt = new DateTime(zone);
}
producer.close();
// put the timestamps into the query structure
String query_response_template = null;
InputStream is = ITKafkaTest.class.getResourceAsStream(QUERIES_FILE);
if (null == is) {
throw new ISE("could not open query file: %s", QUERIES_FILE);
}
try {
query_response_template = IOUtils.toString(is, "UTF-8");
} catch (IOException e) {
throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
}
String queryStr = query_response_template
// time boundary
.replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
.replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
// time series
.replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
.replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2)))
.replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMESERIES_ADDED%%", Integer.toString(added))
.replace("%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
// this query will probably be answered from the realtime task
try {
this.queryHelper.testQueriesFromString(queryStr, 2);
} catch (Exception e) {
Throwables.propagate(e);
}
// wait for segments to be handed off
try {
RetryUtil.retryUntil(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return coordinator.areSegmentsLoaded(DATASOURCE);
}
},
true,
30000,
10,
"Real-time generated segments loaded"
);
}
catch (Exception e) {
Throwables.propagate(e);
}
LOG.info("segments are present");
segmentsExist = true;
// this query will be answered by historical
try {
this.queryHelper.testQueriesFromString(queryStr, 2);
}
catch (Exception e) {
Throwables.propagate(e);
}
}
@AfterClass
public void afterClass()
{
LOG.info("teardown");
// wait for the task to complete
indexer.waitUntilTaskCompletes(taskID);
// delete kafka topic
AdminUtils.deleteTopic(zkClient, TOPIC_NAME);
// remove segments
if (segmentsExist) {
try {
String first = DateTimeFormat.forPattern("yyyy-MM-dd'T00:00:00.000Z'").print(dtFirst);
String last = DateTimeFormat.forPattern("yyyy-MM-dd'T00:00:00.000Z'").print(dtFirst.plusDays(1));
unloadAndKillData(DATASOURCE, first, last);
}
catch (Exception e) {
LOG.warn("exception while removing segments: [%s]", e.getMessage());
}
}
}
}

View File

@ -0,0 +1,40 @@
[
{
"description": "timeBoundary",
"query": {
"queryType":"timeBoundary",
"dataSource":"kafka_test"
},
"expectedResults":[
{
"timestamp":"%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
"result": {
"maxTime" : "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
"minTime":"%%TIMEBOUNDARY_RESPONSE_MINTIME%%"
}
}
]
},
{
"description": "timeseries",
"query": {
"queryType": "timeseries",
"dataSource": "kafka_test",
"intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
"granularity": "all",
"aggregations": [
{"type": "longSum", "fieldName": "count", "name": "edit_count"},
{"type": "longSum", "fieldName": "added", "name": "chars_added"}
]
},
"expectedResults": [
{
"timestamp" : "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
"result" : {
"chars_added" : %%TIMESERIES_ADDED%%,
"edit_count" : %%TIMESERIES_NUMEVENTS%%
}
}
]
}
]

View File

@ -0,0 +1,77 @@
{
"type" : "index_realtime",
"spec" : {
"dataSchema": {
"dataSource": "kafka_test",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type" : "uniform",
"segmentGranularity": "MINUTE",
"queryGranularity": "NONE"
}
},
"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "timed",
"shutoffTime": "%%SHUTOFFTIME%%",
"delegate": {
"type": "kafka-0.8",
"consumerProps": {
"zookeeper.connect": "%%ZOOKEEPER_SERVER%%",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "%%GROUP_ID%%",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "smallest",
"auto.commit.enable": "false"
},
"feed": "%%TOPIC%%"
}
}
},
"tuningConfig": {
"type" : "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT3M",
"windowPeriod": "PT1M",
"basePersistDirectory": "/home/y/var/druid_state/kafka_test/realtime/basePersist"
}
}
}

View File

@ -373,7 +373,7 @@
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 180000
"timeout": 360000
}
},
"expectedResults": [

View File

@ -1,4 +1,4 @@
for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper druid-metadata-storage;
for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage;
do
docker stop $node
docker rm $node