update example + tutorial for kafka 0.8

This commit is contained in:
Xavier Léauté 2015-02-17 20:26:58 -08:00
parent 018653700b
commit cff218ee20
10 changed files with 145 additions and 488 deletions

View File

@ -1,35 +0,0 @@
---
layout: doc_page
---
Ingesting from Kafka 8
----------------------
The previous examples are for Kafka 7. To support Kafka 8, a couple changes need to be made:
- Update realtime node's configs for Kafka 8 extensions
- e.g.
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven",...]`
- becomes
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight",...]`
- Update realtime task config for changed keys
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
```json
"firehose" : {
"type" : "kafka-0.8",
"consumerProps" : {
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms": "15000",
"zookeeper.session.timeout.ms": "15000",
"zookeeper.sync.time.ms": "5000",
"group.id": "topic-pixel-local",
"fetch.message.max.bytes": "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed" : "druidtest"
},
"plumber" : {
"type" : "realtime"
}
```

View File

@ -66,16 +66,16 @@ The property `druid.realtime.specFile` has the path of a file (absolute or relat
"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "kafka-0.7.2",
"type": "kafka-0.8",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "druid-example",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed": "wikipedia"
},

View File

@ -10,7 +10,7 @@ This simple Druid cluster configuration can be used for initially experimenting
```
# Extensions
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven"]
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-eight"]
# Zookeeper (defaults to localhost)

View File

@ -1,226 +0,0 @@
---
layout: doc_page
---
This page describes how to use Riak-CS for deep storage instead of S3. We are still setting up some of the peripheral stuff (file downloads, etc.).
This guide provided by Pablo Nebrera, thanks!
## The VMWare instance
A VMWare [image](http://static.druid.io/artifacts/vmware/druid_riak.tgz) based on Druid 0.3.27.2 and built according to the instructions below has also been provided by Pablo Nebrera.
The provided vmware machine has access with the following credentials:
username: root
password: riakdruid
## The Setup
We started with a minimal CentOS installation but you can use any other compatible installation. At the end of this setup you will one node that is running:
1. A Kafka Broker
1. A single-node Zookeeper ensemble
1. A single-node Riak-CS cluster
1. A Druid [Coordinator](Coordinator.html)
1. A Druid [Broker](Broker.html)
1. A Druid [Historical](Historical.html)
1. A Druid [Realtime](Realtime.html)
This just walks through getting the relevant software installed and running. You will then need to configure the [Realtime](Realtime.html) node to take in your data.
### Configure System
1. Install `CentOS-6.4-x86_64-minimal.iso` ("RedHat v6.4" is the name of the AWS AMI) or your favorite Linux OS (if you use a different OS, some of the installation instructions for peripheral services might differ, please adjust them according to the system you are using). The rest of these instructions assume that you have a running instance and are running as the root user.
1. Configure the network. We used dhcp executing:
dhclient eth0
1. Disable firewall for now
service iptables stop
chkconfig iptables off
1. Change the limits on the number of open files a process can have:
cat >> /etc/security/limits.conf <<- _RBEOF_
# ulimit settings for Riak CS
root soft nofile 65536
root hard nofile 65536
riak soft nofile 65536
riak hard nofile 65536
_RBEOF_
ulimit -n 65536
### Install base software packages
1. Install necessary software with yum
yum install -y java-1.7.0-openjdk-devel git wget metadata storage-server
1. Install maven
wget http://apache.rediris.es/maven/maven-3/3.0.5/binaries/apache-maven-3.0.5-bin.tar.gz
tar xzf apache-maven-3.0.5-bin.tar.gz -C /usr/local
pushd /usr/local
sudo ln -s apache-maven-3.0.5 maven
popd
echo 'export M2_HOME=/usr/local/maven' >> /etc/profile.d/maven.sh
echo 'export PATH=${M2_HOME}/bin:${PATH}' >> /etc/profile.d/maven.sh
source /etc/profile.d/maven.sh
1. Install erlang
wget http://binaries.erlang-solutions.com/rpm/centos/6/x86_64/esl-erlang-R15B01-1.x86_64.rpm
yum localinstall -y esl-erlang-R15B01-1.x86_64.rpm
### Install Kafka And Zookeeper
1. Install kafka and zookeeper:
wget http://apache.rediris.es/incubator/kafka/kafka-0.7.2-incubating/kafka-0.7.2-incubating-src.tgz
tar zxvf kafka-0.7.2-incubating-src.tgz
pushd kafka-0.7.2-incubating-src/
./sbt update
./sbt package
mkdir -p /var/lib/kafka
rsync -a * /var/lib/kafka/
popd
### Install Riak-CS
1. Install s3cmd to manage riak s3
wget http://downloads.sourceforge.net/project/s3tools/s3cmd/1.5.0-alpha3/s3cmd-1.5.0-alpha3.tar.gz
tar xzvf s3cmd-1.5.0-alpha3.tar.gz
cd s3cmd-1.5.0-alpha3
cp -r s3cmd S3 /usr/local/bin/
1. Install riak, riak-cs and stanchion. Note: riak-cs-control is optional
wget http://s3.amazonaws.com/downloads.basho.com/riak/1.3/1.3.1/rhel/6/riak-1.3.1-1.el6.x86_64.rpm
wget http://s3.amazonaws.com/downloads.basho.com/riak-cs/1.3/1.3.1/rhel/6/riak-cs-1.3.1-1.el6.x86_64.rpm
wget http://s3.amazonaws.com/downloads.basho.com/stanchion/1.3/1.3.1/rhel/6/stanchion-1.3.1-1.el6.x86_64.rpm
wget http://s3.amazonaws.com/downloads.basho.com/riak-cs-control/1.0/1.0.0/rhel/6/riak-cs-control-1.0.0-1.el6.x86_64.rpm
yum localinstall -y riak-*.rpm stanchion-*.rpm
### Install Druid
1. Clone the git repository for druid, checkout a "stable" tag and build
git clone https://github.com/druid-io/druid.git druid
pushd druid
git checkout druid-0.4.12
export LANGUAGE=C
export LC_MESSAGE=C
export LC_ALL=C
export LANG=en_US
./build.sh
mkdir -p /var/lib/druid/app
cp ./services/target/druid-services-*-selfcontained.jar /var/lib/druid/app
ln -s /var/lib/druid/app/druid-services-*-selfcontained.jar /var/lib/druid/app/druid-services.jar
popd
### Configure stuff
1. Add this line to /etc/hosts
echo "127.0.0.1 s3.amazonaws.com bucket.s3.amazonaws.com `hostname`" >> /etc/hosts
NOTE: the bucket name in this case is "bucket", but you might need to update it to your bucket name if you want to use a different bucket name.
1. Download and extract run scripts and configuration files:
wget http://static.druid.io/artifacts/scripts/druid_scripts_nebrera.tar /
pushd /
tar xvf ~/druid_scripts_nebrera.tar
popd
1. Start Riak in order to create a user:
/etc/init.d/riak start
/etc/init.d/riak-cs start
/etc/init.d/stanchion start
You can check riak status using:
riak-admin member-status
You should expect results like
Attempting to restart script through sudo -H -u riak
================================= Membership ==================================
Status Ring Pending Node
-------------------------------------------------------------------------------
valid 100.0% -- 'riak@127.0.0.1'
-------------------------------------------------------------------------------
Valid:1 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
1. Create riak-cs user and yoink out credentials.
curl -H 'Content-Type: application/json' -X POST http://127.0.0.1:8088/riak-cs/user --data '{"email":"example@domain.com", "name":"admin"}' >> /tmp/riak_user.json
export RIAK_KEY_ID=`sed 's/^.*"key_id":"//' /tmp/riak_user.json | cut -d '"' -f 1`
export RIAK_KEY_SECRET=`sed 's/^.*"key_secret":"//' /tmp/riak_user.json | cut -d '"' -f 1`
sed -i "s/<%=[ ]*@key_id[ ]*%>/${RIAK_KEY_ID}/" /etc/riak-cs/app.config /etc/riak-cs-control/app.config /etc/stanchion/app.config /etc/druid/config.sh /etc/druid/base.properties /root/.s3cfg
sed -i "s/<%=[ ]*@key_secret[ ]*%>/${RIAK_KEY_SECRET}/" /etc/riak-cs/app.config /etc/riak-cs-control/app.config /etc/stanchion/app.config /etc/druid/config.sh /etc/druid/base.properties /root/.s3cfg
This will store the result of creating the user into `/tmp/riak_user.json`. You can look at it if you are interested. It will look something like this
{"email":"example@domain.com",
"display_name":"example",
"name":"admin",
"key_id":"DOXKZYR_QM2S-7HSKAEU",
"key_secret":"GtvVJow068RM-_viHIYR9DWMAXsFcL1SmjuNfA==",
"id":"4c5b5468c180f3efafd531b6cd8e2bb24371d99640aad5ced5fbbc0604fc473d",
"status":"enabled"}
1. Stop riak-cs:
/etc/init.d/riak-cs stop
/etc/init.d/stanchion stop
/etc/init.d/riak stop
1. Disable anonymous user creation
sed 's/{[ ]*anonymous_user_creation[ ]*,[ ]*true[ ]*}/{anonymous_user_creation, false}/' /etc/riak-cs/app.config |grep anonymous_user_creation
1. Restart riak-cs services:
/etc/init.d/riak start
/etc/init.d/riak-cs start
/etc/init.d/stanchion start
1. Create your bucket. The example name and in config files is "bucket"
s3cmd mb s3://bucket
You can verify that the bucket is created with:
s3cmd ls
1. Start metadata storage server
service metadata storaged start
chkconfig metadata storaged on
/usr/bin/metadata storageadmin -u root password 'riakdruid'
NOTE: If you don't like "riakdruid" as your password, feel free to change it around.
NOTE: If you have used root user to connect to database. It should be changed by other user but I have used this one to simplify it
1. Start zookeeper and kafka
/etc/init.d/zookeeper start
/etc/init.d/kafka start
1. Start druid
/etc/init.d/druid_master start
/etc/init.d/druid_realtime start
/etc/init.d/druid_broker start
/etc/init.d/druid_compute start

View File

@ -218,16 +218,16 @@ The indexing service can also run real-time tasks. These tasks effectively trans
"ioConfig": {
"type": "realtime",
"firehose": {
"type": "kafka-0.7.2",
"type": "kafka-0.8",
"consumerProps": {
"zk.connect": "zk_connect_string",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "consumer-group",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
"zookeeper.connect": "zk_connect_string",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "consumer-group",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed": "your_kafka_topic"
},

View File

@ -2,226 +2,145 @@
layout: doc_page
---
# Tutorial: Loading Streaming Data
In our last [tutorial](Tutorial%3A-The-Druid-Cluster.html), we set up a complete Druid cluster. We created all the Druid dependencies and ingested streaming data. In this tutorial, we will expand upon what we've done in the first two tutorials.
# Loading Streaming Data
About the data
--------------
In our [last tutorial](Tutorial%3A-The-Druid-Cluster.html), we set up a
complete Druid cluster. We created all the Druid dependencies and ingested
streaming data. In this tutorial, we will expand upon what we've done in the
first two tutorials.
The data source we'll be working with is Wikipedia edits once again. The data schema is the same as the previous tutorials:
## About the Data
Dimensions (things to filter on):
We will be working with the same Wikipedia edits data schema [from out previous
tutorials](http://localhost:4000/content/Tutorial:-A-First-Look-at-Druid.html#about-the-data).
```json
"page"
"language"
"user"
"unpatrolled"
"newPage"
"robot"
"anonymous"
"namespace"
"continent"
"country"
"region"
"city"
```
## Set Up
Metrics (things to aggregate over):
At this point, you should already have Druid downloaded and be comfortable
running a Druid cluster locally. If not, [have a look at our second
tutorial](Tutorial%3A-The-Druid-Cluster.html). If Zookeeper and MySQL are not
running, you will have to start them as described in [The Druid
Cluster](Tutorial%3A-The-Druid-Cluster.html).
```json
"count"
"added"
"delta"
"deleted"
```
Setting Up
----------
With real-world data, we recommend having a message bus such as [Apache
Kafka](http://kafka.apache.org/) sit between the data stream and the real-time
node. The message bus provides higher availability for production environments.
[Firehoses](Firehose.html) are the key abstraction for real-time ingestion.
At this point, you should already have Druid downloaded and are comfortable with running a Druid cluster locally. If you are not, see [here](Tutorial%3A-The-Druid-Cluster.html). If Zookeeper and MySQL aren't running, you'll have to start them again as described in [The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
### Kafka
With real-world data, we recommend having a message bus such as [Apache Kafka](http://kafka.apache.org/) sit between the data stream and the real-time node. The message bus provides higher availability for production environments. [Firehoses](Firehose.html) are the key abstraction for real-time ingestion.
Druid communicates with Kafka using the
[KafkaFirehoseFactory](Firehose.html). Using this [Firehose](Firehose.html)
with the right configuration, we can import data into Druid in real-time
without writing any code. To load data to a real-time node via Kafka, we'll
first need to initialize Zookeeper and Kafka, and then configure and initialize
a [Realtime](Realtime.html) node.
<a id="set-up-kafka"></a>
#### Setting up Kafka
The following quick-start instructions for booting a Zookeeper and then Kafka
cluster were adapted from the [Apache Kafka quickstart guide](http://kafka.apache.org/documentation.html#quickstart).
[KafkaFirehoseFactory](Firehose.html) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
1. Download Kafka
The following quick-start instructions for booting a Zookeeper and then Kafka cluster were taken from the [Kafka website](http://kafka.apache.org/07/quickstart.html).
For this tutorial we will [download Kafka 0.8.2]
(https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz)
1. Download Apache Kafka 0.7.2 from [http://kafka.apache.org/downloads.html](http://kafka.apache.org/downloads.html)
```bash
tar -xzf kafka_2.10-0.8.2.0.tgz
cd kafka_2.10-0.8.2.0
```
```bash
wget http://archive.apache.org/dist/kafka/old_releases/kafka-0.7.2-incubating/kafka-0.7.2-incubating-src.tgz
tar -xvzf kafka-0.7.2-incubating-src.tgz
cd kafka-0.7.2-incubating-src
```
1. Start Kafka
2. Build Kafka
First launch ZooKeeper:
```bash
./sbt update
./sbt package
```
```bash
./bin/zookeeper-server-start.sh config/zookeeper.properties
```
3. Boot Kafka
Then start the Kafka server (in a separate console):
```bash
cat config/zookeeper.properties
bin/zookeeper-server-start.sh config/zookeeper.properties
```bash
./bin/kafka-server-start.sh config/server.properties
```
# in a new console
bin/kafka-server-start.sh config/server.properties
```
1. Create a topic named `wikipedia`
4. Launch the console producer (so you can type in JSON kafka messages in a bit)
```bash
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic wikipedia
```
```bash
bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic wikipedia
```
1. Launch a console producer for that topic (so we can paste in kafka
messages in a bit)
When things are ready, you should see log messages such as:
```bash
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia
```
```
[2013-10-09 22:03:07,802] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
```
### Druid Realtime Node
#### Launch a Realtime Node
The realtime spec for the data source in this tutorial is available under
`examples/indexing/wikipedia.spec` from the [Druid
download](http://static.druid.io/artifacts/releases/druid-services-0.7.0-rc3-bin.tar.gz)
You should be comfortable starting Druid nodes at this point. If not, it may be worthwhile to revisit the first few tutorials.
1. Launch the realtime node
1. Real-time nodes can be started with:
```bash
java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
-Ddruid.realtime.specFile=examples/indexing/wikipedia.spec \
-classpath "config/_common:config/realtime:lib/*" \
io.druid.cli.Main server realtime
```
```bash
java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath lib/*:config/_common:config/realtime io.druid.cli.Main server realtime
```
1. Copy and paste the following data into the terminal where we launched
the Kafka console producer above.
2. A realtime.spec should already exist for the data source in the Druid tarball. You should be able to find it at:
```json
{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
```
```bash
examples/indexing/wikipedia.spec
```
**Note:** This config uses a [`messageTime` rejection policy](Plumber.html)
which will accept all events and hand off as long as there is a continuous
stream of events. In this particular example, hand-off will not actually
occur because we only have a handful of events.
The contents of the file should match:
Disclaimer: We recognize the timestamps of these events aren't actually recent.
```json
[
{
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE",
"intervals" : [ "2013-08-31/2013-09-01" ]
}
},
"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "wikipedia"
},
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type" : "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "messageTime"
}
1. Watch the events getting ingested and the real-time node announcing a data
segment
```
...
2015-02-17T23:01:50,220 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T00:00:00.000Z_2013-09-01T00:00:00.000Z_2013-08-31T00:00:00.000Z] at path[/druid/segments/localhost:8083/2015-02-17T23:01:50.219Z0]
...
```
1. Issue a query
Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) to the real-time node
should return some results:
```bash
curl -XPOST -H'Content-type: application/json' \
"http://localhost:8083/druid/v2/?pretty" \
-d'{"queryType":"timeBoundary","dataSource":"wikipedia"}'
```
```json
[ {
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-08-31T12:41:27.000Z"
}
}
]
```
} ]
```
Note: This config uses a "messageTime" [rejection policy](Plumber.html) which will accept all events and hand off as long as there is a continuous stream of events. In this particular example, hand-off will not actually occur because we only have a few events.
3. Let's copy and paste some data into the Kafka console producer
```json
{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
```
Disclaimer: We recognize the timestamps of these events aren't actually recent.
5. Watch the events as they are ingested by Druid's real-time node:
```bash
...
2013-10-10 05:13:18,976 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T01:00:00.000Z_2013-08-31T02:00:00.000Z_2013-08-31T01:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:18,992 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T03:00:00.000Z_2013-08-31T04:00:00.000Z_2013-08-31T03:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:18,997 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T07:00:00.000Z_2013-08-31T08:00:00.000Z_2013-08-31T07:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:19,003 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T11:00:00.000Z_2013-08-31T12:00:00.000Z_2013-08-31T11:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:19,008 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T12:00:00.000Z_2013-08-31T13:00:00.000Z_2013-08-31T12:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
...
```
Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) to the real-time node should yield valid results:
```json
[
{
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-08-31T12:41:27.000Z"
}
}
]
```
#### Advanced Streaming Ingestion
## Advanced Streaming Ingestion
Druid offers an additional method of ingesting streaming data via the indexing service. You may be wondering why a second method is needed. Standalone real-time nodes are sufficient for certain volumes of data and availability tolerances. They pull data from a message queue like Kafka or Rabbit, index data locally, and periodically finalize segments for handoff to historical nodes. They are fairly straightforward to scale, simply taking advantage of the innate scalability of the backing message queue. But they are difficult to make highly available with Kafka, the most popular supported message queue, because its high-level consumer doesnt provide a way to scale out two replicated consumer groups such that each one gets the same data in the same shard. They also become difficult to manage once you have a lot of them, since every machine needs a unique configuration.

View File

@ -32,7 +32,6 @@ h2. Configuration
h2. Data Ingestion
* "Ingestion FAQ":./Ingestion-FAQ.html
* "Realtime":./Realtime-ingestion.html
** "Kafka-0.8.x Ingestion":./Kafka-Eight.html
* "Batch":./Batch-ingestion.html
** "Different Hadoop Versions":./Other-Hadoop.html
* "Indexing Service":./Indexing-Service.html

View File

@ -42,16 +42,16 @@
"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "kafka-0.7.2",
"type": "kafka-0.8",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "druid-example",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed": "wikipedia"
},

View File

@ -61,16 +61,16 @@
"ioConfig": {
"type": "realtime",
"firehose": {
"type": "kafka-0.7.2",
"type": "kafka-0.8",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "druid-example",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed": "wikipedia"
},

View File

@ -16,7 +16,7 @@
#
# Extensions
druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-seven","io.druid.extensions:mysql-metadata-storage"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-eight","io.druid.extensions:mysql-metadata-storage"]
# Zookeeper
druid.zk.service.host=localhost