Re-import of test-harness.

This commit is contained in:
Russell Jurney 2013-06-26 16:53:03 -07:00
parent 511ee6c4cb
commit 8e7d991a09
93 changed files with 1588 additions and 0 deletions

3
test-harness/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
.DS_Store
logs/
*generated.spec

View File

@ -0,0 +1,9 @@
#!/bin/bash
(
source env-cluster.sh
sed s/clientPort.*/clientPort=$ZOOKEEPER_PORT/ kafka/config/zookeeper.properties > /tmp/zookeeper.properties
mkdir -p logs
cd kafka
bin/zookeeper-server-start.sh /tmp/zookeeper.properties >> ../logs/zookeeper.log 2>&1 &
)

8
test-harness/02-start-kafka.sh Executable file
View File

@ -0,0 +1,8 @@
#!/bin/bash
(
source env-cluster.sh
sed s/zk\.connect.*/zk.connect=$ZK_CONNECT_STRING/ kafka/config/server.properties > /tmp/kafka-server.properties
mkdir -p logs
cd kafka
bin/kafka-server-start.sh /tmp/kafka-server.properties >> ../logs/kafka.log 2>&1 &
)

View File

@ -0,0 +1,11 @@
#!/bin/bash
(
source env-cluster.sh
mkdir -p logs
# bug in druid...it doesn't create this node
echo -e "create /druid foo\ncreate /druid/announcements foo" | kafka/bin/zookeeper-shell.sh "$ZK_CONNECT_STRING"
echo "Ignore the 'already exists' errors"
cd druid
bin/start-master.sh >> ../logs/druid-master.log 2>&1 &
)

View File

@ -0,0 +1,5 @@
#!/bin/bash
(
cd druid
bin/start-realtime.sh >> ../logs/druid-realtime.log 2>&1 &
)

View File

@ -0,0 +1,5 @@
#!/bin/bash
(
cd druid
bin/start-compute.sh >> ../logs/druid-compute.log 2>&1 &
)

View File

@ -0,0 +1,5 @@
#!/bin/bash
(
cd druid
bin/start-broker.sh >> ../logs/druid-broker.log 2>&1 &
)

View File

@ -0,0 +1,8 @@
#!/bin/bash
mkdir -p logs
source env-cluster.sh
(
cd firehose
bin/start-firehose.sh -rate $FIREHOSE_RATE_PER_SEC -zk "$ZK_CONNECT_STRING" -out kafka -topic $KAFKA_TOPIC >> ../logs/firehose.log 2>&1 &
)

12
test-harness/08-run-queries.sh Executable file
View File

@ -0,0 +1,12 @@
#!/bin/bash
(
mkdir -p logs
cd queries
bin/run_client.sh event_counts >> ../logs/queries.log 2>&1 &
)
echo "Queries running...results going to logs/queries.log"
if [ "$1" == "f" ]; then
tail -f logs/queries.log
else
echo "To automatically follow, pass 'f' as the first argument: './05_run_queries.sh f'"
fi

2
test-harness/README Normal file
View File

@ -0,0 +1,2 @@
See the instructions on the Wiki:
https://github.com/housejester/druid-test-harness/wiki/Druid-Test-Harness

View File

@ -0,0 +1,27 @@
[{
"schema" : { "dataSource":"appevents",
"aggregators":[ {"type":"count", "name":"events"}
],
"indexGranularity":"minute"
},
"config" : { "maxRowsInMemory" : 500000,
"intermediatePersistPeriod" : "PT1m" },
"firehose" : { "type" : "kafka-0.6.3",
"consumerProps" : { "zk.connect" : "localhost:2181",
"zk.connectiontimeout.ms" : "15000",
"zk.sessiontimeout.ms" : "15000",
"zk.synctime.ms" : "5000",
"groupid" : "consumer-group",
"fetch.size" : "14856",
"autooffset.reset" : "largest",
"autocommit.enable" : "false" },
"feed" : "campaigns_01",
"parser" : { "timestampSpec" : { "column" : "event_timestamp", "format" : "millis" },
"data" : { "format" : "json" }
} },
"plumber" : { "type" : "realtime",
"windowPeriod" : "PT1m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/eval2BasePersist" }
}]

View File

@ -0,0 +1,22 @@
#!/bin/bash
ENV_CLUSTER="`dirname $0`/../../env-cluster.sh"
source $ENV_CLUSTER
NODE_TYPE=broker
MAIN_CLASS=com.metamx.druid.http.BrokerMain
DRUID_HOST="${DRUID_BROKER_HOST}:${DRUID_BROKER_PORT}"
DRUID_PORT=$DRUID_BROKER_PORT
mkdir -p /tmp/druid-$NODE_TYPE
CONFIG="`dirname $0`/../config"
RT_CONFIG="/tmp/druid-${NODE_TYPE}/runtime.properties"
cat $CONFIG/base.properties $CONFIG/${NODE_TYPE}.properties |
sed s/druid\.zk\.service\.host\=.*/druid.zk.service.host=$ZK_CONNECT_STRING/g |
sed s/com\.metamx\.aws\.accessKey\=.*/com.metamx.aws.accessKey=$AWS_KEY/g |
sed s/com\.metamx\.aws\.secretKey\=.*/com.metamx.aws.secretKey=$AWS_SECRET/g |
sed s/druid\.pusher\.s3\.bucket\=.*/druid.pusher.s3.bucket=$S3_BUCKET/g |
sed s/druid\.pusher\.s3\.baseKey\=.*/druid.pusher.s3.baseKey=$S3_BASENAME/g |
sed s/druid\.host\=.*/druid.host=$DRUID_HOST/g |
sed s/druid\.port\=.*/druid.port=$DRUID_PORT/g > $RT_CONFIG
java -cp /tmp/druid-${NODE_TYPE}:`ls *.jar | tr '\n' ':'` $MAIN_CLASS

View File

@ -0,0 +1,21 @@
#!/bin/bash
ENV_CLUSTER="`dirname $0`/../../env-cluster.sh"
source $ENV_CLUSTER
NODE_TYPE=compute
MAIN_CLASS=com.metamx.druid.http.ComputeMain
DRUID_HOST="${DRUID_COMPUTE_HOST}:${DRUID_COMPUTE_PORT}"
DRUID_PORT=$DRUID_COMPUTE_PORT
mkdir -p /tmp/druid-$NODE_TYPE
CONFIG="`dirname $0`/../config"
RT_CONFIG="/tmp/druid-${NODE_TYPE}/runtime.properties"
cat $CONFIG/base.properties $CONFIG/${NODE_TYPE}.properties |
sed s/druid\.zk\.service\.host\=.*/druid.zk.service.host=$ZK_CONNECT_STRING/g |
sed s/com\.metamx\.aws\.accessKey\=.*/com.metamx.aws.accessKey=$AWS_KEY/g |
sed s/com\.metamx\.aws\.secretKey\=.*/com.metamx.aws.secretKey=$AWS_SECRET/g |
sed s/druid\.pusher\.s3\.bucket\=.*/druid.pusher.s3.bucket=$S3_BUCKET/g |
sed s/druid\.pusher\.s3\.baseKey\=.*/druid.pusher.s3.baseKey=$S3_BASENAME/g |
sed s/druid\.host\=.*/druid.host=$DRUID_HOST/g |
sed s/druid\.port\=.*/druid.port=$DRUID_PORT/g > $RT_CONFIG
java -cp /tmp/druid-${NODE_TYPE}:`ls *.jar | tr '\n' ':'` $MAIN_CLASS

View File

@ -0,0 +1,21 @@
#!/bin/bash
ENV_CLUSTER="`dirname $0`/../../env-cluster.sh"
source $ENV_CLUSTER
NODE_TYPE=master
MAIN_CLASS=com.metamx.druid.http.MasterMain
DRUID_HOST="${DRUID_MASTER_HOST}:${DRUID_MASTER_PORT}"
DRUID_PORT=$DRUID_MASTER_PORT
mkdir -p /tmp/druid-$NODE_TYPE
CONFIG="`dirname $0`/../config"
RT_CONFIG="/tmp/druid-${NODE_TYPE}/runtime.properties"
cat $CONFIG/base.properties $CONFIG/${NODE_TYPE}.properties |
sed s/druid\.zk\.service\.host\=.*/druid.zk.service.host=$ZK_CONNECT_STRING/g |
sed s/com\.metamx\.aws\.accessKey\=.*/com.metamx.aws.accessKey=$AWS_KEY/g |
sed s/com\.metamx\.aws\.secretKey\=.*/com.metamx.aws.secretKey=$AWS_SECRET/g |
sed s/druid\.pusher\.s3\.bucket\=.*/druid.pusher.s3.bucket=$S3_BUCKET/g |
sed s/druid\.pusher\.s3\.baseKey\=.*/druid.pusher.s3.baseKey=$S3_BASENAME/g |
sed s/druid\.host\=.*/druid.host=$DRUID_HOST/g |
sed s/druid\.port\=.*/druid.port=$DRUID_PORT/g > $RT_CONFIG
java -cp /tmp/druid-${NODE_TYPE}:`ls *.jar | tr '\n' ':'` $MAIN_CLASS

View File

@ -0,0 +1,29 @@
#!/bin/bash
ENV_CLUSTER="`dirname $0`/../../env-cluster.sh"
source $ENV_CLUSTER
NODE_TYPE=realtime
MAIN_CLASS=com.metamx.druid.realtime.RealtimeMain
DRUID_HOST="${DRUID_REALTIME_HOST}:${DRUID_REALTIME_PORT}"
DRUID_PORT=$DRUID_REALTIME_PORT
mkdir -p /tmp/druid-$NODE_TYPE
CONFIG="`dirname $0`/../config"
RT_CONFIG="/tmp/druid-${NODE_TYPE}/runtime.properties"
SPEC="`dirname $0`/../appevents_realtime.spec"
RT_SPEC="`dirname $0`/../appevents_realtime_generated.spec"
cat $CONFIG/base.properties $CONFIG/${NODE_TYPE}.properties |
sed s/druid\.realtime\.specFile\=.*/druid.realtime.specFile=appevents_realtime_generated.spec/g |
sed s/druid\.zk\.service\.host\=.*/druid.zk.service.host=$ZK_CONNECT_STRING/g |
sed s/com\.metamx\.aws\.accessKey\=.*/com.metamx.aws.accessKey=$AWS_KEY/g |
sed s/com\.metamx\.aws\.secretKey\=.*/com.metamx.aws.secretKey=$AWS_SECRET/g |
sed s/druid\.pusher\.s3\.bucket\=.*/druid.pusher.s3.bucket=$S3_BUCKET/g |
sed s/druid\.pusher\.s3\.baseKey\=.*/druid.pusher.s3.baseKey=$S3_BASENAME/g |
sed s/druid\.host\=.*/druid.host=$DRUID_HOST/g |
sed s/druid\.port\=.*/druid.port=$DRUID_PORT/g > $RT_CONFIG
cat $SPEC |
sed s/\"zk\.connect\".*,/\"zk.connect\":\"$ZK_CONNECT_STRING\",/g |
sed s/\"feed\".*,/\"feed\":\"$KAFKA_TOPIC\",/g > $RT_SPEC
java -cp /tmp/druid-${NODE_TYPE}:`ls *.jar | tr '\n' ':'` $MAIN_CLASS

View File

@ -0,0 +1,155 @@
# Properties for demo of Realtime Node in standalone mode.
# To Use This: copy this file to runtime.properties and put directory containing it in classpath.
#
# S3 access
com.metamx.aws.accessKey="AKIAJZ7WZ72IRUQK5KCQ"
com.metamx.aws.secretKey="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"
# thread pool size for servicing queries
druid.client.http.connections=10
# JDBC connection string for metadata database
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.database.user=druid
druid.database.password=diurd
# time between polling for metadata database
druid.database.poll.duration=PT1M
# table for segment metadata coordination, no default
druid.database.segmentTable=prod_segments
druid.database.ruleTable=prod_rules
#in progress 20121010 #druid.database.taskTable=
druid.emitter.period=PT60S
druid.master.host
# Poll period the master runs on
druid.master.period=PT60S
# Number of poll periods to wait for a node to come back before believing it is really gone
druid.master.removedSegmentLifetime=1
# Delay for the master to start its work, this should be sufficiently high so that the master can get all of the
# information it needs from ZK before starting. It's a hack, but it works until we re-work our ZK integration.
druid.master.startDelay=PT600S
# Path on local FS for storage of segments; dir. will be created if needed
druid.paths.indexCache=/tmp/druid_eval_compute_1a/indexCache
# Path on local FS for storage of segment metadata; dir. will be created if needed
druid.paths.segmentInfoCache=/tmp/druid_eval_compute_1a/segmentInfoCache
# Path to schema definition file
druid.request.logging.dir=/tmp/druid_eval_compute_1a/log
# TODO: have these moved to spec file?
# unknown # druid.realtime.dataSources=
# unknown # druid.realtime.index.maxSize=500000
# unknown # druid.realtime.persistPeriod=PT600S
# unknown # druid.realtime.scheduledExec.threads=1
# unknown # druid.realtime.uploadPeriod=PT3600S
# unknown # druid.realtime.windowPeriod=PT600S
#druid.server.maxSize=0
druid.server.maxSize=300000000000
# =realtime or =historical (default)
druid.server.type=historical
#
# zookeeper (zk) znode paths (zpaths)
#
# base znode which establishes a unique namespace for a Druid ensemble.
# Default is /druid if not set
# This can also be set via parameter baseZkPath of the DruidSetup commandline
# druid.zk.paths.base=
# If these zpath properties like druid.zk.paths.*Path are overridden, then all must be
# overridden together for upgrade safety reasons.
# The commandline utility DruidSetup, which is used to set up properties on zookeeper,
# will validate this. Also, these zpaths must start with / because they are not relative.
# ZK znode path for service discovery within the cluster.
# Default is value of druid.zk.paths.base + /announcements
# druid.zk.paths.announcementsPath=/druid/announcements
druid.zk.paths.discoveryPath=/druid/discovery
# Legacy znode path, must be set, but can be ignored
#druid.zk.paths.indexesPath=/druid/indexes
# Default is value of druid.zk.paths.base + /tasks
##druid.zk.paths.indexer.tasksPath=/druid/tasks
# Default is value of druid.zk.paths.base + /status
#druid.zk.paths.indexer.statusPath=/druid/status
# ZK path for load/drop protocol between Master/Compute
# Default is value of druid.zk.paths.base + /loadQueue
#druid.zk.paths.loadQueuePath=/druid/loadQueue
# ZK path for Master leadership election
# Default is value of druid.zk.paths.base + /master
#druid.zk.paths.masterPath=/druid/master
# ZK path for publishing served segments
# Default is value of druid.zk.paths.base + /servedSegments
#druid.zk.paths.servedSegmentsPath=/druid/servedSegments
# Default is value of druid.zk.paths.base + /leaderLatch
#druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatch
# ZK path for properties stored in zookeeper
# Default is value of druid.zk.paths.base + /properties
#druid.zk.paths.propertiesPath=/druid/properties
druid.host=127.0.0.1:8080
druid.port=8181
#
druid.http.numThreads=10
# default is 5 min. (300000)
#druid.http.maxIdleTimeMillis=300000
# unknown # com.metamx.service=compute
com.metamx.emitter.http=true
com.metamx.emitter.http.url=http://localhost:9996/events
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info
com.metamx.metrics.emitter.period=PT60S
# ZK quorum IPs; ZK coordinates in the form host1:port1[,host2:port2[, ...]]
# if =none then do not contact zookeeper (only for RealtimeStandaloneMain examples)
druid.zk.service.host=localhost:2181
# msec; high value means tolerate slow zk nodes, default is to wait about 3 weeks
druid.zk.service.connectionTimeout=1000000
druid.processing.formatString=processing_%s
druid.processing.numThreads=3
#
# other properties found
#
druid.computation.buffer.size=10000000
druid.merger.threads=1
druid.merger.runner=remote
druid.merger.whitelist.enabled=false
druid.merger.whitelist.datasources=
druid.merger.rowFlushBoundary=500000
druid.indexer.retry.minWaitMillis=10000
druid.indexer.retry.maxWaitMillis=60000
druid.indexer.retry.maxRetryCount=10
#emitting, opaque marker
druid.service=foo
# S3 dest for realtime indexer
druid.pusher.s3.bucket=com.wesimply.druid.james
druid.pusher.s3.baseKey=eval_rt_1_
druid.realtime.specFile=eval_2_realtime.spec
#
# Integration-Test Related
#
# is this for RAM? which process?
druid.bard.cache.sizeInBytes=40000000
#ignore#druid.bard.host=

View File

@ -0,0 +1,20 @@
# Broker-specific property overrides. Should be appended to the base.properties
# used for the stats emitter
druid.service=broker
# Path on local FS for storage of segments; dir. will be created if needed
druid.paths.indexCache=/tmp/druid_eval/broker/indexCache
# Path on local FS for storage of segment metadata; dir. will be created if needed
druid.paths.segmentInfoCache=/tmp/druid_eval/broker/segmentInfoCache
druid.request.logging.dir=/tmp/druid_eval/broker/log
# host:port of THIS service
druid.host=127.0.0.1:8080
# port this broker should listen on
druid.port=8080

View File

@ -0,0 +1,21 @@
# Compute-specific property overrides. Should be appended to the base.properties
# used for the stats emitter
druid.service=compute
# Path on local FS for storage of segments; dir. will be created if needed
druid.paths.indexCache=/tmp/druid_eval/compute/indexCache
# Path on local FS for storage of segment metadata; dir. will be created if needed
druid.paths.segmentInfoCache=/tmp/druid_eval/compute/segmentInfoCache
# Path to schema definition file
druid.request.logging.dir=/tmp/druid_eval/compute/log
# host:port of THIS service
druid.host=127.0.0.1:8083
# Port this compute node should listen on
druid.port=8083

View File

@ -0,0 +1,19 @@
# Master-specific property overrides. Should be appended to the base.properties
# used for the stats emitter
druid.service=master
# Path on local FS for storage of segments; dir. will be created if needed
druid.paths.indexCache=/tmp/druid_eval/master/indexCache
# Path on local FS for storage of segment metadata; dir. will be created if needed
druid.paths.segmentInfoCache=/tmp/druid_eval/master/segmentInfoCache
# Path to schema definition file
druid.request.logging.dir=/tmp/druid_eval/master/log
# host:port of THIS service
druid.host=127.0.0.1:8081
# Port this master should listen on
druid.port=8081

View File

@ -0,0 +1,23 @@
# Realtime-specific property overrides. Should be appended to the base.properties
# used for the stats emitter
druid.service=realtime
# Path on local FS for storage of segments; dir. will be created if needed
druid.paths.indexCache=/tmp/druid_eval/realtime/indexCache
# Path on local FS for storage of segment metadata; dir. will be created if needed
druid.paths.segmentInfoCache=/tmp/druid_eval/realtime/segmentInfoCache
# Path to schema definition file
druid.request.logging.dir=/tmp/druid_eval/realtime/log
# =realtime or =historical (default)
druid.server.type=realtime
# host:port of THIS service
druid.host=127.0.0.1:8082
# Port this realtime node should listen on
druid.port=8082

31
test-harness/env-cluster.sh Executable file
View File

@ -0,0 +1,31 @@
#!/bin/bash
# This file defines the hostnames/locations for the various services.
ZOOKEEPER_PORT=2181
ZK_CONNECT_STRING="localhost:2181"
DRUID_DB_URI="jdbc:mysql://localhost:3306/druid"
DRUID_DB_USER="druid"
DRUID_DB_PASS="diurd"
DRUID_BROKER_HOST="localhost"
DRUID_BROKER_PORT=8080
DRUID_MASTER_HOST=`hostname`
DRUID_MASTER_PORT=8081
DRUID_REALTIME_HOST=`hostname`
DRUID_REALTIME_PORT=8082
DRUID_COMPUTE_HOST=`hostname`
DRUID_COMPUTE_PORT=8083
AWS_KEY="ENTER_AWS_KEY_HERE"
AWS_SECRET="ENTER_AWS_SECRET_HERE"
S3_BUCKET="ENTER_S3_BUCKET"
S3_BASENAME="druid_"
KAFKA_TOPIC="campaigns_01"
FIREHOSE_RATE_PER_SEC=500

View File

@ -0,0 +1,20 @@
${name:"sample",type:"combine",endToken:"---",trim:true,separator:""}
{
"appid": ${type:"minmax", min:1, max:280},
"event": "${event_name}",
"event_timestamp": ${type:"time", format:"millis"},
"event_value": ${type:"minmax", min:1, max:100},
"country": "${country}"
}
---
${name:"country", type:"weighted",delimiter:","}
80:US,5:AR,3:BE,1:CA,1:CH,8:CN,5:DE
${name:"event_name",type:"weighted"}
20:like
100:clicked
20:deleted
30:login
33:search

View File

@ -0,0 +1,2 @@
#!/bin/bash
java -jar d8a-conjure-1.0-SNAPSHOT.jar -template appevents.txt $@

Binary file not shown.

View File

@ -0,0 +1,41 @@
#!/bin/bash
if [ $# -lt 1 ];
then
echo "USAGE: $0 dir"
exit 1
fi
base_dir=$(dirname $0)/../..
hadoop=${HADOOP_HOME}/bin/hadoop
echo "$hadoop fs -rmr $1"
$hadoop fs -rmr $1
echo "$hadoop fs -mkdir $1"
$hadoop fs -mkdir $1
# include kafka jars
for file in $base_dir/dist/*.jar;
do
echo "$hadoop fs -put $file $1/"
$hadoop fs -put $file $1/
done
for file in $base_dir/lib/*.jar;
do
echo "$hadoop fs -put $file $1/"
$hadoop fs -put $file $1/
done
local_dir=$(dirname $0)
# include hadoop-consumer jars
for file in $local_dir/lib/*.jar;
do
echo "$hadoop fs -put $file $1/"
$hadoop fs -put $file $1/
done

View File

@ -0,0 +1,40 @@
#!/bin/bash
if [ $# -lt 1 ];
then
echo "USAGE: $0 classname [opts]"
exit 1
fi
base_dir=$(dirname $0)/../..
# include kafka jars
for file in $base_dir/lib/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
for file in $base_dir/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
local_dir=$(dirname $0)
# include hadoop-consumer jars
echo $CLASSPATH
CLASSPATH=dist:$CLASSPATH:${HADOOP_HOME}/conf
#if [ -z "$KAFKA_OPTS" ]; then
# KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote"
#fi
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
$JAVA $KAFKA_OPTS -cp $CLASSPATH $@

View File

@ -0,0 +1,19 @@
This directory contains examples of client code that uses kafka.
The default target for ant is kafka.examples.KafkaConsumerProducerDemo which sends and receives
messages from Kafka server.
In order to run demo from SBT:
1. Start Zookeeper and the Kafka server
2. ./sbt from top-level kafka directory
3. Switch to the kafka java examples project -> project Kafka Java Examples
4. execute run -> run
5. For unlimited producer-consumer run, select option 1
For simple consumer demo, select option 2
To run the demo using scripts:
1. Start Zookeeper and the Kafka server
2. For simple consumer demo, run bin/java-simple-consumer-demo.sh
3. For unlimited producer-consumer run, run bin/java-producer-consumer-demo.sh

View File

@ -0,0 +1,30 @@
#!/bin/bash
base_dir=$(dirname $0)/../..
base_dir=$(dirname $0)/../../..
for file in $base_dir/lib/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
for file in $base_dir/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
echo $CLASSPATH
if [ -z "$KAFKA_PERF_OPTS" ]; then
KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=3333 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
fi
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
$JAVA $KAFKA_OPTS -cp $CLASSPATH kafka.examples.KafkaConsumerProducerDemo $@

View File

@ -0,0 +1,28 @@
#!/bin/bash
base_dir=$(dirname $0)/../../..
for file in $base_dir/lib/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
for file in $base_dir/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
echo $CLASSPATH
if [ -z "$KAFKA_PERF_OPTS" ]; then
KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=3333 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
fi
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
$JAVA $KAFKA_OPTS -cp $CLASSPATH kafka.examples.SimpleConsumerDemo $@

View File

@ -0,0 +1,7 @@
log4j.rootLogger=INFO, stderr
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.target=System.err
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n

View File

@ -0,0 +1,5 @@
#!/bin/bash
base_dir=$(dirname $0)
export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
$base_dir/kafka-run-class.sh kafka.consumer.ConsoleConsumer $@

View File

@ -0,0 +1,3 @@
#!/bin/bash
$(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance $@

View File

@ -0,0 +1,3 @@
#!/bin/bash
$(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerShell $@

View File

@ -0,0 +1,3 @@
#!/bin/bash
$(dirname $0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@

View File

@ -0,0 +1,3 @@
#!/bin/bash
$(dirname $0)/kafka-run-class.sh kafka.tools.ProducerShell $@

View File

@ -0,0 +1,37 @@
#!/bin/bash
if [ $# -lt 1 ];
then
echo "USAGE: $0 classname [opts]"
exit 1
fi
base_dir=$(dirname $0)/..
for file in $base_dir/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
for file in $base_dir/lib/*.jar;
do
if [ ${file##*/} != "sbt-launch.jar" ]; then
CLASSPATH=$CLASSPATH:$file
fi
done
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS="-Xmx512M -server -Dlog4j.configuration=file:$base_dir/config/log4j.properties"
fi
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@

View File

@ -0,0 +1,11 @@
#!/bin/bash
if [ $# -lt 1 ];
then
echo "USAGE: $0 server.properties [consumer.properties]"
exit 1
fi
export JMX_PORT="9999"
$(dirname $0)/kafka-run-class.sh kafka.Kafka $@

View File

@ -0,0 +1,2 @@
#!/bin/sh
ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}' | xargs kill -SIGINT

View File

@ -0,0 +1,3 @@
#!/bin/bash
$(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance $@

View File

@ -0,0 +1,3 @@
#!/bin/bash
$(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerShell $@

View File

@ -0,0 +1,26 @@
#!/bin/bash
base_dir=$(dirname $0)/..
for file in $base_dir/lib/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
for file in $base_dir/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
echo $CLASSPATH
if [ -z "$KAFKA_PERF_OPTS" ]; then
KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=3333 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
fi
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
$JAVA $KAFKA_OPTS -cp $CLASSPATH kafka.perf.KafkaPerfSimulator $@

View File

@ -0,0 +1,57 @@
# the id of the broker
brokerid=0
# number of logical partitions on this broker
num.partitions=4
# the port the socket server runs on
port=9092
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
# the directory in which to store log files
log.dir=/tmp/kafka-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
# the maximum size of a log segment
log.file.size=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
# the minimum age of a log file to eligible for deletion
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=1000
#set the following properties to use zookeeper
# enable connecting to zookeeper
enable.zookeeper=true
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=2000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -0,0 +1,8 @@
This test replicates messages from 3 kafka brokers to 2 other kafka brokers using the embedded consumer.
At the end, the messages produced at the source brokers should match that at the target brokers.
To run this test, do
bin/run-test.sh
The expected output is given in bin/expected.out. There is only 1 thing that's important.
1. The output should have a line "test passed".

View File

@ -0,0 +1,18 @@
start the servers ...
start producing messages ...
wait for consumer to finish consuming ...
[2011-05-17 14:49:11,605] INFO Creating async producer for broker id = 2 at localhost:9091 (kafka.producer.ProducerPool)
[2011-05-17 14:49:11,606] INFO Creating async producer for broker id = 1 at localhost:9092 (kafka.producer.ProducerPool)
[2011-05-17 14:49:11,607] INFO Creating async producer for broker id = 3 at localhost:9090 (kafka.producer.ProducerPool)
thread 0: 400000 messages sent 3514012.1233 nMsg/sec 3.3453 MBs/sec
[2011-05-17 14:49:34,382] INFO Closing all async producers (kafka.producer.ProducerPool)
[2011-05-17 14:49:34,383] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
[2011-05-17 14:49:34,384] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
[2011-05-17 14:49:34,385] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
Total Num Messages: 400000 bytes: 79859641 in 22.93 secs
Messages/sec: 17444.3960
MB/sec: 3.3214
test passed
stopping the servers
bin/../../../bin/zookeeper-server-start.sh: line 9: 22584 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
bin/../../../bin/zookeeper-server-start.sh: line 9: 22585 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@

View File

@ -0,0 +1,80 @@
#!/bin/bash
num_messages=400000
message_size=400
base_dir=$(dirname $0)/../../../../
test_dir=$(dirname $0)/..
rm -rf /tmp/zookeeper_source
rm -rf /tmp/zookeeper_target
rm -rf /tmp/kafka-source1-logs
mkdir /tmp/kafka-source1-logs
mkdir /tmp/kafka-source1-logs/test01-0
touch /tmp/kafka-source1-logs/test01-0/00000000000000000000.kafka
rm -rf /tmp/kafka-source2-logs
mkdir /tmp/kafka-source2-logs
mkdir /tmp/kafka-source2-logs/test01-0
touch /tmp/kafka-source2-logs/test01-0/00000000000000000000.kafka
rm -rf /tmp/kafka-source3-logs
mkdir /tmp/kafka-source3-logs
mkdir /tmp/kafka-source3-logs/test01-0
touch /tmp/kafka-source3-logs/test01-0/00000000000000000000.kafka
rm -rf /tmp/kafka-target1-logs
rm -rf /tmp/kafka-target2-logs
echo "start the servers ..."
$base_dir/bin/zookeeper-server-start.sh $test_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log &
$base_dir/bin/zookeeper-server-start.sh $test_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
$base_dir/bin/kafka-run-class.sh kafka.Kafka $test_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log &
$base_dir/bin/kafka-run-class.sh kafka.Kafka $test_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log &
$base_dir/bin/kafka-run-class.sh kafka.Kafka $test_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log &
$base_dir/bin/kafka-run-class.sh kafka.Kafka $test_dir/config/server_target1.properties $test_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target1.log &
$base_dir/bin/kafka-run-class.sh kafka.Kafka $test_dir/config/server_target2.properties $test_dir/config/consumer.properties 2>&1 > $base_dir/kafka_target2.log &
sleep 4
echo "start producing messages ..."
$base_dir/bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval 400000 num_messages --async --delay-btw-batch-ms 10 &
echo "wait for consumer to finish consuming ..."
cur1_offset="-1"
cur2_offset="-1"
quit1=0
quit2=0
while [ $quit1 -eq 0 ] && [ $quit2 -eq 0 ]
do
sleep 2
target1_size=`$base_dir/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
if [ $target1_size -eq $cur1_offset ]
then
quit1=1
fi
cur1_offset=$target1_size
target2_size=`$base_dir/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
if [ $target2_size -eq $cur2_offset ]
then
quit2=1
fi
cur2_offset=$target2_size
done
sleep 2
source_part0_size=`$base_dir/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
source_part1_size=`$base_dir/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
source_part2_size=`$base_dir/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
target_part0_size=`$base_dir/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
target_part1_size=`$base_dir/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
expected_size=`expr $source_part0_size + $source_part1_size + $source_part2_size`
actual_size=`expr $target_part0_size + $target_part1_size`
if [ $expected_size != $actual_size ]
then
echo "source size: $expected_size target size: $actual_size test failed!!! look at it!!!"
else
echo "test passed"
fi
echo "stopping the servers"
ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null
sleep 2
ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 2>&1 > /dev/null

View File

@ -0,0 +1,14 @@
# see kafka.consumer.ConsumerConfig for more details
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
#consumer group id
groupid=group1
embeddedconsumer.topics=test01:1

View File

@ -0,0 +1,64 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=1
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# number of logical partitions on this broker
num.partitions=1
# the port the socket server runs on
port=9092
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
# the directory in which to store log files
log.dir=/tmp/kafka-source1-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
# the maximum size of a log segment
log.file.size=10000000
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
# the minimum age of a log file to eligible for deletion
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
#set the following properties to use zookeeper
# enable connecting to zookeeper
enable.zookeeper=true
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -0,0 +1,64 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=2
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# number of logical partitions on this broker
num.partitions=1
# the port the socket server runs on
port=9091
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
# the directory in which to store log files
log.dir=/tmp/kafka-source2-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
# the maximum size of a log segment
log.file.size=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
# the minimum age of a log file to eligible for deletion
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
#set the following properties to use zookeeper
# enable connecting to zookeeper
enable.zookeeper=true
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -0,0 +1,64 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=3
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# number of logical partitions on this broker
num.partitions=1
# the port the socket server runs on
port=9090
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
# the directory in which to store log files
log.dir=/tmp/kafka-source3-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
# the maximum size of a log segment
log.file.size=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
# the minimum age of a log file to eligible for deletion
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
#set the following properties to use zookeeper
# enable connecting to zookeeper
enable.zookeeper=true
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -0,0 +1,64 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=1
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# number of logical partitions on this broker
num.partitions=1
# the port the socket server runs on
port=9093
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
# the directory in which to store log files
log.dir=/tmp/kafka-target1-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
# the maximum size of a log segment
log.file.size=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
# the minimum age of a log file to eligible for deletion
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
#set the following properties to use zookeeper
# enable connecting to zookeeper
enable.zookeeper=true
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -0,0 +1,64 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=2
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# number of logical partitions on this broker
num.partitions=1
# the port the socket server runs on
port=9094
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
# the directory in which to store log files
log.dir=/tmp/kafka-target2-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
# the maximum size of a log segment
log.file.size=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
# the minimum age of a log file to eligible for deletion
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
#set the following properties to use zookeeper
# enable connecting to zookeeper
enable.zookeeper=true
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2182
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -0,0 +1,4 @@
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper_source
# the port at which the clients will connect
clientPort=2181

View File

@ -0,0 +1,4 @@
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper_target
# the port at which the clients will connect
clientPort=2182

View File

@ -0,0 +1,11 @@
start the servers ...
start producing messages ...
Total Num Messages: 10000000 bytes: 1994374785 in 106.076 secs
Messages/sec: 94272.0314
MB/sec: 17.9304
[2011-05-02 11:50:29,022] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
wait for consumer to finish consuming ...
test passed
bin/../../../bin/kafka-server-start.sh: line 11: 359 Terminated $(dirname $0)/kafka-run-class.sh kafka.Kafka $@
bin/../../../bin/zookeeper-server-start.sh: line 9: 357 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
bin/../../../bin/zookeeper-server-start.sh: line 9: 358 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@

View File

@ -0,0 +1,9 @@
This test produces a large number of messages to a broker. It measures the throughput and tests
the amount of data received is expected.
To run this test, do
bin/run-test.sh
The expected output is given in expected.out. There are 2 things to pay attention to:
1. The output should have a line "test passed".
2. The throughput from the producer should be around 300,000 Messages/sec on a typical machine.

View File

@ -0,0 +1,32 @@
start the servers ...
start producing 2000000 messages ...
[2011-05-17 14:31:12,568] INFO Creating async producer for broker id = 0 at localhost:9092 (kafka.producer.ProducerPool)
thread 0: 100000 messages sent 3272786.7779 nMsg/sec 3.1212 MBs/sec
thread 0: 200000 messages sent 3685956.5057 nMsg/sec 3.5152 MBs/sec
thread 0: 300000 messages sent 3717472.1190 nMsg/sec 3.5453 MBs/sec
thread 0: 400000 messages sent 3730647.2673 nMsg/sec 3.5578 MBs/sec
thread 0: 500000 messages sent 3730647.2673 nMsg/sec 3.5578 MBs/sec
thread 0: 600000 messages sent 3722315.2801 nMsg/sec 3.5499 MBs/sec
thread 0: 700000 messages sent 3718854.5928 nMsg/sec 3.5466 MBs/sec
thread 0: 800000 messages sent 3714020.4271 nMsg/sec 3.5420 MBs/sec
thread 0: 900000 messages sent 3713330.8578 nMsg/sec 3.5413 MBs/sec
thread 0: 1000000 messages sent 3710575.1391 nMsg/sec 3.5387 MBs/sec
thread 0: 1100000 messages sent 3711263.6853 nMsg/sec 3.5393 MBs/sec
thread 0: 1200000 messages sent 3716090.6726 nMsg/sec 3.5439 MBs/sec
thread 0: 1300000 messages sent 3709198.8131 nMsg/sec 3.5374 MBs/sec
thread 0: 1400000 messages sent 3705762.4606 nMsg/sec 3.5341 MBs/sec
thread 0: 1500000 messages sent 3701647.2330 nMsg/sec 3.5302 MBs/sec
thread 0: 1600000 messages sent 3696174.4594 nMsg/sec 3.5249 MBs/sec
thread 0: 1700000 messages sent 3703703.7037 nMsg/sec 3.5321 MBs/sec
thread 0: 1800000 messages sent 3703017.9596 nMsg/sec 3.5315 MBs/sec
thread 0: 1900000 messages sent 3700277.5208 nMsg/sec 3.5289 MBs/sec
thread 0: 2000000 messages sent 3702332.4695 nMsg/sec 3.5308 MBs/sec
[2011-05-17 14:33:01,102] INFO Closing all async producers (kafka.producer.ProducerPool)
[2011-05-17 14:33:01,103] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
Total Num Messages: 2000000 bytes: 400000000 in 108.678 secs
Messages/sec: 18402.9886
MB/sec: 3.5101
wait for data to be persisted
test passed
bin/../../../bin/kafka-server-start.sh: line 11: 21110 Terminated $(dirname $0)/kafka-run-class.sh kafka.Kafka $@
bin/../../../bin/zookeeper-server-start.sh: line 9: 21109 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@

View File

@ -0,0 +1,48 @@
#!/bin/bash
num_messages=2000000
message_size=200
base_dir=$(dirname $0)/../../../../
rm -rf /tmp/zookeeper
rm -rf /tmp/kafka-logs
echo "start the servers ..."
$base_dir/bin/zookeeper-server-start.sh $base_dir/config/zookeeper.properties 2>&1 > $base_dir/zookeeper.log &
$base_dir/bin/kafka-server-start.sh $base_dir/config/server.properties 2>&1 > $base_dir/kafka.log &
sleep 4
echo "start producing $num_messages messages ..."
$base_dir/bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10
echo "wait for data to be persisted"
cur_offset="-1"
quit=0
while [ $quit -eq 0 ]
do
sleep 2
target_size=`$base_dir/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
if [ $target_size -eq $cur_offset ]
then
quit=1
fi
cur_offset=$target_size
done
sleep 2
actual_size=`$base_dir/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic test01 --partition 0 --time -1 --offsets 1 | tail -1`
msg_full_size=`expr $message_size + 9`
expected_size=`expr $num_messages \* $msg_full_size`
if [ $actual_size != $expected_size ]
then
echo "actual size: $actual_size expected size: $expected_size test failed!!! look at it!!!"
else
echo "test passed"
fi
ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
sleep 2
ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null

View File

@ -0,0 +1,64 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=0
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# number of logical partitions on this broker
num.partitions=1
# the port the socket server runs on
port=9092
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
# the directory in which to store log files
log.dir=/tmp/kafka-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
# the maximum size of a log segment
log.file.size=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
# the minimum age of a log file to eligible for deletion
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=600
#set the following properties to use zookeeper
# enable connecting to zookeeper
enable.zookeeper=true
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=1000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -0,0 +1,4 @@
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181

View File

@ -0,0 +1,9 @@
#!/bin/bash
if [ $# -ne 1 ];
then
echo "USAGE: $0 zookeeper.properties"
exit 1
fi
$(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@

View File

@ -0,0 +1,2 @@
#!/bin/sh
ps ax | grep -i 'zookeeper' | grep -v grep | awk '{print $1}' | xargs kill -SIGINT

View File

@ -0,0 +1,23 @@
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ $# -ne 1 ];
then
echo "USAGE: $0 zookeeper_host:port[/path]"
exit 1
fi
$(dirname $0)/kafka-run-class.sh org.apache.zookeeper.ZooKeeperMain -server $1

View File

@ -0,0 +1,12 @@
# see kafka.consumer.ConsumerConfig for more details
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=127.0.0.1:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
#consumer group id
groupid=group1

View File

@ -0,0 +1,10 @@
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
# Turn on all our debugging info
#log4j.logger.kafka=INFO
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG

View File

@ -0,0 +1,64 @@
# see kafka.server.KafkaConfig for additional details and defaults
# the id of the broker
brokerid=0
# hostname of broker. If not set, will pick up from the value returned
# from getLocalHost. If there are multiple interfaces getLocalHost
# may not be what you want.
# hostname=
# number of logical partitions on this broker
num.partitions=1
# the port the socket server runs on
port=9092
# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
num.threads=8
# the directory in which to store log files
log.dir=/tmp/kafka-logs
# the send buffer used by the socket server
socket.send.buffer=1048576
# the receive buffer used by the socket server
socket.receive.buffer=1048576
# the maximum size of a log segment
log.file.size=536870912
# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
# the minimum age of a log file to eligible for deletion
log.retention.hours=168
#the number of messages to accept without flushing the log to disk
log.flush.interval=1
#set the following properties to use zookeeper
# enable connecting to zookeeper
enable.zookeeper=true
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
# time based topic flush intervals in ms
#topic.flush.intervals.ms=topic:1000
# default time based flush interval in ms
log.default.flush.interval.ms=2000
# time based topic flasher time rate in ms
log.default.flush.scheduler.interval.ms=1000
# topic partition count map
# topic.partition.count.map=topic1:3, topic2:4

View File

@ -0,0 +1,7 @@
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
# this will be overridden by the 01-start-zookeeper.sh script to be the port defined in env-cluster.sh
clientPort=2181
maxClientCnxns=30

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,20 @@
#!/bin/bash
ENV_CLUSTER="`dirname $0`/../../env-cluster.sh"
source $ENV_CLUSTER
set +u
shopt -s xpg_echo
shopt -s expand_aliases
trap "exit 1" 1 2 3 15
QUERY_FILE="${1:-group_by}_query.body"
[ ! -e $QUERY_FILE ] && echo "expecting file $QUERY_FILE to be in current directory" && exit 2
for delay in 1 15 15 15 15 15 15 15 15 15 15
do
echo "sleep for $delay seconds..."
echo " "
sleep $delay
time curl -sX POST "http://${DRUID_BROKER_HOST}:${DRUID_BROKER_PORT}/druid/v2/?pretty=true" -H 'content-type: application/json' -d @$QUERY_FILE
done
echo "$0 finished"

View File

@ -0,0 +1,11 @@
{
"queryType": "groupBy",
"dataSource": "appevents",
"granularity": "all",
"dimensions": ["appid", "event"],
"aggregations":[
{"type":"count", "name":"eventcount"},
{"type":"doubleSum", "fieldName":"events", "name":"eventssum"}
],
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}

View File

@ -0,0 +1,2 @@
#!/bin/bash
ps -eaf | grep BrokerMain | grep -v grep | awk '{print $2}' | xargs kill

View File

@ -0,0 +1,2 @@
#!/bin/bash
ps -eaf | grep ComputeMain | grep -v grep | awk '{print $2}' | xargs kill

View File

@ -0,0 +1,2 @@
#!/bin/bash
ps -eaf | grep MasterMain | grep -v grep | awk '{print $2}' | xargs kill

View File

@ -0,0 +1,2 @@
#!/bin/bash
ps -eaf | grep RealtimeMain | grep -v grep | awk '{print $2}' | xargs kill

2
test-harness/stop-firehose.sh Executable file
View File

@ -0,0 +1,2 @@
#!/bin/bash
ps -eaf | grep conjure | grep -v grep | awk '{print $2}' | xargs kill

2
test-harness/stop-kafka.sh Executable file
View File

@ -0,0 +1,2 @@
#!/bin/bash
ps -eaf | grep kafka\.Kafka | grep -v grep | awk '{print $2}' | xargs kill

2
test-harness/stop-zookeeper.sh Executable file
View File

@ -0,0 +1,2 @@
#!/bin/bash
ps -eaf | grep QuorumPeerMain | grep -v grep | awk '{print $2}' | xargs kill