Merge pull request #948 from metamx/ingestion-docs

Redocumenting ingestion
This commit is contained in:
Fangjin Yang 2014-12-09 15:30:03 -07:00
commit d6d3ec6846
36 changed files with 1054 additions and 1639 deletions

View File

@ -8,113 +8,112 @@ There are two choices for batch data ingestion to your Druid cluster, you can us
Which should I use?
-------------------
The [Indexing service](Indexing-Service.html) is a node that can run as part of your Druid cluster and can accomplish a number of different types of indexing tasks. Even if all you care about is batch indexing, it provides for the encapsulation of things like the [database](MySQL.html) that is used for segment metadata and other things, so that your indexing tasks do not need to include such information. The indexing service was created such that external systems could programmatically interact with it and run periodic indexing tasks. Long-term, the indexing service is going to be the preferred method of ingesting data.
The [Indexing service](Indexing-Service.html) is a set of nodes that can run as part of your Druid cluster and can accomplish a number of different types of indexing tasks. Even if all you care about is batch indexing, it provides for the encapsulation of things like the [metadata store](Metadata-storage.html) that is used for segment metadata and other things, so that your indexing tasks do not need to include such information. The indexing service was created such that external systems could programmatically interact with it and run periodic indexing tasks. Long-term, the indexing service is going to be the preferred method of ingesting data.
The `HadoopDruidIndexer` runs Hadoop jobs in order to separate and index data segments. It takes advantage of Hadoop as a job scheduling and distributed job execution platform. It is a simple method if you already have Hadoop running and don't want to spend the time configuring and deploying the [Indexing service](Indexing-Service.html) just yet.
Batch Ingestion using the HadoopDruidIndexer
--------------------------------------------
## Batch Ingestion using the HadoopDruidIndexer
The HadoopDruidIndexer can be run like so:
```
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:<hadoop_config_path> io.druid.cli.Main index hadoop <config_file>
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:<hadoop_config_path> io.druid.cli.Main index hadoop <spec_file>
```
The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Time_intervals) of the data you are processing. The config\_file is a path to a file (the "specFile") that contains JSON and an example looks like:
## Hadoop "specFile"
The spec\_file is a path to a file that contains JSON and an example looks like:
```json
{
  "dataSchema" : {
    "dataSource" : "wikipedia",
    "parser" : {
      "type" : "string",
      "parseSpec" : {
        "format" : "json",
        "timestampSpec" : {
          "column" : "timestamp",
          "format" : "auto"
        },
        "dimensionsSpec" : {
          "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
          "dimensionExclusions" : [],
          "spatialDimensions" : []
        }
      }
    },
    "metricsSpec" : [
      {
        "type" : "count",
        "name" : "count"
      },
      {
        "type" : "doubleSum",
        "name" : "added",
        "fieldName" : "added"
      },
      {
        "type" : "doubleSum",
        "name" : "deleted",
        "fieldName" : "deleted"
      },
      {
        "type" : "doubleSum",
        "name" : "delta",
        "fieldName" : "delta"
      }
    ],
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "DAY",
      "queryGranularity" : "NONE",
      "intervals" : [ "2013-08-31/2013-09-01" ]
    }
  },
  "ioConfig" : {
    "type" : "hadoop",
    "inputSpec" : {
      "type" : "static",
      "paths" : "/MyDirectory/examples/indexing/wikipedia_data.json"
    },
    "metadataUpdateSpec" : {
      "type" : "metadata",
      "connectURI" : "jdbc:mysql://localhost:3306/druid",
      "user" : "druid",
      "password" : "diurd",
      "segmentTable" : "druid_segments"
    },
    "segmentOutputPath" : "/MyDirectory/data/index/output"
  },
  "tuningConfig" : {
    "type" : "hadoop",
    "targetPartitionSize" : 5000000,
    "jobProperties" : {
      "mapreduce.job.queuename" : "default"
    }
  }
}
```
### DataSchema
This field is required.
See [Ingestion](Ingestion.html)
### IOConfig
This field is required.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'hadoop'.|yes|
|pathSpec|Object|a specification of where to pull the data in from|yes|
|segmentOutputPath|String|the path to dump segments into.|yes|
|metadataUpdateSpec|Object|a specification of how to update the metadata for the druid cluster these segments belong to.|yes|
#### Path specification
There are multiple types of path specification:
@ -122,9 +121,9 @@ There are multiple types of path specification:
Is a type of data loader where a static path to the data files is passed.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|paths|Array of String|A String of input paths indicating where the raw data is located.|yes|
For example, using the static input paths:
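As an illustrative sketch (the bucket and file names here are hypothetical, not taken from the original example), the static spec's `paths` field is a comma-separated list of input locations:
```json
"inputSpec" : {
  "type" : "static",
  "paths" : "s3n://billy-bucket/the/data/is/here/data.gz,s3n://billy-bucket/the/data/is/here/moredata.gz"
}
```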
@ -136,11 +135,11 @@ For example, using the static input paths:
Is a type of data loader that expects data to be laid out in a specific path format. Specifically, it expects it to be segregated by day in this directory format `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (dates are represented by lowercase, time is represented by uppercase).
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|dataGranularity|Object|specifies the granularity to expect the data at, e.g. hour means to expect directories `y=XXXX/m=XX/d=XX/H=XX`.|yes|
|inputPath|String|Base path to append the expected time path to.|yes|
|filePattern|String|Pattern that files should match to be included.|yes|
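A sketch of what this might look like inside the ioConfig, assuming the `granularity` type name and the hourly directory layout described below (the base path and file pattern are illustrative):
```json
"inputSpec" : {
  "type" : "granularity",
  "dataGranularity" : "hour",
  "inputPath" : "s3n://billy-bucket/the/data/is/here",
  "filePattern" : ".*"
}
```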
For example, if the sample config were run with the interval 2012-06-01/2012-06-02, it would expect data at the paths
@ -151,14 +150,36 @@ s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=01
s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=23
```
#### Metadata Update Job Spec
This is a specification of the properties that tell the job how to update metadata such that the Druid cluster will see the output segments and load them.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|"metadata" is the only value available.|yes|
|connectURI|String|A valid JDBC url to metadata storage.|yes|
|user|String|Username for db.|yes|
|password|String|Password for db.|yes|
|segmentTable|String|Table to use in DB.|yes|
These properties should parrot what you have configured for your [Coordinator](Coordinator.html).
### TuningConfig
The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|workingPath|String|the working path to use for intermediate results (results between Hadoop jobs).|no (default == '/tmp/druid-indexing')|
|version|String|The version of created segments.|no (default == datetime that indexing starts at)|
|leaveIntermediate|Boolean|leave behind files in the workingPath when job completes or fails (debugging tool).|no (default == false)|
|partitionsSpec|Object|a specification of how to partition each time bucket into segments; absence of this property means no partitioning will occur. More details below.|no (default == 'hashed')|
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size.|no (default == 5 million)|
|cleanupOnFailure|Boolean|Cleans up intermediate files when the job fails as opposed to leaving them around for debugging.|no (default == true)|
|overwriteFiles|Boolean|Override existing files found during indexing.|no (default == false)|
|ignoreInvalidRows|Boolean|Ignore rows found to have problems.|no (default == false)|
|jobProperties|Object|a map of properties to add to the Hadoop job configuration.|no (default == null)|
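A minimal tuningConfig sketch combining a few of the fields above (the values shown are illustrative, not recommendations):
```json
"tuningConfig" : {
  "type" : "hadoop",
  "workingPath" : "/tmp/druid-indexing",
  "maxRowsInMemory" : 500000,
  "leaveIntermediate" : false,
  "jobProperties" : {
    "mapreduce.job.queuename" : "default"
  }
}
```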
### Partitioning specification
@ -215,24 +236,10 @@ The configuration options are:
|partitionDimension|the dimension to partition on. Leave blank to select a dimension automatically.|no|
|assumeGrouped|assume input data has already been grouped on time and dimensions. Ingestion will run faster, but can choose suboptimal partitions if the assumption is violated.|no|
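For instance, a hashed partitionsSpec might look like the following sketch (the targetPartitionSize value is illustrative):
```json
"partitionsSpec" : {
  "type" : "hashed",
  "targetPartitionSize" : 5000000
}
```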
### Updater job spec
This is a specification of the properties that tell the job how to update metadata such that the Druid cluster will see the output segments and load them.
|property|description|required?|
|--------|-----------|---------|
|type|"db" is the only value available.|yes|
|connectURI|A valid JDBC url to MySQL.|yes|
|user|Username for db.|yes|
|password|password for db.|yes|
|segmentTable|Table to use in DB.|yes|
These properties should parrot what you have configured for your [Coordinator](Coordinator.html).
Batch Ingestion Using the Indexing Service
------------------------------------------
Batch ingestion for the indexing service is done by submitting a [Hadoop Index Task](Tasks.html). The indexing service can be started by issuing:
Batch ingestion for the indexing service is done by submitting an [Index Task](Tasks.html) (for datasets < 1G) or a [Hadoop Index Task](Tasks.html). The indexing service can be started by issuing:
```
java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/overlord io.druid.cli.Main server overlord
@ -245,53 +252,86 @@ The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Co
```json
{
  "type" : "index_hadoop",
  "schema" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        },
        {
          "type" : "doubleSum",
          "name" : "added",
          "fieldName" : "added"
        },
        {
          "type" : "doubleSum",
          "name" : "deleted",
          "fieldName" : "deleted"
        },
        {
          "type" : "doubleSum",
          "name" : "delta",
          "fieldName" : "delta"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE",
        "intervals" : [ "2013-08-31/2013-09-01" ]
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "/MyDirectory/examples/indexing/wikipedia_data.json"
      }
    },
    "tuningConfig" : {
      "type": "hadoop"
    }
  }
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "index_hadoop".|yes|
|config|A Hadoop Index Config (see above).|yes|
|hadoopCoordinates|The Maven `<groupId>:<artifactId>:<version>` of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no|
### DataSchema
This field is required.
See [Ingestion](Ingestion.html)
### IOConfig
This field is required.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'hadoop'.|yes|
|pathSpec|Object|a specification of where to pull the data in from|yes|
### TuningConfig
The tuningConfig is optional and default parameters will be used if no tuningConfig is specified. This is the same as the tuningConfig for the standalone Hadoop indexer. See above for more details.
### Running the Task
The Hadoop Index Config submitted as part of a Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.

View File

@ -92,4 +92,4 @@ To check the state of the overlord, open up your browser and go to `#{OVERLORD_P
Next, go to `#{COORDINATOR_PUBLIC_IP_ADDR}:#{PORT}`. Click "View Information about the Cluster"->"Full Cluster View." You should now see the information about servers and segments. If the cluster runs correctly, the Segment dimensions and Segment binaryVersion fields should be filled up. Allow a few minutes for the segments to be processed.
Now you should be able to query the data using broker's public IP address.

View File

@ -241,7 +241,7 @@ This module is used to read announcements of segments in ZooKeeper. The configs
### Database Connector Module
These properties specify the jdbc connection and other configuration around the database. The only processes that connect to the DB with these properties are the [Coordinator](Coordinator.html) and [Indexing service](Indexing-service.html). This is tested on MySQL.
These properties specify the jdbc connection and other configuration around the metadata storage. The only processes that connect to the metadata storage with these properties are the [Coordinator](Coordinator.html) and [Indexing service](Indexing-service.html). This has been tested with MySQL.
|Property|Description|Default|
|--------|-----------|-------|

View File

@ -25,7 +25,7 @@ The coordinator module uses several of the default modules in [Configuration](Co
Dynamic Configuration
---------------------
The coordinator has dynamic configuration to change certain behaviour on the fly. The coordinator a JSON spec object from the Druid [MySQL](MySQL.html) config table. This object is detailed below:
The coordinator has dynamic configuration to change certain behaviour on the fly. The coordinator reads a JSON spec object from the Druid [metadata storage](Metadata-storage.html) config table. This object is detailed below:
It is recommended that you use the Coordinator Console to configure these parameters. However, if you need to do it via HTTP, the JSON object can be submitted to the coordinator via a POST request at:
@ -57,4 +57,4 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5|
|`replicantLifetime`|The maximum number of coordinator runs for a segment to be replicated before we start alerting.|15|
|`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10|
|`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false|

View File

@ -31,7 +31,7 @@ The node types that currently exist are:
* [**Historical**](Historical.html) nodes are the workhorses that handle storage and querying on "historical" data (non-realtime). Historical nodes download segments from deep storage, respond to the queries from broker nodes about these segments, and return results to the broker nodes. They announce themselves and the segments they are serving in Zookeeper, and also use Zookeeper to monitor for signals to load or drop new segments.
* [**Realtime**](Realtime.html) nodes ingest data in real time. They are in charge of listening to a stream of incoming data and making it available immediately inside the Druid system. Real-time nodes respond to query requests from Broker nodes, returning query results to those nodes. Aged data is pushed from Realtime nodes to Historical nodes.
* [**Coordinator**](Coordinator.html) nodes monitor the grouping of historical nodes to ensure that data is available, replicated and in a generally "optimal" configuration. They do this by reading segment metadata information from MySQL to determine what segments should be loaded in the cluster, using Zookeeper to determine what Historical nodes exist, and creating Zookeeper entries to tell Historical nodes to load and drop new segments.
* [**Coordinator**](Coordinator.html) nodes monitor the grouping of historical nodes to ensure that data is available, replicated and in a generally "optimal" configuration. They do this by reading segment metadata information from metadata storage to determine what segments should be loaded in the cluster, using Zookeeper to determine what Historical nodes exist, and creating Zookeeper entries to tell Historical nodes to load and drop new segments.
* [**Broker**](Broker.html) nodes receive queries from external clients and forward those queries to Realtime and Historical nodes. When Broker nodes receive results, they merge these results and return them to the caller. For knowing topology, Broker nodes use Zookeeper to determine what Realtime and Historical nodes exist.
* [**Indexer**](Indexing-Service.html) nodes form a cluster of workers to load batch and real-time data into the system as well as allow for alterations to the data stored in the system (also known as the Indexing Service).
@ -46,7 +46,7 @@ All nodes can be run in some highly available fashion, either as symmetric peers
Aside from these nodes, there are 3 external dependencies to the system:
1. A running [ZooKeeper](ZooKeeper.html) cluster for cluster service discovery and maintenance of current data topology
2. A [MySQL instance](MySQL.html) for maintenance of metadata about the data segments that should be served by the system
2. A [metadata storage instance](Metadata-storage.html) for maintenance of metadata about the data segments that should be served by the system
3. A ["deep storage" LOB store/file system](Deep-Storage.html) to hold the stored segments
The following diagram illustrates the cluster's management layer, showing how certain nodes and dependencies help manage the cluster by tracking and exchanging metadata:
@ -70,7 +70,7 @@ The output of the indexing process is stored in a "deep storage" LOB store/file
If a Historical node dies, it will no longer serve its segments, but given that the segments are still available on the "deep storage", any other node can simply download the segment and start serving it. This means that it is possible to actually remove all historical nodes from the cluster and then re-provision them without any data loss. It also means that if the "deep storage" is not available, the nodes can continue to serve the segments they have already pulled down (i.e. the cluster goes stale, not down).
In order for a segment to exist inside of the cluster, an entry has to be added to a table in a MySQL instance. This entry is a self-describing bit of metadata about the segment, including things like the schema of the segment, its size, and its location on deep storage. These entries are what the Coordinator uses to know what data **should** be available on the cluster.
In order for a segment to exist inside of the cluster, an entry has to be added to a table in a metadata storage instance. This entry is a self-describing bit of metadata about the segment, including things like the schema of the segment, its size, and its location on deep storage. These entries are what the Coordinator uses to know what data **should** be available on the cluster.
### Fault Tolerance
@ -79,7 +79,7 @@ In order for a segment to exist inside of the cluster, an entry has to be added
- **Broker** Can be run in parallel or in hot fail-over.
- **Realtime** Depending on the semantics of the delivery stream, multiple of these can be run in parallel processing the exact same stream. They periodically checkpoint to disk and eventually push out to the Historical nodes. Steps are taken to be able to recover from process death, but loss of access to the local disk can result in data loss if this is the only method of adding data to the system.
- **"deep storage" file system** If this is not available, new data will not be able to enter the cluster, but the cluster will continue operating as is.
- **MySQL** If this is not available, the Coordinator will be unable to find out about new segments in the system, but it will continue with its current view of the segments that should exist in the cluster.
- **metadata storage** If this is not available, the Coordinator will be unable to find out about new segments in the system, but it will continue with its current view of the segments that should exist in the cluster.
- **ZooKeeper** If this is not available, data topology changes cannot be made, but the Brokers will maintain their most recent view of the data topology and continue serving requests accordingly.
### Query processing

View File

@ -14,7 +14,7 @@ This guide walks you through the steps to create the cluster and then how to cre
## What's in this Druid Demo Cluster?
1. A single "Coordinator" node. This node co-locates the [Coordinator](Coordinator.html) process, the [Broker](Broker.html) process, Zookeeper, and the MySQL instance. You can read more about Druid architecture [Design](Design.html).
1. A single "Coordinator" node. This node co-locates the [Coordinator](Coordinator.html) process, the [Broker](Broker.html) process, Zookeeper, and the metadata storage instance. You can read more about the Druid architecture in [Design](Design.html).
1. Three historical nodes; these historical nodes have been pre-configured to work with the Coordinator node and should automatically load up the Wikipedia edit stream data (no specific setup is required).

View File

@ -44,7 +44,7 @@ There are additional configs for autoscaling (if it is enabled):
#### Dynamic Configuration
Overlord dynamic configuration is mainly for autoscaling. The overlord reads a worker setup spec as a JSON object from the Druid [MySQL](MySQL.html) config table. This object contains information about the version of middle managers to create, the maximum and minimum number of middle managers in the cluster at one time, and additional information required to automatically create middle managers.
Overlord dynamic configuration is mainly for autoscaling. The overlord reads a worker setup spec as a JSON object from the Druid [metadata storage](Metadata-storage.html) config table. This object contains information about the version of middle managers to create, the maximum and minimum number of middle managers in the cluster at one time, and additional information required to automatically create middle managers.
The JSON object can be submitted to the overlord via a POST request at:

View File

@ -17,7 +17,7 @@ Make sure that the `druid.publish.type` on your real-time nodes is set to `metad
```
druid.publish.type=metadata
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd

docs/content/Ingestion.md Normal file
View File

@ -0,0 +1,158 @@
---
layout: doc_page
---
# Ingestion Spec
A Druid ingestion spec consists of 3 components:
```json
{
"dataSchema" : {...}
"ioConfig" : {...}
"tuningConfig" : {...}
}
```
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| dataSchema | JSON Object | Specifies the schema of the incoming data. All ingestion specs can share the same dataSchema. | yes |
| ioConfig | JSON Object | Specifies where the data is coming from and where the data is going. This object will vary with the ingestion method. | yes |
| tuningConfig | JSON Object | Specifies how to tune various ingestion parameters. This object will vary with the ingestion method. | no |
# DataSchema
An example dataSchema is shown below:
```json
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"granularitySpec" : {
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE",
"intervals" : [ "2013-08-31/2013-09-01" ]
}
}
```
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| dataSource | String | The name of the ingested datasource. Datasources can be thought of as tables. | yes |
| parser | JSON Object | Specifies how ingested data can be parsed. | yes |
| metricsSpec | JSON Object array | A list of [aggregators](Aggregations.html). | yes |
| granularitySpec | JSON Object | Specifies how to create segments and roll up data. | yes |
## Parser
If `type` is not included, the parser defaults to `string`.
### String Parser
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `string`. | no |
| parseSpec | JSON Object | Specifies the format of the data. | yes |
### Protobuf Parser
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `protoBuf`. | no |
| parseSpec | JSON Object | Specifies the format of the data. | yes |
### ParseSpec
If `type` is not included, the parseSpec defaults to `tsv`.
#### JSON ParseSpec
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `json`. | no |
| timestampSpec | JSON Object | Specifies the column and format of the timestamp. | yes |
| dimensionsSpec | JSON Object | Specifies the dimensions of the data. | yes |
#### CSV ParseSpec
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `csv`. | no |
| timestampSpec | JSON Object | Specifies the column and format of the timestamp. | yes |
| dimensionsSpec | JSON Object | Specifies the dimensions of the data. | yes |
| listDelimiter | String | A custom delimiter for multi-value dimensions. | no (default == ctrl+A) |
| columns | JSON array | Specifies the columns of the data. | yes |
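A sketch of a CSV parseSpec (the column and dimension names are illustrative; note that the dataSchema example above uses the `format` key to select the parseSpec type):
```json
"parseSpec" : {
  "format" : "csv",
  "timestampSpec" : {
    "column" : "timestamp",
    "format" : "auto"
  },
  "columns" : ["timestamp", "page", "language", "added"],
  "dimensionsSpec" : {
    "dimensions" : ["page", "language"]
  }
}
```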
#### TSV ParseSpec
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `tsv`. | no |
| timestampSpec | JSON Object | Specifies the column and format of the timestamp. | yes |
| dimensionsSpec | JSON Object | Specifies the dimensions of the data. | yes |
| delimiter | String | A custom delimiter for data values. | no (default == \t) |
| listDelimiter | String | A custom delimiter for multi-value dimensions. | no (default == ctrl+A) |
| columns | JSON String array | Specifies the columns of the data. | yes |
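The TSV variant differs mainly in the delimiter fields; a sketch with illustrative values:
```json
"parseSpec" : {
  "format" : "tsv",
  "delimiter" : "\t",
  "timestampSpec" : {
    "column" : "timestamp",
    "format" : "auto"
  },
  "columns" : ["timestamp", "page", "language", "added"],
  "dimensionsSpec" : {
    "dimensions" : ["page", "language"]
  }
}
```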
### Timestamp Spec
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| column | String | The column of the timestamp. | yes |
| format | String | iso, millis, posix, auto or any [Joda time](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html) format. | no (default == 'auto') |
### DimensionsSpec
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| dimensions | JSON String array | The names of the dimensions. | yes |
| dimensionExclusions | JSON String array | The names of dimensions to exclude from ingestion. | no (default == []) |
| spatialDimensions | JSON Object array | An array of [spatial dimensions](GeographicQueries.html) | no (default == []) |
## GranularitySpec
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
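For batch ingestion, a uniform granularitySpec might look like the following sketch (the granularities and interval are illustrative):
```json
"granularitySpec" : {
  "type" : "uniform",
  "segmentGranularity" : "DAY",
  "queryGranularity" : "NONE",
  "intervals" : [ "2013-08-31/2013-09-01" ]
}
```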
# IO Config
Real-time Ingestion: See [Real-time ingestion](Realtime-ingestion.html).
Batch Ingestion: See [Batch ingestion](Batch-ingestion.html)
# Tuning Config
Real-time Ingestion: See [Real-time ingestion](Realtime-ingestion.html).
Batch Ingestion: See [Batch ingestion](Batch-ingestion.html)

View File

@ -15,7 +15,6 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
```json
"firehose" : {
"type" : "kafka-0.8",
"consumerProps" : {
@ -28,27 +27,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed" : "druidtest",
"parser" : {
"timestampSpec" : {
"column" : "utcdt",
"format" : "iso"
},
"data" : {
"format" : "json"
},
"dimensionExclusions" : [
"wp"
]
}
"feed" : "druidtest"
},
"plumber" : {
"type" : "realtime",
"windowPeriod" : "PT10m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicyFactory": {
"type": "messageTime"
}
"type" : "realtime"
}
```
```

View File

@ -1,8 +1,20 @@
---
layout: doc_page
---
# MySQL Database
MySQL is an external dependency of Druid. We use it to store various metadata about the system, but not to store the actual data. There are a number of tables used for various purposes described below.
# Metadata Storage
The Metadata Storage is an external dependency of Druid. We use it to store various metadata about the system, but not to store the actual data. There are a number of tables used for various purposes described below.
Supported Metadata Storages
---------------------------
The following metadata storages are supported:
* Derby (default - only works if you have all processes running on the same node)
* MySQL (io.druid.extensions:mysql-metadata-storage)
* PostgreSQL (io.druid.extensions:postgresql-metadata-storage)
To choose a metadata storage to use, set the `druid.extensions` config to include the module for the metadata storage.
Segments Table
--------------

View File

@ -6,13 +6,13 @@ Production Cluster Configuration
__This configuration is an example of what a production cluster could look like. Many other hardware combinations are possible! Cheaper hardware is absolutely possible.__
This production Druid cluster assumes that MySQL and Zookeeper are already set up. The deep storage that is used for examples is S3 and memcached is used for a distributed cache.
This production Druid cluster assumes that metadata storage and Zookeeper are already set up. The deep storage that is used for examples is S3 and memcached is used for a distributed cache.
The nodes that respond to queries (Historical, Broker, and Middle manager nodes) will use as many cores as are available, depending on usage, so it is best to keep these on dedicated machines. The upper limit of effectively utilized cores is not well characterized yet and would depend on types of queries, query load, and the schema. Historical daemons should have a heap size of at least 1GB per core for normal usage, but could be squeezed into a smaller heap for testing. Since in-memory caching is essential for good performance, even more RAM is better. Broker nodes will use RAM for caching, so they do more than just route queries. SSDs are highly recommended for Historical nodes if not all data is loaded in available memory.
The nodes that are responsible for coordination (Coordinator and Overlord nodes) require much less processing.
The effective utilization of cores by Zookeeper, MySQL, and Coordinator nodes is likely to be between 1 and 2 for each process/daemon, so these could potentially share a machine with lots of cores. These daemons work with heap a size between 500MB and 1GB.
The effective utilization of cores by Zookeeper, metadata storage, and Coordinator nodes is likely to be between 1 and 2 for each process/daemon, so these could potentially share a machine with lots of cores. These daemons work with a heap size between 500MB and 1GB.
We'll use r3.8xlarge nodes for query facing nodes and m1.xlarge nodes for coordination nodes. The following examples work relatively well in production, however, a more optimized tuning for the nodes we selected and more optimal hardware for a Druid cluster are both definitely possible.

View File

@ -2,7 +2,6 @@
layout: doc_page
---
Realtime Data Ingestion
=======================
For general Real-time Node information, see [here](Realtime.html).
@ -11,8 +10,11 @@ For Real-time Node Configuration, see [Realtime Configuration](Realtime-Config.h
For writing your own plugins to the real-time node, see [Firehose](Firehose.html).
Much of the configuration governing Realtime nodes and the ingestion of data is set in the Realtime spec file, discussed on this page.
There are two ways of ingesting real-time data. This can be achieved with a standalone real-time node, or using the [Tranquility](https://github.com/metamx/tranquility) client library as part of the [Indexing Service](Indexing-Service.html). For a full explanation of why there are two methods, please see [this link](https://groups.google.com/forum/#!searchin/druid-development/fangjin$20yang$20%22thoughts%22/druid-development/aRMmNHQGdhI/muBGl0Xi_wgJ). If you are comfortable with the limitations of standalone real-time nodes, you can use them as they are easier to set up. The indexing service is a more robust and highly available solution but will also require more effort to set up.
## Realtime Node Ingestion
Much of the configuration governing Realtime nodes and the ingestion of data is set in the Realtime spec file, discussed on this page.
<a id="realtime-specfile"></a>
## Realtime "specFile"
@ -22,59 +24,74 @@ The property `druid.realtime.specFile` has the path of a file (absolute or relat
```json
[
  {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
        "type" : "count",
        "name" : "count"
      }, {
        "type" : "doubleSum",
        "name" : "added",
        "fieldName" : "added"
      }, {
        "type" : "doubleSum",
        "name" : "deleted",
        "fieldName" : "deleted"
      }, {
        "type" : "doubleSum",
        "name" : "delta",
        "fieldName" : "delta"
      }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.7.2",
        "consumerProps": {
          "zk.connect": "localhost:2181",
          "zk.connectiontimeout.ms": "15000",
          "zk.sessiontimeout.ms": "15000",
          "zk.synctime.ms": "5000",
          "groupid": "druid-example",
          "fetch.size": "1048586",
          "autooffset.reset": "largest",
          "autocommit.enable": "false"
        },
        "feed": "wikipedia"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 500000,
      "intermediatePersistPeriod": "PT10m",
      "windowPeriod": "PT10m",
      "basePersistDirectory": "\/tmp\/realtime\/basePersist",
      "rejectionPolicy": {
        "type": "serverTime"
      }
    }
  }
]
@ -82,59 +99,85 @@ The property `druid.realtime.specFile` has the path of a file (absolute or relat
This is a JSON Array so you can give more than one realtime stream to a given node. The number you can put in the same process depends on the exact configuration. In general, it is best to think of each realtime stream handler as requiring 2-threads: 1 thread for data consumption and aggregation, 1 thread for incremental persists and other background tasks.
There are four parts to a realtime stream specification, `schema`, `config`, `firehose` and `plumber` which we will go into here.
There are three parts to a realtime stream specification, `dataSchema`, `IOConfig`, and `tuningConfig` which we will go into here.
### DataSchema
This field is required.
See [Ingestion](Ingestion.html)
### IOConfig
This field is required.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'realtime'.|yes|
|firehose|JSON Object|Where the data is coming from. Described in detail below.|yes|
|plumber|JSON Object|Where the data is going. Described in detail below.|yes|
#### Firehose
See [Firehose](Firehose.html) for more information on firehose configuration.
#### Plumber
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'realtime'.|no|
### TuningConfig
The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'realtime'.|no|
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size.|no (default == 5 million)|
|windowPeriod|ISO 8601 Period String|The amount of lag time to allow events. This is configured with a 10 minute window, meaning that any event more than 10 minutes ago will be thrown away and not included in the segment generated by the realtime server.|no (default == PT10m)|
|intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|no (default == PT10m)|
|basePersistDirectory|String|The directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists.|no (default == java tmp dir)|
|versioningPolicy|Object|How to version segments.|no (default == based on segment start time)|
|rejectionPolicy|Object|Controls the data acceptance policy for creating and handing off segments. More on this below.|no (default == 'serverTime')|
|maxPendingPersists|Integer|How many persists a plumber can do concurrently without starting to block.|no (default == 0)|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a sharded fashion.|no (default == 'NoneShardSpec')|
#### Rejection Policy
The following policies are available:
* `serverTime` &ndash; The recommended policy for "current time" data, it is optimal for current data that is generated and ingested in real time. Uses `windowPeriod` to accept only those events that are inside the window looking forward and back.
* `messageTime` &ndash; Can be used for non-"current time" data as long as that data is relatively in sequence. Events are rejected if they are less than `windowPeriod` from the event with the latest timestamp. Hand off only occurs if an event is seen after the segmentGranularity and `windowPeriod`.
* `none` &ndash; Never hands off data unless shutdown() is called on the configured firehose.
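For example, switching to the messageTime policy is just a matter of changing the type in the tuningConfig's rejectionPolicy (sketch):
```json
"rejectionPolicy": {
  "type": "messageTime"
}
```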
See [Plumber](Plumber.html) for a fuller discussion of Plumber configuration.
#### Sharding Real-time Ingestion
The `shardSpec` can be used to shard real-time ingestion. Different shardSpecs are required for different real-time nodes for the same dataSource. For example:
```json
"shardSpec" : {
"type" : "linear",
"partitionNumber" : 0
}
```
Other real-time nodes for the same dataSource will have monotonically increasing shardSpec numbers.
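For instance, a second real-time node ingesting the same dataSource might use:
```json
"shardSpec" : {
  "type" : "linear",
  "partitionNumber" : 1
}
```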
## Realtime Ingestion using the Indexing Service
We strongly recommend using the client library [Tranquility](https://github.com/metamx/tranquility) for this use case. Please read the documentation on the Tranquility web page.
## Constraints
The following table summarizes constraints between settings in the spec file for the Realtime subsystem.
|Name|Effect|Minimum|Recommended|
|----|------|-------|-----------|
|windowPeriod|When reading a row, events with timestamp older than now minus this window are discarded|time jitter tolerance|use this to reject outliers|
|segmentGranularity|Time granularity (minute, hour, day, week, month) for loading data at query time|equal to queryGranularity|more than queryGranularity|
|queryGranularity|Time granularity (minute, hour, day, week, month) for rollup|less than segmentGranularity|minute, hour, day, week, month|
|intermediatePersistPeriod|The max time (ISO8601 Period) between flushes of ingested rows from memory to disk|avoid excessive flushing|number of un-persisted rows in memory also constrained by maxRowsInMemory|
|maxRowsInMemory|The max number of ingested rows to hold in memory before a flush to disk|number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod|use this to avoid running out of heap if too many rows in an intermediatePersistPeriod|
The normal, expected use cases have the following overall constraints: `indexGranularity < intermediatePersistPeriod =< windowPeriod < segmentGranularity`
The normal, expected use cases have the following overall constraints: `queryGranularity < intermediatePersistPeriod =< windowPeriod < segmentGranularity`
If the Realtime Node process runs out of heap, try adjusting the `druid.computation.buffer.size` property, which specifies a size in bytes that must fit into the heap.

View File

@ -7,7 +7,7 @@ For Real-time Node Configuration, see [Realtime Configuration](Realtime-Config.h
For Real-time Ingestion, see [Realtime Ingestion](Realtime-ingestion.html).
Realtime nodes provide a realtime index. Data indexed via these nodes is immediately available for querying. Realtime nodes will periodically build segments representing the data theyve collected over some span of time and transfer these segments off to [Historical](Historical.html) nodes. They use ZooKeeper to monitor the transfer and MySQL to store metadata about the transfered segment. Once transfered, segments are forgotten by the Realtime nodes.
Realtime nodes provide a realtime index. Data indexed via these nodes is immediately available for querying. Realtime nodes will periodically build segments representing the data they've collected over some span of time and transfer these segments off to [Historical](Historical.html) nodes. They use ZooKeeper to monitor the transfer and the metadata storage to store metadata about the transferred segment. Once transferred, segments are forgotten by the Realtime nodes.
### Running

View File

@ -133,4 +133,4 @@ druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
```
This simple broker will run groupBys in a single thread.

View File

@ -58,7 +58,7 @@ This just walks through getting the relevant software installed and running. Yo
1. Install necessary software with yum
yum install -y java-1.7.0-openjdk-devel git wget mysql-server
1. Install maven
@ -204,11 +204,11 @@ This just walks through getting the relevant software installed and running. Yo
s3cmd ls
1. Start MySQL server
1. Start metadata storage server
service mysqld start
chkconfig mysqld on
/usr/bin/mysqladmin -u root password 'riakdruid'
NOTE: If you don't like "riakdruid" as your password, feel free to change it around.
NOTE: This example connects to the database as the root user. In practice you should create a separate user for Druid, but root is used here to keep the example simple.

View File

@ -51,13 +51,13 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
You can extract the content within by issuing:
```
tar -zxvf druid-services-*-bin.tar.gz
```
Not too lost so far right? That's great! If you cd into the directory:
If you cd into the directory:
```
cd druid-services-0.6.160
@ -69,24 +69,6 @@ You should see a bunch of files:
* run_example_client.sh
* LICENSE, config, examples, lib directories
Setting up Zookeeper
--------------------
Before we get started, we need to start Apache Zookeeper.
```bash
Download zookeeper from [http://www.apache.org/dyn/closer.cgi/zookeeper/](http://www.apache.org/dyn/closer.cgi/zookeeper/)
Install zookeeper.
e.g.
curl http://www.gtlib.gatech.edu/pub/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz -o zookeeper-3.4.6.tar.gz
tar xzf zookeeper-3.4.6.tar.gz
cd zookeeper-3.4.6
cp conf/zoo_sample.cfg conf/zoo.cfg
./bin/zkServer.sh start
cd ..
```
Running Example Scripts
-----------------------
@ -109,83 +91,29 @@ Note that the first time you start the example, it may take some extra time due
2013-09-04 19:33:11,946 INFO [ApiDaemon] io.druid.segment.realtime.firehose.IrcFirehoseFactory - Joining channel #ja.wikipedia
```
The Druid real time-node ingests events in an in-memory buffer. Periodically, these events will be persisted to disk. If you are interested in the details of our real-time architecture and why we persist indexes to disk, I suggest you read our [White Paper](http://static.druid.io/docs/druid.pdf).
The Druid real time-node ingests events in an in-memory buffer. Periodically, these events will be persisted to disk. If you are interested in the details of our real-time architecture and why we persist indexes to disk, we suggest you read our [White Paper](http://static.druid.io/docs/druid.pdf).
Okay, things are about to get real-time. To query the real-time node you've spun up, you can issue:
To query the real-time node you've spun up, you can issue:
```
./run_example_client.sh
```
Select "wikipedia" once again. This script issues [GroupByQueries](GroupByQuery.html) to the data we've been ingesting. The query looks like this:
Select "wikipedia" once again. This script issues [TimeBoundary](TimeBoundaryQuery.html) to the data we've been ingesting. The query looks like this:
```json
{
"queryType":"groupBy",
"dataSource":"wikipedia",
"granularity":"minute",
"dimensions":[ "page" ],
"aggregations":[
{"type":"count", "name":"rows"},
{"type":"longSum", "fieldName":"count", "name":"edit_count"}
],
"filter":{ "type":"selector", "dimension":"namespace", "value":"article" },
"intervals":[ "2013-06-01T00:00/2020-01-01T00" ]
"queryType":"timeBoundary",
"dataSource":"wikipedia"
}
```
This is a **groupBy** query, which you may be familiar with from SQL. We are grouping, or aggregating, via the `dimensions` field: `["page"]`. We are **filtering** via the `namespace` dimension, to only look at edits on `articles`. Our **aggregations** are what we are calculating: a count of the number of data rows, and a count of the number of edits that have occurred.
The **timeBoundary** query is one of the simplest queries you can make in Druid. It gives you the boundaries of the ingested data.
The result looks something like this (when it's prettified):
```json
[
{
"version": "v1",
"timestamp": "2013-09-04T21:44:00.000Z",
"event": { "count": 0, "page": "2013\u201314_Brentford_F.C._season", "rows": 1 }
},
{
"version": "v1",
"timestamp": "2013-09-04T21:44:00.000Z",
"event": { "count": 0, "page": "8e_\u00e9tape_du_Tour_de_France_2013", "rows": 1 }
},
{
"version": "v1",
"timestamp": "2013-09-04T21:44:00.000Z",
"event": { "count": 0, "page": "Agenda_of_the_Tea_Party_movement", "rows": 1 }
},
...
```
This groupBy query is a bit complicated and we'll return to it later. For the time being, just make sure you are getting some blocks of data back. If you are having problems, make sure you have [curl](http://curl.haxx.se/) installed. Control+C to break out of the client script.
Querying Druid
--------------
In your favorite editor, create the file:
```
time_boundary_query.body
```
Druid queries are JSON blobs which are relatively painless to create programmatically, but an absolute pain to write by hand. So anyway, we are going to create a Druid query by hand. Add the following to the file you just created:
```json
{
"queryType": "timeBoundary",
"dataSource": "wikipedia"
}
```
The [TimeBoundaryQuery](TimeBoundaryQuery.html) is one of the simplest Druid queries. To run the query, you can issue:
```
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @time_boundary_query.body
```
We get something like this JSON back:
```json
[ {
"timestamp" : "2013-09-04T21:44:00.000Z",
@ -196,12 +124,15 @@ We get something like this JSON back:
} ]
```
As you can probably tell, the result indicates the maximum and minimum timestamps we've seen thus far (summarized to a minutely granularity). Let's explore a bit further.
If you are having problems getting results back, make sure you have [curl](http://curl.haxx.se/) installed. Control+C to break out of the client script.
Return to your favorite editor and create the file:
Querying Druid
--------------
In your favorite editor, create the file:
```
timeseries_query.body
timeseries.json
```
We are going to make a slightly more complicated query, the [TimeseriesQuery](TimeseriesQuery.html). Copy and paste the following into the file:
@ -219,11 +150,11 @@ We are going to make a slightly more complicated query, the [TimeseriesQuery](Ti
}
```
You are probably wondering, what are these [Granularities](Granularities.html) and [Aggregations](Aggregations.html) things? What the query is doing is aggregating some metrics over some span of time.
Our query has now expanded to include a time interval, [Granularities](Granularities.html), and [Aggregations](Aggregations.html). The query aggregates a set of metrics over a span of time, and the results are put into a single bucket.
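The full query body ships with the tutorial files, but a minimal sketch of a timeseries query, assuming the `wikipedia` data source and the `count` and `added` columns used throughout these tutorials, looks something like:
```json
{
  "queryType": "timeseries",
  "dataSource": "wikipedia",
  "granularity": "all",
  "intervals": [ "2010-01-01/2020-01-01" ],
  "aggregations": [
    { "type": "longSum", "fieldName": "count", "name": "edit_count" },
    { "type": "doubleSum", "fieldName": "added", "name": "chars_added" }
  ]
}
```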
To issue the query and get some results, run the following in your command line:
```
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @timeseries_query.body
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @timeseries.json
```
Once again, you should get a JSON blob of text back with your results, that looks something like this:
@ -237,9 +168,9 @@ Once again, you should get a JSON blob of text back with your results, that look
If you issue the query again, you should notice your results updating.
Right now all the results you are getting back are being aggregated into a single timestamp bucket. What if we wanted to see our aggregations on a per minute basis? What field can we change in the query to accomplish this?
Right now all the results you are getting back are being aggregated into a single timestamp bucket. What if we wanted to see our aggregations on a per minute basis?
If you loudly exclaimed "we can change granularity to minute", you are absolutely correct! We can specify different granularities to bucket our results, like so:
We can change the granularity of the results to minute. To bucket our results at a different granularity, we change the query like so:
```json
{
@ -254,7 +185,7 @@ If you loudly exclaimed "we can change granularity to minute", you are absolutel
}
```
This gives us something like the following:
This gives us results like the following:
```json
[
@ -277,27 +208,24 @@ This gives us something like the following:
Solving a Problem
-----------------
One of Druid's main powers is to provide answers to problems, so let's pose a problem. What if we wanted to know what the top pages in the US are, ordered by the number of edits over the last few minutes you've been going through this tutorial? To solve this problem, we have to return to the query we introduced at the very beginning of this tutorial, the [GroupByQuery](GroupByQuery.html). It would be nice if we could group by results by dimension value and somehow sort those results... and it turns out we can!
One of Druid's main powers is to provide answers to problems, so let's pose a problem. What if we wanted to know what the top pages in the US are, ordered by the number of edits over the last few minutes you've been going through this tutorial? To solve this problem, we can use a [TopN query](TopNQuery.html).
Let's create the file:
```
group_by_query.body
topn.json
```
and put the following in there:
```json
{
"queryType": "groupBy",
"queryType": "topN",
"dataSource": "wikipedia",
"granularity": "all",
"dimensions": [ "page" ],
"limitSpec": {
"type": "default",
"columns": [ { "dimension": "edit_count", "direction": "DESCENDING" } ],
"limit": 10
},
"dimension": "page",
"metric": "edit_count",
"threshold" : 10,
"aggregations": [
{"type": "longSum", "fieldName": "count", "name": "edit_count"}
],
@ -306,12 +234,12 @@ and put the following in there:
}
```
Woah! Our query just got a way more complicated. Now we have these [Filters](Filters.html) things and this [LimitSpec](LimitSpec.html) thing. Fear not, it turns out the new objects we've introduced to our query can help define the format of our results and provide an answer to our question.
Note that our query now includes [Filters](Filters.html).
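The filter portion of the query is not shown above; a [selector filter](Filters.html) restricting results to edits from the United States (assuming the `country` dimension from the Wikipedia schema) would look roughly like:
```json
"filter": {
  "type": "selector",
  "dimension": "country",
  "value": "United States"
}
```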
If you issue the query:
```
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @group_by_query.body
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @topn.json
```
You should see an answer to our question. As an example, some results are shown below:
@ -319,21 +247,15 @@ You should see an answer to our question. As an example, some results are shown
```json
[
{
"version" : "v1",
"timestamp" : "2012-10-01T00:00:00.000Z",
"event" : { "page" : "RTC_Transit", "edit_count" : 6 }
},
{
"version" : "v1",
"timestamp" : "2012-10-01T00:00:00.000Z",
"event" : { "page" : "List_of_Deadly_Women_episodes", "edit_count" : 4 }
},
{
"version" : "v1",
"timestamp" : "2012-10-01T00:00:00.000Z",
"event" : { "page" : "User_talk:David_Biddulph", "edit_count" : 4 }
},
...
"timestamp" : "2013-09-04T21:00:00.000Z",
"result" : [
{ "page" : "RTC_Transit", "edit_count" : 6 },
{ "page" : "List_of_Deadly_Women_episodes", "edit_count" : 4 },
{ "page" : "User_talk:David_Biddulph", "edit_count" : 4 },
...
]
}
]
```
Feel free to tweak other query parameters to answer other questions you may have about the data.
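For example, assuming the same schema, swapping the dimension gives you the top users by edit count instead of the top pages:
```json
{
  "queryType": "topN",
  "dataSource": "wikipedia",
  "granularity": "all",
  "dimension": "user",
  "metric": "edit_count",
  "threshold": 10,
  "aggregations": [
    { "type": "longSum", "fieldName": "count", "name": "edit_count" }
  ],
  "intervals": [ "2013-06-01T00:00/2020-01-01T00" ]
}
```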
@ -348,6 +270,6 @@ Druid is even more fun if you load your own data into it! To learn how to load y
Additional Information
----------------------
This tutorial is merely showcasing a small fraction of what Druid can do. If you are interested in more information about Druid, including setting up a more sophisticated Druid cluster, read more of the Druid documentation and the blogs found on druid.io.
This tutorial is merely showcasing a small fraction of what Druid can do. If you are interested in more information about Druid, including setting up a more sophisticated Druid cluster, read more of the Druid documentation and blogs found on druid.io.
And thus concludes our journey! Hopefully you learned a thing or two about Druid real-time ingestion, querying Druid, and how Druid can be used to solve problems. If you have additional questions, feel free to post in our [google groups page](https://groups.google.com/forum/#!forum/druid-development).
Hopefully you learned a thing or two about Druid real-time ingestion, querying Druid, and how Druid can be used to solve problems. If you have additional questions, feel free to post in our [google groups page](https://groups.google.com/forum/#!forum/druid-development).

View File

@ -14,7 +14,7 @@ Before we start digging into how to query Druid, make sure you've gone through t
Let's start up a simple Druid cluster so we can query all the things.
Note: If Zookeeper and MySQL aren't running, you'll have to start them again as described in [The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
Note: If Zookeeper and metadata storage aren't running, you'll have to start them again as described in [The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
To start a Coordinator node:

View File

@ -2,17 +2,13 @@
layout: doc_page
---
# Tutorial: Loading Your Data (Part 1)
In our last [tutorial](Tutorial%3A-The-Druid-Cluster.html), we set up a complete Druid cluster. We created all the Druid dependencies and loaded some batched data. Druid shards data into self-contained chunks known as [segments](Segments.html). Segments are the fundamental unit of storage in Druid and all Druid nodes only understand segments.
# Tutorial: Loading Batch Data
In this tutorial, we will learn about batch ingestion (as opposed to real-time ingestion) and how to create segments using the final piece of the Druid cluster, the [indexing service](Indexing-Service.html). The indexing service is a standalone service that accepts [tasks](Tasks.html) in the form of POST requests. Most tasks produce segments as their output.
If you are interested in learning more about ingesting your own data into Druid, skip to the next [tutorial](Tutorial%3A-Loading-Your-Data-Part-2.html).
About the data
--------------
The data source we'll be working with is Wikipedia edits once again. The data schema is the same as the previous tutorials:
The Data
--------
The data source we'll be using is Wikipedia edits. The data schema is:
Dimensions (things to filter on):
@ -39,20 +35,22 @@ Metrics (things to aggregate over):
"delta"
"deleted"
```
Setting Up
----------
At this point, you should already have Druid downloaded and are comfortable with running a Druid cluster locally. If you are not, see [here](Tutorial%3A-The-Druid-Cluster.html).
Batch Ingestion
---------------
Druid is designed for large data volumes, and most real-world data sets require batch indexing be done through a Hadoop job.
Let's start from our usual starting point in the tarball directory.
For this tutorial, we used [Hadoop 1.0.3](https://archive.apache.org/dist/hadoop/core/hadoop-1.0.3/). There are many pages on the Internet showing how to set up a single-node (standalone) Hadoop cluster, which is all that's needed for this example.
Segments require data, so before we can build a Druid segment, we are going to need some raw data. Make sure that the following file exists:
For the purposes of this tutorial, we are going to use our very small and simple Wikipedia data set. This data can directly be ingested via other means as shown in the previous [tutorial](Tutorial%3A-Loading-Your-Data-Part-1.html), but we are going to use Hadoop here for demonstration purposes.
Our data is located at:
```
examples/indexing/wikipedia_data.json
```
Open the file and make sure the following events exist:
The following events should exist in the file:
```json
{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
@ -62,72 +60,35 @@ Open the file and make sure the following events exist:
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
```
There are five data points spread across the day of 2013-08-31. Talk about big data, right? Thankfully, we don't need a ton of data to introduce how batch ingestion works.
#### Set Up a Druid Cluster
In order to ingest and query this data, we are going to need to run a historical node, a coordinator node, and an indexing service to run the batch ingestion.
To index the data, we are going to need an indexing service, a historical node, and a coordinator node.
Note: If Zookeeper and MySQL aren't running, you'll have to start them again as described in [The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
#### Starting a Local Indexing Service
The simplest indexing service we can start up is to run an [overlord](Indexing-Service.html) node in local mode. You can do so by issuing:
To start the Indexing Service:
```bash
java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/overlord io.druid.cli.Main server overlord
java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:<hadoop_config_path>:config/overlord io.druid.cli.Main server overlord
```
The overlord configurations should already exist in:
```
config/overlord/runtime.properties
```
The configurations for the overlord node are as follows:
```bash
druid.host=localhost
druid.port=8087
druid.service=overlord
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.160"]
druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
druid.selectors.indexing.serviceName=overlord
druid.indexer.queue.startDelay=PT0M
druid.indexer.runner.javaOpts="-server -Xmx256m"
druid.indexer.fork.property.druid.processing.numThreads=1
druid.indexer.fork.property.druid.computation.buffer.size=100000000
```
If you are interested in reading more about these configurations, see [here](Indexing-Service.html).
#### Starting Other Nodes
Just in case you forgot how, let's start up the other nodes we require:
Coordinator node:
To start the Coordinator Node:
```bash
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/coordinator io.druid.cli.Main server coordinator
```
Historical node:
To start the Historical Node:
```bash
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/historical io.druid.cli.Main server historical
```
Note: Historical, real-time and broker nodes share the same query interface. Hence, we do not explicitly need a broker node for this tutorial. All queries can go against the historical node directly.
#### Index the Data
Once all the nodes are up and running, we are ready to index some data.
There are two ways we can load the data, depending on the data volume. The simplest method of loading data is to use the [Index Task](Tasks.html). Index tasks can load batch data without any external dependencies. They are, however, slow when the data volume exceeds 1G.
Indexing the Data
-----------------
#### Index Task
To index the data and build a Druid segment, we are going to need to submit a task to the indexing service. This task should already exist:
@ -140,40 +101,64 @@ Open up the file to see the following:
```json
{
"type" : "index",
"dataSource" : "wikipedia",
"granularitySpec" : {
"type" : "uniform",
"gran" : "DAY",
"intervals" : [ "2013-08-31/2013-09-01" ]
},
"aggregators" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"firehose" : {
"type" : "local",
"baseDir" : "examples/indexing",
"filter" : "wikipedia_data.json",
"parser" : {
"timestampSpec" : {
"column" : "timestamp"
"schema" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"data" : {
"format" : "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
"metricsSpec" : [
{
"type" : "count",
"name" : "count"
},
{
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
},
{
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
},
{
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE",
"intervals" : [ "2013-08-31/2013-09-01" ]
}
},
"ioConfig" : {
"type" : "index",
"firehose" : {
"type" : "local",
"baseDir" : "examples/indexing/",
"filter" : "wikipedia_data.json"
}
},
"tuningConfig" : {
"type" : "index",
"targetPartitionSize" : 0,
"rowFlushBoundary" : 0
}
}
}
@ -262,12 +247,89 @@ Task logs can be stored locally or uploaded to [Deep Storage](Deep-Storage.html)
The most common data ingestion problems involve timestamp formats and other malformed data issues.
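For example, if your timestamps are not in ISO8601 format, you can usually fix the problem by pointing the `timestampSpec` at the right column and format; the column name and Joda-time pattern below are hypothetical:
```json
"timestampSpec" : {
  "column" : "ts",
  "format" : "yyyy-MM-dd HH:mm:ss"
}
```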
#### Hadoop Index Task
Before indexing the data, make sure you have a valid Hadoop cluster running. To build our Druid segment, we are going to submit a [Hadoop index task](Tasks.html) to the indexing service. The grammar for the Hadoop index task is very similar to the index task of the last tutorial. The tutorial Hadoop index task should be located at:
```
examples/indexing/wikipedia_index_hadoop_task.json
```
Examining the contents of the file, you should find:
```json
{
"type" : "index_hadoop",
"schema" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec" : [
{
"type" : "count",
"name" : "count"
},
{
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
},
{
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
},
{
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE",
"intervals" : [ "2013-08-31/2013-09-01" ]
}
},
"ioConfig" : {
"type" : "hadoop",
"inputSpec" : {
"type" : "static",
"paths" : "/MyDirectory/examples/indexing/wikipedia_data.json"
}
}
}
}
```
If you are curious about what all this configuration means, see [here](Tasks.html).
To submit the task:
```bash
curl -X 'POST' -H 'Content-Type:application/json' -d @examples/indexing/wikipedia_index_hadoop_task.json localhost:8087/druid/indexer/v1/task
```
After the task is completed, the segment should be assigned to your historical node. You should be able to query the segment.
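To verify, you can issue a simple query; for example, assuming the historical node is still running with the tutorial configuration (port 8081), a [TimeBoundary](TimeBoundaryQuery.html) query along these lines should return the time range of the newly loaded segment:
```bash
curl -X POST 'http://localhost:8081/druid/v2/?pretty' -H 'content-type: application/json' \
  -d '{"queryType":"timeBoundary","dataSource":"wikipedia"}'
```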
Next Steps
----------
This tutorial covered ingesting a small batch data set and loading it into Druid. In [Loading Your Data Part 2](Tutorial%3A-Loading-Your-Data-Part-2.html), we will cover how to ingest data using Hadoop for larger data sets.
Note: The index task and local firehose can be used to ingest your own data if the size of that data is relatively small (< 1G). The index task is fairly slow and we highly recommend using the Hadoop Index Task for ingesting larger quantities of data.
We demonstrated using the indexing service as a way to ingest data into Druid. Previous versions of Druid used the [HadoopDruidIndexer](Batch-ingestion.html) to ingest batch data. The `HadoopDruidIndexer` still remains a valid option for batch ingestion, however, we recommend using the indexing service as the preferred method of getting batch data into Druid.
Additional Information
----------------------

View File

@ -0,0 +1,232 @@
---
layout: doc_page
---
# Tutorial: Loading Streaming Data
In our last [tutorial](Tutorial%3A-The-Druid-Cluster.html), we set up a complete Druid cluster. We created all the Druid dependencies and ingested streaming data. In this tutorial, we will expand upon what we've done in the first two tutorials.
About the data
--------------
The data source we'll be working with is Wikipedia edits once again. The data schema is the same as the previous tutorials:
Dimensions (things to filter on):
```json
"page"
"language"
"user"
"unpatrolled"
"newPage"
"robot"
"anonymous"
"namespace"
"continent"
"country"
"region"
"city"
```
Metrics (things to aggregate over):
```json
"count"
"added"
"delta"
"deleted"
```
Setting Up
----------
At this point, you should already have Druid downloaded and be comfortable with running a Druid cluster locally. If you are not, see [here](Tutorial%3A-The-Druid-Cluster.html). If Zookeeper and MySQL aren't running, you'll have to start them again as described in [The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
With real-world data, we recommend having a message bus such as [Apache Kafka](http://kafka.apache.org/) sit between the data stream and the real-time node. The message bus provides higher availability for production environments. [Firehoses](Firehose.html) are the key abstraction for real-time ingestion.
<a id="set-up-kafka"></a>
#### Setting up Kafka
[KafkaFirehoseFactory](Firehose.html) is how Druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
The following quick-start instructions for booting a Zookeeper and then Kafka cluster were taken from the [Kafka website](http://kafka.apache.org/07/quickstart.html).
1. Download Apache Kafka 0.7.2 from [http://kafka.apache.org/downloads.html](http://kafka.apache.org/downloads.html)
```bash
wget http://archive.apache.org/dist/kafka/old_releases/kafka-0.7.2-incubating/kafka-0.7.2-incubating-src.tgz
tar -xvzf kafka-0.7.2-incubating-src.tgz
cd kafka-0.7.2-incubating-src
```
2. Build Kafka
```bash
./sbt update
./sbt package
```
3. Boot Kafka
```bash
cat config/zookeeper.properties
bin/zookeeper-server-start.sh config/zookeeper.properties
# in a new console
bin/kafka-server-start.sh config/server.properties
```
4. Launch the console producer (so you can type in JSON kafka messages in a bit)
```bash
bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic wikipedia
```
When things are ready, you should see log messages such as:
```
[2013-10-09 22:03:07,802] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
```
#### Launch a Realtime Node
You should be comfortable starting Druid nodes at this point. If not, it may be worthwhile to revisit the first few tutorials.
1. Real-time nodes can be started with:
```bash
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath lib/*:config/realtime io.druid.cli.Main server realtime
```
2. A realtime.spec should already exist for the data source in the Druid tarball. You should be able to find it at:
```bash
examples/indexing/wikipedia.spec
```
The contents of the file should match:
```json
[
{
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE",
"intervals" : [ "2013-08-31/2013-09-01" ]
}
},
"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "wikipedia"
},
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type" : "realtime",
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "messageTime"
}
}
}
]
```
Note: This config uses a "test" [rejection policy](Plumber.html) which will accept all events and timely hand off, however, we strongly recommend you do not use this in production. Using this rejection policy, segments for events for the same time range will be overridden.
3. Let's copy and paste some data into the Kafka console producer
```json
{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
```
Disclaimer: We recognize the timestamps of these events aren't actually recent.
5. Watch the events as they are ingested by Druid's real-time node:
```bash
...
2013-10-10 05:13:18,976 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T01:00:00.000Z_2013-08-31T02:00:00.000Z_2013-08-31T01:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:18,992 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T03:00:00.000Z_2013-08-31T04:00:00.000Z_2013-08-31T03:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:18,997 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T07:00:00.000Z_2013-08-31T08:00:00.000Z_2013-08-31T07:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:19,003 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T11:00:00.000Z_2013-08-31T12:00:00.000Z_2013-08-31T11:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:19,008 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T12:00:00.000Z_2013-08-31T13:00:00.000Z_2013-08-31T12:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
...
```
Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) to the real-time node should yield valid results:
```json
[
{
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-08-31T12:41:27.000Z"
}
}
]
```
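You can issue that query yourself with a curl along these lines, assuming the real-time node is listening on port 8083 as in the earlier tutorials:
```bash
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' \
  -d '{"queryType":"timeBoundary","dataSource":"wikipedia"}'
```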
#### Advanced Streaming Ingestion
Druid offers an additional method of ingesting streaming data via the indexing service. You may be wondering why a second method is needed. Standalone real-time nodes are sufficient for certain volumes of data and availability tolerances. They pull data from a message queue like Kafka or RabbitMQ, index data locally, and periodically finalize segments for handoff to historical nodes. They are fairly straightforward to scale, simply taking advantage of the innate scalability of the backing message queue. However, they are difficult to make highly available with Kafka, the most popular supported message queue, because its high-level consumer doesn't provide a way to scale out two replicated consumer groups such that each one gets the same data in the same shard. They also become difficult to manage once you have a lot of them, since every machine needs a unique configuration.
Druid solved the availability problem by switching from a pull-based model to a push-based model; rather than Druid indexers pulling data from Kafka, another process pulls data and pushes the data into Druid. Since the push-based model ensures that the same data makes it into the same shard, we can replicate data. The indexing service encapsulates this functionality, where a task-and-resources model replaces a standalone machine model. In addition to simplifying machine configuration, the model also allows nodes to run in the cloud with an elastic number of machines.
If you are interested in this form of real-time ingestion, please check out the client library [Tranquility](https://github.com/metamx/tranquility).
Additional Information
----------------------
Getting data into Druid can definitely be difficult for first time users. Please don't hesitate to ask questions in our IRC channel or on our [google groups page](https://groups.google.com/forum/#!forum/druid-development).

View File

@ -1,343 +0,0 @@
---
layout: doc_page
---
# Tutorial: Loading Your Data (Part 2)
In this tutorial we will cover more advanced/real-world ingestion topics.
Druid can ingest streaming or batch data. Streaming data is ingested via the real-time node, and batch data is ingested via the Hadoop batch indexer. Druid also has a standalone ingestion service called the [indexing service](Indexing-Service.html).
The Data
--------
The data source we'll be using is (surprise!) Wikipedia edits. The data schema is still:
Dimensions (things to filter on):
```json
"page"
"language"
"user"
"unpatrolled"
"newPage"
"robot"
"anonymous"
"namespace"
"continent"
"country"
"region"
"city"
```
Metrics (things to aggregate over):
```json
"count"
"added"
"delta"
"deleted"
```
Streaming Event Ingestion
-------------------------
With real-world data, we recommend having a message bus such as [Apache Kafka](http://kafka.apache.org/) sit between the data stream and the real-time node. The message bus provides higher availability for production environments. [Firehoses](Firehose.html) are the key abstraction for real-time ingestion.
<a id="set-up-kafka"></a>
#### Setting up Kafka
[KafkaFirehoseFactory](Firehose.html) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
The following quick-start instructions for booting a Zookeeper and then Kafka cluster were taken from the [Kafka website](http://kafka.apache.org/07/quickstart.html).
1. Download Apache Kafka 0.7.2 from [http://kafka.apache.org/downloads.html](http://kafka.apache.org/downloads.html)
```bash
wget http://archive.apache.org/dist/kafka/old_releases/kafka-0.7.2-incubating/kafka-0.7.2-incubating-src.tgz
tar -xvzf kafka-0.7.2-incubating-src.tgz
cd kafka-0.7.2-incubating-src
```
2. Build Kafka
```bash
./sbt update
./sbt package
```
3. Boot Kafka
```bash
cat config/zookeeper.properties
bin/zookeeper-server-start.sh config/zookeeper.properties
# in a new console
bin/kafka-server-start.sh config/server.properties
```
4. Launch the console producer (so you can type in JSON kafka messages in a bit)
```bash
bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic wikipedia
```
When things are ready, you should see log messages such as:
```
[2013-10-09 22:03:07,802] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
```
#### Launch a Realtime Node
You should be comfortable starting Druid nodes at this point. If not, it may be worthwhile to revisit the first few tutorials.
1. Real-time nodes can be started with:
```bash
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath lib/*:config/realtime io.druid.cli.Main server realtime
```
2. A realtime.spec should already exist for the data source in the Druid tarball. You should be able to find it at:
```bash
examples/indexing/wikipedia.spec
```
The contents of the file should match:
```json
[
{
"schema": {
"dataSource": "wikipedia",
"aggregators" : [
{
"type" : "count",
"name" : "count"
},
{
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
},
{
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
},
{
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}
],
"indexGranularity": "none"
},
"config": {
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "localhost:2181",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "druid-example",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "wikipedia",
"parser": {
"timestampSpec": {
"column": "timestamp"
},
"data": {
"format": "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
}
}
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT10m",
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "test"
}
}
}
]
```
Note: This config uses a "test" [rejection policy](Plumber.html) which will accept all events and timely hand off, however, we strongly recommend you do not use this in production. Using this rejection policy, segments for events for the same time range will be overridden.
3. Let's copy and paste some data into the Kafka console producer
```json
{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
```
Disclaimer: We recognize the timestamps of these events aren't actually recent.
5. Watch the events as they are ingested by Druid's real-time node:
```bash
...
2013-10-10 05:13:18,976 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T01:00:00.000Z_2013-08-31T02:00:00.000Z_2013-08-31T01:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:18,992 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T03:00:00.000Z_2013-08-31T04:00:00.000Z_2013-08-31T03:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:18,997 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T07:00:00.000Z_2013-08-31T08:00:00.000Z_2013-08-31T07:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:19,003 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T11:00:00.000Z_2013-08-31T12:00:00.000Z_2013-08-31T11:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
2013-10-10 05:13:19,008 INFO [chief-wikipedia] io.druid.server.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-31T12:00:00.000Z_2013-08-31T13:00:00.000Z_2013-08-31T12:00:00.000Z] at path[/druid/segments/localhost:8083/2013-10-10T05:13:18.972Z0]
...
```
Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) to the real-time node should yield valid results:
```json
[
{
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-08-31T12:41:27.000Z"
}
}
]
```
Batch Ingestion
---------------
Druid is designed for large data volumes, and most real-world data sets require batch indexing be done through a Hadoop job.
For this tutorial, we used [Hadoop 1.0.3](https://archive.apache.org/dist/hadoop/core/hadoop-1.0.3/). There are many pages on the Internet showing how to set up a single-node (standalone) Hadoop cluster, which is all that's needed for this example.
For the purposes of this tutorial, we are going to use our very small and simple Wikipedia data set. This data can directly be ingested via other means as shown in the previous [tutorial](Tutorial%3A-Loading-Your-Data-Part-1.html), but we are going to use Hadoop here for demonstration purposes.
Our data is located at:
```
examples/indexing/wikipedia_data.json
```
The following events should exist in the file:
```json
{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
```
#### Set Up a Druid Cluster
To index the data, we are going to need an indexing service, a historical node, and a coordinator node.
Note: If Zookeeper and MySQL aren't running, you'll have to start them again as described in [The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
To start the Indexing Service:
```bash
java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:<hadoop_config_path>:config/overlord io.druid.cli.Main server overlord
```
To start the Coordinator Node:
```bash
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/coordinator io.druid.cli.Main server coordinator
```
To start the Historical Node:
```bash
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/historical io.druid.cli.Main server historical
```
#### Index the Data
Before indexing the data, make sure you have a valid Hadoop cluster running. To build our Druid segment, we are going to submit a [Hadoop index task](Tasks.html) to the indexing service. The grammar for the Hadoop index task is very similar to the index task of the last tutorial. The tutorial Hadoop index task should be located at:
```
examples/indexing/wikipedia_index_hadoop_task.json
```
Examining the contents of the file, you should find:
```json
{
"type" : "index_hadoop",
"config": {
"dataSource" : "wikipedia",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dataSpec" : {
"format" : "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
},
"granularitySpec" : {
"type" : "uniform",
"gran" : "DAY",
"intervals" : [ "2013-08-31/2013-09-01" ]
},
"pathSpec" : {
"type" : "static",
"paths" : "examples/indexing/wikipedia_data.json"
},
"targetPartitionSize" : 5000000,
"rollupSpec" : {
"aggs": [
{
"type" : "count",
"name" : "count"
},
{
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
},
{
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
},
{
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}
],
"rollupGranularity" : "none"
}
}
}
```
If you are curious about what all this configuration means, see [here](Tasks.html).
To submit the task:
```bash
curl -X 'POST' -H 'Content-Type:application/json' -d @examples/indexing/wikipedia_index_hadoop_task.json localhost:8087/druid/indexer/v1/task
```
After the task is completed, the segment should be assigned to your historical node. You should be able to query the segment.
Next Steps
----------
We demonstrated using the indexing service as a way to ingest data into Druid. Previous versions of Druid used the [HadoopDruidIndexer](Batch-ingestion.html) to ingest batch data. The `HadoopDruidIndexer` still remains a valid option for batch ingestion, however, we recommend using the indexing service as the preferred method of getting batch data into Druid.
For more information on querying, check out this [tutorial](Tutorial%3A-All-About-Queries.html).
Additional Information
----------------------
Getting data into Druid can definitely be difficult for first time users. Please don't hesitate to ask questions in our IRC channel or on our [google groups page](https://groups.google.com/forum/#!forum/druid-development).

View File

@ -3,7 +3,7 @@ layout: doc_page
---
# Tutorial: The Druid Cluster
Welcome back! In our first [tutorial](Tutorial%3A-A-First-Look-at-Druid.html), we introduced you to the most basic Druid setup: a single realtime node. We streamed in some data and queried it. Realtime nodes collect very recent data and periodically hand that data off to the rest of the Druid cluster. Some questions about the architecture must naturally come to mind. What does the rest of Druid cluster look like? How does Druid load available static data?
Welcome back! In our first [tutorial](Tutorial%3A-A-First-Look-at-Druid.html), we introduced you to the most basic Druid setup: a single realtime node. We streamed in some data and queried it. Realtime nodes collect very recent data and periodically hand that data off to the rest of the Druid cluster. Some questions about the architecture must naturally come to mind. What does the rest of the Druid cluster look like?
This tutorial will hopefully answer these questions!
@ -28,9 +28,9 @@ You can also [Build From Source](Build-from-source.html).
Druid requires 3 external dependencies: a "deep" storage that acts as a backup data repository, a relational database such as MySQL to hold configuration and metadata information, and [Apache Zookeeper](http://zookeeper.apache.org/) for coordination among different pieces of the cluster.
For deep storage, we have made a public S3 bucket (static.druid.io) available where data for this particular tutorial can be downloaded. More on the data later.
For deep storage, we will use local disk in this tutorial.
#### Setting up MySQL
#### Set up Metadata storage
1. If you don't already have it, download MySQL Community Server here: [http://dev.mysql.com/downloads/mysql/](http://dev.mysql.com/downloads/mysql/).
2. Install MySQL.
@ -45,7 +45,7 @@ GRANT ALL ON druid.* TO 'druid'@'localhost' IDENTIFIED BY 'diurd';
CREATE database druid;
```
#### Setting up Zookeeper
#### Set up Zookeeper
```bash
Download zookeeper from [http://www.apache.org/dyn/closer.cgi/zookeeper/](http://www.apache.org/dyn/closer.cgi/zookeeper/)
@ -92,7 +92,7 @@ Metrics (things to aggregate over):
## The Cluster
Let's start up a few nodes and download our data. First, let's make sure we have configs in the config directory for our various nodes. Issue the following from the Druid home directory:
Before we get started, let's make sure we have configs in the config directory for our various nodes. Issue the following from the Druid home directory:
```
ls config
@ -100,6 +100,49 @@ ls config
If you are interested in learning more about Druid configuration files, check out this [link](Configuration.html). Many aspects of Druid are customizable. For the purposes of this tutorial, we are going to use default values for most things.
#### Common Configuration
There are a couple of cluster wide configuration options we have to define. The common/cluster configuration files should exist under:
```
config/_common
```
In the directory, there should be a `common.runtime.properties` file with the following contents:
```
# Extensions
druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-seven","io.druid.extensions:mysql-metadata-storage"]
# Zookeeper
druid.zk.service.host=localhost
# Metadata Storage
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
# Deep storage
druid.storage.type=local
druid.storage.storageDirectory=/tmp/druid/localStorage
# Cache (we use a simple 10mb heap-based local cache on the broker)
druid.cache.type=local
druid.cache.sizeInBytes=10000000
# Indexing service discovery
druid.selectors.indexing.serviceName=overlord
# Monitoring (disabled for examples)
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]
# Metrics logging (disabled for examples)
druid.emitter=noop
```
In this file we define our external dependencies and cluster wide configs.
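As an aside, deep storage is also configured here. Local disk is fine for these tutorials; if you later switch to S3, the relevant properties look roughly like the following (the bucket and base key are placeholders; see [Deep Storage](Deep-Storage.html) for the authoritative list, and add the `druid-s3-extensions` module to `druid.extensions.coordinates`):
```
druid.storage.type=s3
druid.storage.bucket=some-druid-bucket
druid.storage.baseKey=druid/segments
druid.s3.accessKey=YOUR_ACCESS_KEY
druid.s3.secretKey=YOUR_SECRET_KEY
```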
#### Start a Coordinator Node
Coordinator nodes are in charge of load assignment and distribution. Coordinator nodes monitor the status of the cluster and command historical nodes to assign and drop segments.
@ -115,15 +158,11 @@ In the directory, there should be a `runtime.properties` file with the following
```
druid.host=localhost
druid.service=coordinator
druid.port=8082
druid.service=coordinator
druid.zk.service.host=localhost
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
# The coordinator begins assignment operations after the start delay.
# We override the default here to start things up faster for examples.
druid.coordinator.startDelay=PT70s
```
@ -135,7 +174,7 @@ java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/
#### Start a Historical Node
Historical nodes are the workhorses of a cluster and are in charge of loading historical segments and making them available for queries. Our Wikipedia segment will be downloaded by a historical node.
Historical nodes are the workhorses of a cluster and are in charge of loading historical segments and making them available for queries. Realtime nodes hand off segments to historical nodes.
For more information about Historical nodes, see [here](Historical.html).
The historical config file should exist at:
@ -148,24 +187,16 @@ In the directory we just created, we should have the file `runtime.properties` w
```
druid.host=localhost
druid.service=historical
druid.port=8081
druid.service=historical
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
druid.server.maxSize=10000000000
# Change these to make Druid faster
# We can only scan 1 segment in parallel with these configs.
# Our intermediate buffer is also very small so longer topNs will be slow.
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
druid.server.maxSize=10000000000
```
To start the historical node:
@ -189,10 +220,15 @@ In the directory, there should be a `runtime.properties` file with the following
```
druid.host=localhost
druid.service=broker
druid.port=8080
druid.service=broker
druid.zk.service.host=localhost
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
# Bump these up only for faster nested groupBy
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
```
To start the broker node:
@ -201,33 +237,45 @@ To start the broker node:
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/broker io.druid.cli.Main server broker
```
## Loading the Data
#### Start a Realtime Node
The MySQL dependency we introduced earlier on contains a 'segments' table that contains entries for segments that should be loaded into our cluster. The Druid coordinator compares this table with segments that already exist in the cluster to determine what should be loaded and dropped. To load our wikipedia segment, we need to create an entry in our MySQL segment table.
Our goal is to ingest some data and hand off that data to the rest of our Druid cluster. To accomplish this goal, we need to make some small configuration changes.
Usually, when new segments are created, these MySQL entries are created directly so you never have to do this by hand. For this tutorial, we can do this manually by going back into MySQL and issuing:
``` sql
use druid;
INSERT INTO druid_segments (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES ('wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z', 'wikipedia', '2013-08-08T21:26:23.799Z', '2013-08-01T00:00:00.000Z', '2013-08-02T00:00:00.000Z', '0', '2013-08-08T21:22:48.989Z', '1', '{\"dataSource\":\"wikipedia\",\"interval\":\"2013-08-01T00:00:00.000Z/2013-08-02T00:00:00.000Z\",\"version\":\"2013-08-08T21:22:48.989Z\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/wikipedia/20130801T000000.000Z_20130802T000000.000Z/2013-08-08T21_22_48.989Z/0/index.zip\"},\"dimensions\":\"dma_code,continent_code,geo,area_code,robot,country_name,network,city,namespace,anonymous,unpatrolled,page,postal_code,language,newpage,user,region_lookup\",\"metrics\":\"count,delta,variation,added,deleted\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":24664730,\"identifier\":\"wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z\"}');
```
If you look in your coordinator node logs, you should, after a maximum of a minute or so, see logs of the following form:
In your favorite editor, open up:
```
2013-08-08 22:48:41,967 INFO [main-EventThread] com.metamx.druid.coordinator.LoadQueuePeon - Server[/druid/loadQueue/127.0.0.1:8081] done processing [/druid/loadQueue/127.0.0.1:8081/wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z]
2013-08-08 22:48:41,969 INFO [ServerInventoryView-0] com.metamx.druid.client.SingleServerInventoryView - Server[127.0.0.1:8081] added segment[wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z]
examples/wikipedia/wikipedia_realtime.spec
```
When the segment completes downloading and ready for queries, you should see the following message on your historical node logs:
We need to change some configuration in order to force hand-off to happen faster.
Let's change:
```
2013-08-08 22:48:41,959 INFO [ZkCoordinator-0] com.metamx.druid.coordination.BatchDataSegmentAnnouncer - Announcing segment[wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z] at path[/druid/segments/127.0.0.1:8081/2013-08-08T22:48:41.959Z]
"segmentGranularity": "HOUR",
```
At this point, we can query the segment. For more information on querying, see this [link](Querying.html).
to
### Bonus Round: Start a Realtime Node
```
"segmentGranularity": "FIVE_MINUTE",
```
and
```
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
```
to
```
"intermediatePersistPeriod": "PT3m",
"windowPeriod": "PT1m",
```
Now we should be handing off segments every 6 minutes or so: with `FIVE_MINUTE` segment granularity and a one-minute window period, each segment can be finalized and handed off roughly a minute after its five-minute interval closes.
To start the realtime node that was used in our first tutorial, you simply have to issue:
@ -239,27 +287,22 @@ The configurations are located in `config/realtime/runtime.properties` and shoul
```
druid.host=localhost
druid.service=realtime
druid.port=8083
druid.service=realtime
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.160","io.druid.extensions:druid-kafka-seven:0.6.160"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
# These configs are only required for real hand off
# druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
# druid.metadata.storage.connector.user=druid
# druid.metadata.storage.connector.password=diurd
# We can only scan 1 segment in parallel with these configs.
# Our intermediate buffer is also very small so longer topNs will be slow.
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
# Enable Real monitoring
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor","io.druid.segment.realtime.RealtimeMetricsMonitor"]
```
Once the real-time node starts up, it should begin ingesting data and handing that data off to the rest of the Druid cluster. You can use a web UI located at coordinator_ip:port to view the status of data being loaded. Once data is handed off from the real-time nodes to historical nodes, the historical nodes should begin serving segments.
At any point during ingestion, we can query for data. The queries should span both real-time and historical nodes. For more information on querying, see this [link](Querying.html).
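For example, a timeseries query with a wide interval will merge results from segments already handed off to historical nodes with rows still held by the real-time node. The sketch below assumes the `wikipedia` datasource and the broker on its default port 8080, as configured earlier in this tutorial:

```json
{
  "queryType": "timeseries",
  "dataSource": "wikipedia",
  "intervals": [ "2013-08-01/2020-01-01" ],
  "granularity": "all",
  "aggregations": [
    { "type": "count", "name": "rows" },
    { "type": "longSum", "fieldName": "count", "name": "edit_count" }
  ]
}
```

POST it to the broker (`http://localhost:8080/druid/v2/?pretty`); the single result row should keep growing as new events are ingested.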
Next Steps
----------
If you are interested in how data flows through the different Druid components, check out the [Druid data flow architecture](Design.html). Now that you have an understanding of what the Druid cluster looks like, why not load some of your own data?
View File
@ -1,312 +0,0 @@
---
layout: doc_page
---
Greetings! This tutorial will help clarify some core Druid concepts. We will use a realtime dataset and issue some basic Druid queries. If you are ready to explore Druid, and learn a thing or two, read on!
About the data
--------------
The data source we'll be working with is the Bit.ly USA Government website statistics stream. You can see the stream [here](http://developer.usa.gov/1usagov), and read about the stream [here](http://www.usa.gov/About/developer-resources/1usagov.shtml). This is a feed of JSON data that gets updated whenever anyone clicks a bit.ly-shortened USA.gov link. A typical event might look something like this:
```json
{
"user_agent": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)",
"country": "US",
"known_user": 1,
"timezone": "America/New_York",
"geo_region": "DC",
"global_bitly_hash": "17ctAFs",
"encoding_user_bitly_hash": "17ctAFr",
"encoding_user_login": "senrubiopress",
"aaccept_language": "en-US",
"short_url_cname": "1.usa.gov",
"referring_url": "http://t.co/4Av4NUFAYq",
"long_url": "http://www.rubio.senate.gov/public/index.cfm/fighting-for-florida?ID=c8357d12-9da8-4e9d-b00d-7168e1bf3599",
"timestamp": 1372190407,
"timestamp of time hash was created": 1372190097,
"city": "Washington",
"latitude_longitude": [ 38.893299, -77.014603 ]
}
```
The "known_user" field is always 1 or 0. It is 1 if the user is known to the server, and 0 otherwise. We will use this field extensively in this demo.
Setting Up
----------

There are two ways to set up Druid: download a tarball, or [Build From Source](Build-From-Source.html). You only need to do one of these.
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
```
tar zxvf druid-services-*-bin.tar.gz
```
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.160
```
You should see a bunch of files:
* run_example_server.sh
* run_example_client.sh
* LICENSE, config, examples, lib directories
Running Example Scripts
-----------------------
Let's start doing stuff. You can start a Druid [Realtime](Realtime.html) node by issuing:
```
./run_example_server.sh
```
Select "webstream".
Once the node starts up you will see a bunch of logs about setting up properties and connecting to the data source. If everything was successful, you should see messages of the form shown below.
```
Jul 19, 2013 21:54:05 PM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider
INFO: Binding io.druid.server.StatusResource to GuiceManagedComponentProvider with the scope "Undefined"
2013-07-19 21:54:05,246 INFO org.mortbay.log - Started SelectChannelConnector@0.0.0.0:8083
```
The Druid real-time node ingests events into an in-memory buffer. Periodically, these events will be persisted to disk. If you are interested in the details of our real-time architecture and why we persist indexes to disk, I suggest you read our [White Paper](http://static.druid.io/docs/druid.pdf).
Okay, things are about to get real. To query the real-time node you've spun up, you can issue:
```
./run_example_client.sh
```
Select "webstream" once again. This script issues [GroupByQueries](GroupByQuery.html) to the data we've been ingesting. The query looks like this:
```json
{
"queryType": "groupBy",
"dataSource": "webstream",
"granularity": "minute",
"dimensions": [ "timezone" ],
"aggregations": [
{ "type": "count", "name": "rows" },
{ "type": "doubleSum", "fieldName": "known_users", "name": "known_users" }
],
"filter": { "type": "selector", "dimension": "country", "value": "US" },
"intervals": [ "2013-06-01T00:00/2020-01-01T00" ]
}
```
This is a `groupBy` query, which you may be familiar with from SQL. We are grouping, or aggregating, via the `dimensions` field: `["timezone"]`. We are **filtering** via the `"country"` dimension, to only look at website hits in the US. Our **aggregations** are what we are calculating: a row count, and the sum of the number of known users in our data.
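Roughly speaking, the query corresponds to the SQL below. This is an analogy only: Druid at this version has no SQL interface, and the per-minute bucketing that `granularity` adds has no direct equivalent in this statement.

```sql
-- Analogy only; not runnable against Druid.
SELECT timezone, COUNT(*) AS rows, SUM(known_users) AS known_users
FROM webstream
WHERE country = 'US'
GROUP BY timezone;
```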
The result looks something like this:
```json
[
{
"version": "v1",
"timestamp": "2013-07-18T19:39:00.000Z",
"event": { "timezone": "America/Chicago", "known_users": 10, "rows": 15 }
},
{
"version": "v1",
"timestamp": "2013-07-18T19:39:00.000Z",
"event": { "timezone": "America/Los_Angeles", "known_users": 0, "rows": 3 }
},
...
```
This groupBy query is a bit complicated and we'll return to it later. For the time being, just make sure you are getting some blocks of data back. If you are having problems, make sure you have [curl](http://curl.haxx.se/) installed. Control+C to break out of the client script.
Querying Druid
--------------
In your favorite editor, create the file:
```
time_boundary_query.body
```
Druid queries are JSON blobs which are relatively painless to create programmatically, but an absolute pain to write by hand. So anyway, we are going to create a Druid query by hand. Add the following to the file you just created:
```
{
"queryType": "timeBoundary",
"dataSource": "webstream"
}
```
The [TimeBoundaryQuery](TimeBoundaryQuery.html) is one of the simplest Druid queries. To run the query, you can issue:
```
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @time_boundary_query.body
```
We get something like this JSON back:
```json
[
{
"timestamp": "2013-07-18T19:39:00.000Z",
"result": {
"minTime": "2013-07-18T19:39:00.000Z",
"maxTime": "2013-07-18T19:46:00.000Z"
}
}
]
```
As you can probably tell, the result is indicating the maximum and minimum timestamps we've seen thus far (summarized to a minutely granularity). Let's explore a bit further.
Return to your favorite editor and create the file:
```
timeseries_query.body
```
We are going to make a slightly more complicated query, the [TimeseriesQuery](TimeseriesQuery.html). Copy and paste the following into the file:
```json
{
"queryType": "timeseries",
"dataSource": "webstream",
"intervals": [ "2010-01-01/2020-01-01" ],
"granularity": "all",
"aggregations": [
{ "type": "count", "name": "rows" },
{ "type": "doubleSum", "fieldName": "known_users", "name": "known_users" }
]
}
```
You are probably wondering, what are these [Granularities](Granularities.html) and [Aggregations](Aggregations.html) things? What the query is doing is aggregating some metrics over some span of time.
To issue the query and get some results, run the following in your command line:
```
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d timeseries_query.body
```
Once again, you should get a JSON blob of text back with your results, that looks something like this:
```json
[
{
"timestamp" : "2013-07-18T19:39:00.000Z",
"result" : { "known_users" : 787.0, "rows" : 2004 }
}
]
```
If you issue the query again, you should notice your results updating.
Right now all the results you are getting back are being aggregated into a single timestamp bucket. What if we wanted to see our aggregations on a per minute basis? What field can we change in the query to accomplish this?
If you loudly exclaimed "we can change granularity to minute", you are absolutely correct! We can specify different granularities to bucket our results, like so:
```json
{
"queryType": "timeseries",
"dataSource": "webstream",
"intervals": [ "2010-01-01/2020-01-01" ],
"granularity": "minute",
"aggregations": [
{ "type": "count", "name": "rows" },
{ "type": "doubleSum", "fieldName": "known_users", "name": "known_users" }
]
}
```
This gives us something like the following:
```json
[
{
"timestamp": "2013-07-18T19:39:00.000Z",
"result": { "known_users": 33, "rows": 76 }
},
{
"timestamp": "2013-07-18T19:40:00.000Z",
"result": { "known_users": 105, "rows": 221 }
},
{
"timestamp": "2013-07-18T19:41:00.000Z",
"result": { "known_users": 53, "rows": 167 }
},
...
```
Solving a Problem
-----------------
One of Druid's main powers is to provide answers to problems, so let's pose a problem. What if we wanted to know what the top states in the US are, ordered by the number of visits by known users over the last few minutes? To solve this problem, we have to return to the query we introduced at the very beginning of this tutorial, the [GroupByQuery](GroupByQuery.html). It would be nice if we could group our results by dimension value and somehow sort those results… and it turns out we can!
Let's create the file:
```
group_by_query.body
```
and put the following in there:
```
{
"queryType": "groupBy",
"dataSource": "webstream",
"granularity": "all",
"dimensions": [ "geo_region" ],
"limitSpec": {
"type": "default",
"columns": [
{ "dimension": "known_users", "direction": "DESCENDING" }
],
"limit": 10
},
"aggregations": [
{ "type": "count", "name": "rows" },
{ "type": "doubleSum", "fieldName": "known_users", "name": "known_users" }
],
"filter": { "type": "selector", "dimension": "country", "value": "US" },
"intervals": [ "2012-10-01T00:00/2020-01-01T00" ]
}
```
Woah! Our query just got way more complicated. Now we have these [Filters](Filters.html) things and this [LimitSpec](LimitSpec.html) thing. Fear not, it turns out the new objects we've introduced to our query can help define the format of our results and provide an answer to our question.
If you issue the query:
```
curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @group_by_query.body
```
You should see an answer to our question. For my stream, it looks like this:
```json
[
{
"version": "v1",
"timestamp": "2012-10-01T00:00:00.000Z",
"event": { "geo_region": "RI", "known_users": 359, "rows": 143 }
},
{
"version": "v1",
"timestamp": "2012-10-01T00:00:00.000Z",
"event": { "geo_region": "NY", "known_users": 187, "rows": 322 }
},
{
"version": "v1",
"timestamp": "2012-10-01T00:00:00.000Z",
"event": { "geo_region": "CA", "known_users": 145, "rows": 466 }
},
{
"version": "v1",
"timestamp": "2012-10-01T00:00:00.000Z",
"event": { "geo_region": "IL", "known_users": 121, "rows": 185 }
},
...
```
Feel free to tweak other query parameters to answer other questions you may have about the data.
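For instance, swapping the `geo_region` dimension for `city` (any dimension present in the stream would work; this one appears in the sample event above) ranks cities instead of states:

```json
{
  "queryType": "groupBy",
  "dataSource": "webstream",
  "granularity": "all",
  "dimensions": [ "city" ],
  "limitSpec": {
    "type": "default",
    "columns": [ { "dimension": "known_users", "direction": "DESCENDING" } ],
    "limit": 10
  },
  "aggregations": [
    { "type": "count", "name": "rows" },
    { "type": "doubleSum", "fieldName": "known_users", "name": "known_users" }
  ],
  "filter": { "type": "selector", "dimension": "country", "value": "US" },
  "intervals": [ "2012-10-01T00:00/2020-01-01T00" ]
}
```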
Additional Information
----------------------
This tutorial is merely showcasing a small fraction of what Druid can do. If you are interested in more information about Druid, including setting up a more sophisticated Druid cluster, please read the other links in our wiki.
And thus concludes our journey! Hopefully you learned a thing or two about Druid real-time ingestion, querying Druid, and how Druid can be used to solve problems. If you have additional questions, feel free to post in our [google groups page](https://groups.google.com/forum/#!forum/druid-development).
View File
@ -1,373 +0,0 @@
---
layout: doc_page
---
Greetings! We see you've taken an interest in Druid. That's awesome! Hopefully this tutorial will help clarify some core Druid concepts. We will go through one of the Real-time [Examples](Examples.html), and issue some basic Druid queries. The data source we'll be working with is the [Twitter spritzer stream](https://dev.twitter.com/docs/streaming-apis/streams/public). If you are ready to explore Druid, brave its challenges, and maybe learn a thing or two, read on!
# Setting Up
There are two ways to set up Druid: download a tarball, or build it from source.
# Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz).
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:
```
tar -zxvf druid-services-0.X.X.tar.gz
```
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.X.X
```
You should see a bunch of files:
* run_example_server.sh
* run_example_client.sh
* LICENSE, config, examples, lib directories
# Clone and Build from Source
The other way to set up Druid is from source via git. To do so, run these commands:
```
git clone git@github.com:metamx/druid.git
cd druid
git checkout druid-0.X.X
./build.sh
```
You should see a bunch of files:
```
DruidCorporateCLA.pdf README common examples indexer pom.xml server
DruidIndividualCLA.pdf build.sh doc group_by.body install publications services
LICENSE client eclipse_formatting.xml index-common merger realtime
```
You can find the example executables in the examples/bin directory:
* run_example_server.sh
* run_example_client.sh
# Running Example Scripts
Let's start doing stuff. You can start a Druid [Realtime](Realtime.html) node by issuing:
```
./run_example_server.sh
```
Select "twitter".
You'll need to register a new application with the Twitter API, which only takes a minute. Go to [this link](https://twitter.com/oauth_clients/new) and fill out the form and submit. Don't worry, the home page and callback URL can be anything. This will generate keys for the Twitter example application. Take note of the values for consumer key/secret and access token/secret.
Enter your credentials when prompted.
Once the node starts up you will see a bunch of logs about setting up properties and connecting to the data source. If everything was successful, you should see messages of the form shown below. If you see crazy exceptions, you probably typed in your login information incorrectly.
```
2013-05-17 23:04:40,934 INFO [main] org.mortbay.log - Started SelectChannelConnector@0.0.0.0:8080
2013-05-17 23:04:40,935 INFO [main] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking start method[public void com.metamx.druid.http.FileRequestLogger.start()] on object[com.metamx.druid.http.FileRequestLogger@42bb0406].
2013-05-17 23:04:41,578 INFO [Twitter Stream consumer-1[Establishing connection]] twitter4j.TwitterStreamImpl - Connection established.
2013-05-17 23:04:41,578 INFO [Twitter Stream consumer-1[Establishing connection]] io.druid.examples.twitter.TwitterSpritzerFirehoseFactory - Connected_to_Twitter
2013-05-17 23:04:41,578 INFO [Twitter Stream consumer-1[Establishing connection]] twitter4j.TwitterStreamImpl - Receiving status stream.
```
Periodically, you'll also see messages of the form:
```
2013-05-17 23:04:59,793 INFO [chief-twitterstream] io.druid.examples.twitter.TwitterSpritzerFirehoseFactory - nextRow() has returned 1,000 InputRows
```
These messages indicate you are ingesting events. The Druid real-time node ingests events into an in-memory buffer. Periodically, these events will be persisted to disk. Persisting to disk generates a whole bunch of logs:
```
2013-05-17 23:06:40,918 INFO [chief-twitterstream] com.metamx.druid.realtime.plumber.RealtimePlumberSchool - Submitting persist runnable for dataSource[twitterstream]
2013-05-17 23:06:40,920 INFO [twitterstream-incremental-persist] com.metamx.druid.realtime.plumber.RealtimePlumberSchool - DataSource[twitterstream], Interval[2013-05-17T23:00:00.000Z/2013-05-18T00:00:00.000Z], persisting Hydrant[FireHydrant{index=com.metamx.druid.index.v1.IncrementalIndex@126212dd, queryable=com.metamx.druid.index.IncrementalIndexSegment@64c47498, count=0}]
2013-05-17 23:06:40,937 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - Starting persist for interval[2013-05-17T23:00:00.000Z/2013-05-17T23:07:00.000Z], rows[4,666]
2013-05-17 23:06:41,039 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - outDir[/tmp/example/twitter_realtime/basePersist/twitterstream/2013-05-17T23:00:00.000Z_2013-05-18T00:00:00.000Z/0/v8-tmp] completed index.drd in 11 millis.
2013-05-17 23:06:41,070 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - outDir[/tmp/example/twitter_realtime/basePersist/twitterstream/2013-05-17T23:00:00.000Z_2013-05-18T00:00:00.000Z/0/v8-tmp] completed dim conversions in 31 millis.
2013-05-17 23:06:41,275 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.CompressedPools - Allocating new chunkEncoder[1]
2013-05-17 23:06:41,332 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - outDir[/tmp/example/twitter_realtime/basePersist/twitterstream/2013-05-17T23:00:00.000Z_2013-05-18T00:00:00.000Z/0/v8-tmp] completed walk through of 4,666 rows in 262 millis.
2013-05-17 23:06:41,334 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - Starting dimension[htags] with cardinality[634]
2013-05-17 23:06:41,381 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - Completed dimension[htags] in 49 millis.
2013-05-17 23:06:41,382 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - Starting dimension[lang] with cardinality[19]
2013-05-17 23:06:41,398 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - Completed dimension[lang] in 17 millis.
2013-05-17 23:06:41,398 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - Starting dimension[utc_offset] with cardinality[32]
2013-05-17 23:06:41,413 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - Completed dimension[utc_offset] in 15 millis.
2013-05-17 23:06:41,413 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - outDir[/tmp/example/twitter_realtime/basePersist/twitterstream/2013-05-17T23:00:00.000Z_2013-05-18T00:00:00.000Z/0/v8-tmp] completed inverted.drd in 81 millis.
2013-05-17 23:06:41,425 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexIO$DefaultIndexIOHandler - Converting v8[/tmp/example/twitter_realtime/basePersist/twitterstream/2013-05-17T23:00:00.000Z_2013-05-18T00:00:00.000Z/0/v8-tmp] to v9[/tmp/example/twitter_realtime/basePersist/twitterstream/2013-05-17T23:00:00.000Z_2013-05-18T00:00:00.000Z/0]
2013-05-17 23:06:41,426 INFO [twitterstream-incremental-persist]
... ETC
```
The logs are about building different columns, probably not the most exciting stuff (they might as well be in Vulcan) if you are learning about Druid for the first time. Nevertheless, if you are interested in the details of our real-time architecture and why we persist indexes to disk, I suggest you read our [White Paper](http://static.druid.io/docs/druid.pdf).
Okay, things are about to get real (-time). To query the real-time node you've spun up, you can issue:
```
./run_example_client.sh
```
Select "twitter" once again. This script issues [GroupByQueries](GroupByQuery.html) to the twitter data we've been ingesting. The query looks like this:
```json
{
"queryType": "groupBy",
"dataSource": "twitterstream",
"granularity": "all",
"dimensions": ["lang", "utc_offset"],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "tweets", "name": "tweets"}
],
"filter": { "type": "selector", "dimension": "lang", "value": "en" },
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}
```
This is a **groupBy** query, which you may be familiar with from SQL. We are grouping, or aggregating, via the **dimensions** field: ["lang", "utc_offset"]. We are **filtering** via the **"lang"** dimension, to only look at english tweets. Our **aggregations** are what we are calculating: a row count, and the sum of the tweets in our data.
The result looks something like this:
```json
[
{
"version": "v1",
"timestamp": "2012-10-01T00:00:00.000Z",
"event": {
"utc_offset": "-10800",
"tweets": 90,
"lang": "en",
"rows": 81
}
},
{
"version": "v1",
"timestamp": "2012-10-01T00:00:00.000Z",
"event": {
"utc_offset": "-14400",
"tweets": 177,
"lang": "en",
"rows": 154
}
},
...
```
This data, plotted in a time series/distribution, looks something like this:
![Tweets](http://metamarkets.com/wp-content/uploads/2013/06/tweets_timezone_offset.png)
This groupBy query is a bit complicated and we'll return to it later. For the time being, just make sure you are getting some blocks of data back. If you are having problems, make sure you have [curl](http://curl.haxx.se/) installed. Control+C to break out of the client script.
# Querying Druid
In your favorite editor, create the file:
```
time_boundary_query.body
```
Druid queries are JSON blobs which are relatively painless to create programmatically, but an absolute pain to write by hand. So anyway, we are going to create a Druid query by hand. Add the following to the file you just created:
```json
{
"queryType" : "timeBoundary",
"dataSource" : "twitterstream"
}
```
The "TimeBoundaryQuery":TimeBoundaryQuery.html is one of the simplest Druid queries. To run the query, you can issue:
```
curl -X POST 'http://localhost:8080/druid/v2/?pretty' -H 'content-type: application/json' -d @time_boundary_query.body
```
We get something like this JSON back:
```json
[ {
"timestamp" : "2013-06-10T19:09:00.000Z",
"result" : {
"minTime" : "2013-06-10T19:09:00.000Z",
"maxTime" : "2013-06-10T20:50:00.000Z"
}
} ]
```
That's the result. What information do you think the result is conveying?
...
If you said the result is indicating the maximum and minimum timestamps we've seen thus far (summarized to a minutely granularity), you are absolutely correct. I can see you are a person legitimately interested in learning about Druid. Let's explore a bit further.
Return to your favorite editor and create the file:
```
timeseries_query.body
```
We are going to make a slightly more complicated query, the [TimeseriesQuery](TimeseriesQuery.html). Copy and paste the following into the file:
```json
{
"queryType":"timeseries",
"dataSource":"twitterstream",
"intervals":["2010-01-01/2020-01-01"],
"granularity":"all",
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "tweets", "name": "tweets"}
]
}
```
You are probably wondering, what are these [Granularities](Granularities.html) and [Aggregations](Aggregations.html) things? What the query is doing is aggregating some metrics over some span of time.
To issue the query and get some results, run the following in your command line:
```
curl -X POST 'http://localhost:8080/druid/v2/?pretty' -H 'content-type: application/json' -d @timeseries_query.body
```
Once again, you should get a JSON blob of text back with your results, that looks something like this:
```json
[ {
"timestamp" : "2013-06-10T19:09:00.000Z",
"result" : {
"tweets" : 358562.0,
"rows" : 272271
}
} ]
```
If you issue the query again, you should notice your results updating.
Right now all the results you are getting back are being aggregated into a single timestamp bucket. What if we wanted to see our aggregations on a per minute basis? What field can we change in the query to accomplish this?
If you loudly exclaimed "we can change granularity to minute", you are absolutely correct again! We can specify different granularities to bucket our results, like so:
```json
{
"queryType":"timeseries",
"dataSource":"twitterstream",
"intervals":["2010-01-01/2020-01-01"],
"granularity":"minute",
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "tweets", "name": "tweets"}
]
}
```
This gives us something like the following:
```json
[ {
"timestamp" : "2013-06-10T19:09:00.000Z",
"result" : {
"tweets" : 2650.0,
"rows" : 2120
}
}, {
"timestamp" : "2013-06-10T19:10:00.000Z",
"result" : {
"tweets" : 3401.0,
"rows" : 2609
}
}, {
"timestamp" : "2013-06-10T19:11:00.000Z",
"result" : {
"tweets" : 3472.0,
"rows" : 2610
}
},
...
```
# Solving a Problem
One of Druid's main powers (see what we did there?) is to provide answers to problems, so let's pose a problem. What if we wanted to know what the top hash tags are, ordered by the number of tweets, where the language is English, over the last few minutes you've been reading this tutorial? To solve this problem, we have to return to the query we introduced at the very beginning of this tutorial, the [GroupByQuery](GroupByQuery.html). It would be nice if we could group our results by dimension value and somehow sort those results... and it turns out we can!
Let's create the file:
```
group_by_query.body
```
and put the following in there:
```json
{
"queryType": "groupBy",
"dataSource": "twitterstream",
"granularity": "all",
"dimensions": ["htags"],
"limitSpec": {"type":"default", "columns":[{"dimension": "tweets", "direction":"DESCENDING"}], "limit":5},
"aggregations":[
{ "type": "longSum", "fieldName": "tweets", "name": "tweets"}
],
"filter": {"type": "selector", "dimension": "lang", "value": "en" },
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}
```
Woah! Our query just got way more complicated. Now we have these [Filters](Filters.html) things and this [LimitSpec](LimitSpec.html) thing. Fear not, it turns out the new objects we've introduced to our query can help define the format of our results and provide an answer to our question.
If you issue the query:
```
curl -X POST 'http://localhost:8080/druid/v2/?pretty' -H 'content-type: application/json' -d @group_by_query.body
```
You should hopefully see an answer to our question. For my twitter stream, it looks like this:
```json
[ {
"version" : "v1",
"timestamp" : "2012-10-01T00:00:00.000Z",
"event" : {
"tweets" : 2660,
"htags" : "android"
}
}, {
"version" : "v1",
"timestamp" : "2012-10-01T00:00:00.000Z",
"event" : {
"tweets" : 1944,
"htags" : "E3"
}
}, {
"version" : "v1",
"timestamp" : "2012-10-01T00:00:00.000Z",
"event" : {
"tweets" : 1927,
"htags" : "15SueñosPendientes"
}
}, {
"version" : "v1",
"timestamp" : "2012-10-01T00:00:00.000Z",
"event" : {
"tweets" : 1717,
"htags" : "ipad"
}
}, {
"version" : "v1",
"timestamp" : "2012-10-01T00:00:00.000Z",
"event" : {
"tweets" : 1515,
"htags" : "IDidntTextYouBackBecause"
}
} ]
```
Feel free to tweak other query parameters to answer other questions you may have about the data.
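For instance, changing the selector filter's value from "en" to another language code (the value "es" below is just an example; results depend on what is actually in your stream) re-runs the same ranking for that language:

```json
{
  "queryType": "groupBy",
  "dataSource": "twitterstream",
  "granularity": "all",
  "dimensions": ["htags"],
  "limitSpec": {"type": "default", "columns": [{"dimension": "tweets", "direction": "DESCENDING"}], "limit": 5},
  "aggregations": [
    { "type": "longSum", "fieldName": "tweets", "name": "tweets"}
  ],
  "filter": {"type": "selector", "dimension": "lang", "value": "es" },
  "intervals": ["2012-10-01T00:00/2020-01-01T00"]
}
```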
# Additional Information
This tutorial is merely showcasing a small fraction of what Druid can do. Next, continue on to [The Druid Cluster](./Tutorial:-The-Druid-Cluster.html).
And thus concludes our journey! Hopefully you learned a thing or two about Druid real-time ingestion, querying Druid, and how Druid can be used to solve problems. If you have additional questions, feel free to post in our [google groups page](http://www.groups.google.com/forum/#!forum/druid-development).
View File
@ -10,9 +10,8 @@ h2. Introduction
h2. Getting Started
* "Tutorial: A First Look at Druid":./Tutorial:-A-First-Look-at-Druid.html
* "Tutorial: The Druid Cluster":./Tutorial:-The-Druid-Cluster.html
* "Tutorial: Loading Your Data Part 1":./Tutorial:-Loading-Your-Data-Part-1.html
* "Tutorial: Loading Your Data Part 2":./Tutorial:-Loading-Your-Data-Part-2.html
* "Tutorial: All About Queries":./Tutorial:-All-About-Queries.html
* "Tutorial: Loading Streaming Data":./Tutorial:-Loading-Streaming-Data.html
* "Tutorial: Loading Batch Data":./Tutorial:-Loading-Batch-Data.html
h2. Booting a Druid Cluster
* "Simple Cluster Configuration":Simple-Cluster-Configuration.html
@ -78,7 +77,7 @@ h2. Architecture
*** "Peon":./Peons.html
* External Dependencies
** "Deep Storage":./Deep-Storage.html
** "Metadata Storage":./MySQL.html
** "Metadata Storage":./Metadata-storage.html
** "ZooKeeper":./ZooKeeper.html
h2. Experimental
View File
@ -1,24 +1,4 @@
{
"queryType": "timeseries",
"dataSource": "twitterstream",
"granularity": "all",
"aggregations": [
{
"type": "count",
"name": "rows"
},
{
"type": "doubleSum",
"fieldName": "tweets",
"name": "tweets"
}
],
"filter": {
"type": "selector",
"dimension": "lang",
"value": "en"
},
"intervals": [
"2012-10-01T00:00\/2020-01-01T00"
]
"queryType": "timeBoundary",
"dataSource": "twitterstream"
}
View File
@ -1,25 +1,4 @@
{
"queryType":"timeseries",
"dataSource":"wikipedia",
"granularity":"minute",
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"longSum",
"fieldName":"count",
"name":"edit_count"
}
],
"filter":{
"type":"selector",
"dimension":"namespace",
"value":"article"
},
"intervals":[
"2013-06-01T00:00/2020-01-01T00"
]
"queryType":"timeBoundary",
"dataSource":"wikipedia"
}
View File
@ -76,7 +76,7 @@
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
View File
@ -1,5 +1,5 @@
# Extensions
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0-SNAPSHOT","io.druid.extensions:druid-kafka-seven:0.7.0-SNAPSHOT","io.druid.extensions:druid-rabbitmq:0.7.0-SNAPSHOT"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-seven"]
# Zookeeper
druid.zk.service.host=localhost
@ -14,9 +14,7 @@ druid.metadata.storage.connector.password=diurd
druid.storage.type=local
druid.storage.storageDirectory=/tmp/druid/localStorage
# Cache (we use a simple 10mb heap-based local cache on the broker)
druid.cache.type=local
druid.cache.sizeInBytes=10000000
# Cache can be defined here if we have a distributed cache
# Indexing service discovery
druid.selectors.indexing.serviceName=overlord
View File
@ -2,6 +2,9 @@ druid.host=localhost
druid.port=8080
druid.service=broker
# We use a simple 10mb heap-based local cache
druid.cache.type=local
druid.cache.sizeInBytes=10000000
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
View File
@ -2,9 +2,6 @@ druid.host=localhost
druid.port=8083
druid.service=realtime
# Change this config to 'metadata' to hand off to the rest of the Druid cluster
druid.publish.type=noop
# We can only scan 1 segment in parallel with these configs.
# Our intermediate buffer is also very small so longer topNs will be slow.
druid.processing.buffer.sizeBytes=100000000
View File
@ -41,7 +41,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0</version>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
View File
@ -36,17 +36,17 @@ import java.util.Map;
@JsonTypeName("hadoop")
public class HadoopTuningConfig implements TuningConfig
{
private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
public static HadoopTuningConfig makeDefaultTuningConfig()
{
return new HadoopTuningConfig(
null,
DEFAULT_WORKING_PATH,
new DateTime().toString(),
DEFAULT_PARTITIONS_SPEC,
DEFAULT_SHARD_SPECS,
@ -86,7 +86,7 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("rowFlushBoundary") Integer rowFlushBoundary,
final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
@ -99,11 +99,11 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
)
{
this.workingPath = workingPath == null ? null : workingPath;
this.workingPath = workingPath == null ? DEFAULT_WORKING_PATH : workingPath;
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec;
this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs;
this.rowFlushBoundary = rowFlushBoundary == null ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
this.rowFlushBoundary = maxRowsInMemory == null ? DEFAULT_ROW_FLUSH_BOUNDARY : maxRowsInMemory;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
this.overwriteFiles = overwriteFiles;
View File
@ -52,7 +52,6 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
if (query.getContextBySegment(false)) {
final Sequence<T> baseSequence = base.run(query, responseContext);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple(
View File
@ -35,8 +35,7 @@ import java.util.SortedSet;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = UniformGranularitySpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "uniform", value = UniformGranularitySpec.class),
@JsonSubTypes.Type(name = "arbitrary", value = ArbitraryGranularitySpec.class)
@JsonSubTypes.Type(name = "uniform", value = UniformGranularitySpec.class)
})
public interface GranularitySpec
{
View File
@ -35,8 +35,8 @@ import java.util.SortedSet;
public class UniformGranularitySpec implements GranularitySpec
{
private static final Granularity defaultSegmentGranularity = Granularity.DAY;
private static final QueryGranularity defaultQueryGranularity = QueryGranularity.NONE;
private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularity.DAY;
private static final QueryGranularity DEFAULT_QUERY_GRANULARITY = QueryGranularity.NONE;
private final Granularity segmentGranularity;
private final QueryGranularity queryGranularity;
@ -51,12 +51,8 @@ public class UniformGranularitySpec implements GranularitySpec
)
{
if (segmentGranularity != null) {
this.segmentGranularity = segmentGranularity;
} else {
this.segmentGranularity = defaultSegmentGranularity;
}
this.queryGranularity = queryGranularity == null ? defaultQueryGranularity : queryGranularity;
this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
this.queryGranularity = queryGranularity == null ? DEFAULT_QUERY_GRANULARITY : queryGranularity;
if (inputIntervals != null) {
List<Interval> granularIntervals = Lists.newArrayList();