From a57d586319d98822d58e2816be12970262d08233 Mon Sep 17 00:00:00 2001 From: Igal Levy Date: Fri, 13 Dec 2013 06:36:38 -0800 Subject: [PATCH 1/9] fixed typo --- docs/content/Tutorial:-The-Druid-Cluster.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md index 0c49d7a2315..0bc2d936716 100644 --- a/docs/content/Tutorial:-The-Druid-Cluster.md +++ b/docs/content/Tutorial:-The-Druid-Cluster.md @@ -253,5 +253,5 @@ druid.processing.buffer.sizeBytes=10000000 Next Steps ---------- -If you are intested in how data flows through the different Druid components, check out the [Druid data flow architecture](Design.html). Now that you have an understanding of what the Druid cluster looks like, why not load some of your own data? +If you are interested in how data flows through the different Druid components, check out the [Druid data flow architecture](Design.html). Now that you have an understanding of what the Druid cluster looks like, why not load some of your own data? Check out the next [tutorial](Tutorial%3A-Loading-Your-Data-Part-1.html) section for more info! From 479b953a7950c6a48bbc8ec23c6ca186ef9d350a Mon Sep 17 00:00:00 2001 From: Igal Levy Date: Wed, 18 Dec 2013 17:35:30 -0800 Subject: [PATCH 2/9] added definitions for master and compute nodes; tweaked spacing between term and definition --- docs/content/Concepts-and-Terminology.md | 32 +++++++++++++----------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/docs/content/Concepts-and-Terminology.md b/docs/content/Concepts-and-Terminology.md index 5bd0037253b..e4d3790e8c2 100644 --- a/docs/content/Concepts-and-Terminology.md +++ b/docs/content/Concepts-and-Terminology.md @@ -8,34 +8,38 @@ The following definitions are given with respect to the Druid data store. They a More definitions are available on the [design page](Design.html). -* **Aggregation** The summarizing of data meeting certain specifications. Druid aggregates [timeseries data](#timeseries), which in effect compacts the data. Time intervals (set in configuration) are used to create buckets, while [timestamps](#timestamp) determine which buckets data aggregated in. +* **Aggregation**  The summarizing of data meeting certain specifications. Druid aggregates [timeseries data](#timeseries), which in effect compacts the data. Time intervals (set in configuration) are used to create buckets, while [timestamps](#timestamp) determine which buckets data aggregated in. -* **Aggregators** A mechanism for combining records during realtime incremental indexing, Hadoop batch indexing, and in queries. +* **Aggregators**  A mechanism for combining records during realtime incremental indexing, Hadoop batch indexing, and in queries. -* **DataSource** A table-like view of data; specified in [specFiles](#specfile) and in queries. A dataSource specifies the source of data being ingested and ultimately stored in [segments](#segment). +* **Compute node**  Obsolete name for a [Historical node](Historical.html). -* **Dimensions** Aspects or categories of data, such as languages or locations. For example, with *language* and *country* as the type of dimension, values could be "English" or "Mandarin" for language, or "USA" or "China" for country. In Druid, dimensions can serve as filters for narrowing down hits (for example, language = "English" or country = "China"). +* **DataSource**  A table-like view of data; specified in [specFiles](#specfile) and in queries. 
A dataSource specifies the source of data being ingested and ultimately stored in [segments](#segment). -* **Ephemeral Node** A Zookeeper node (or "znode") that exists for as long as the session that created the znode is active. More info [here](http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes). In a Druid cluster, ephemeral nodes are typically used for commands (such as assigning [segments](#segment) to certain nodes). +* **Dimensions**  Aspects or categories of data, such as languages or locations. For example, with *language* and *country* as the type of dimension, values could be "English" or "Mandarin" for language, or "USA" or "China" for country. In Druid, dimensions can serve as filters for narrowing down hits (for example, language = "English" or country = "China"). -* **Granularity** The time interval corresponding to aggregation by time. Druid configuration settings specify the granularity of [timestamp](#timestamp) buckets in a [segment](#segment) (for example, by minute or by hour), as well as the granularity of the segment itself. The latter is essentially the overall range of absolute time covered by the segment. In queries, granularity settings control the summarization of findings. +* **Ephemeral Node**  A Zookeeper node (or "znode") that exists for as long as the session that created the znode is active. More info [here](http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes). In a Druid cluster, ephemeral nodes are typically used for commands (such as assigning [segments](#segment) to certain nodes). -* **Ingestion** The pulling and initial storing and processing of data. Druid supports realtime and batch ingestion of data, and applies indexing in both cases. +* **Granularity**  The time interval corresponding to aggregation by time. Druid configuration settings specify the granularity of [timestamp](#timestamp) buckets in a [segment](#segment) (for example, by minute or by hour), as well as the granularity of the segment itself. The latter is essentially the overall range of absolute time covered by the segment. In queries, granularity settings control the summarization of findings. -* **Metrics** Countable data that can be aggregated. Metrics, for example, can be the number of visitors to a website, number of tweets per day, or average revenue. +* **Ingestion**  The pulling and initial storing and processing of data. Druid supports realtime and batch ingestion of data, and applies indexing in both cases. -* **Rollup** The aggregation of data that occurs at one or more stages, based on settings in a [configuration file](#specFile). +* **Master node**  Obsolete name for a [Coordinator node](Coordinator.html). + +* **Metrics**  Countable data that can be aggregated. Metrics, for example, can be the number of visitors to a website, number of tweets per day, or average revenue. + +* **Rollup**  The aggregation of data that occurs at one or more stages, based on settings in a [configuration file](#specFile). -* **Segment** A collection of (internal) records that are stored and processed together. Druid chunks data into segments representing a time interval, and these are stored and manipulated in the cluster. +* **Segment**  A collection of (internal) records that are stored and processed together. Druid chunks data into segments representing a time interval, and these are stored and manipulated in the cluster. 
-* **Shard** A sub-partition of the data, allowing multiple [segments](#segment) to represent the data in a certain time interval. Sharding occurs along time partitions to better handle amounts of data that exceed certain limits on segment size, although sharding along dimensions may also occur to optimize efficiency. +* **Shard**  A sub-partition of the data, allowing multiple [segments](#segment) to represent the data in a certain time interval. Sharding occurs along time partitions to better handle amounts of data that exceed certain limits on segment size, although sharding along dimensions may also occur to optimize efficiency. -* **specFile** The specification for services in JSON format; see [Realtime](Realtime.html) and [Batch-ingestion](Batch-ingestion.html) +* **specFile**  The specification for services in JSON format; see [Realtime](Realtime.html) and [Batch-ingestion](Batch-ingestion.html) -* **Timeseries Data** Data points which are ordered in time. The closing value of a financial index or the number of tweets per hour with a certain hashtag are examples of timeseries data. +* **Timeseries Data**  Data points which are ordered in time. The closing value of a financial index or the number of tweets per hour with a certain hashtag are examples of timeseries data. -* **Timestamp** An absolute position on a timeline, given in a standard alpha-numerical format such as with UTC time. [Timeseries data](#timeseries) points can be ordered by timestamp, and in Druid, they are. +* **Timestamp**  An absolute position on a timeline, given in a standard alpha-numerical format such as with UTC time. [Timeseries data](#timeseries) points can be ordered by timestamp, and in Druid, they are. From 0aa04dd78749e914f78f3fa28dc0b62af973b100 Mon Sep 17 00:00:00 2001 From: Igal Levy Date: Thu, 19 Dec 2013 16:04:56 -0800 Subject: [PATCH 3/9] added title --- docs/content/Batch-ingestion.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/content/Batch-ingestion.md b/docs/content/Batch-ingestion.md index 2f3ee9821d7..9da893de9f6 100644 --- a/docs/content/Batch-ingestion.md +++ b/docs/content/Batch-ingestion.md @@ -1,6 +1,8 @@ --- layout: doc_page --- + +# Batch Data Ingestion There are two choices for batch data ingestion to your Druid cluster, you can use the [Indexing service](Indexing-service.html) or you can use the `HadoopDruidIndexer`. Which should I use? From 18d8b0840e0f44313b763f290b136b45b7145a46 Mon Sep 17 00:00:00 2001 From: Igal Levy Date: Thu, 19 Dec 2013 16:07:57 -0800 Subject: [PATCH 4/9] fixed formatting on numbered list and code blocks --- docs/content/Tutorial:-Loading-Your-Data-Part-2.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md index 50fef2985f6..382dd53a6b9 100644 --- a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md +++ b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md @@ -91,17 +91,17 @@ You should be comfortable starting Druid nodes at this point. If not, it may be 1. Real-time nodes can be started with: -```bash -java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath lib/*:config/realtime io.druid.cli.Main server realtime -``` + ```bash + java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath lib/*:config/realtime io.druid.cli.Main server realtime + ``` 2. 
A realtime.spec should already exist for the data source in the Druid tarball. You should be able to find it at: -```bash -examples/indexing/wikipedia.spec -``` + ```bash + examples/indexing/wikipedia.spec + ``` -The contents of the file should match: + The contents of the file should match: ```json [ From 721195ac1207452a29445579bceac5753cf4f2d9 Mon Sep 17 00:00:00 2001 From: Igal Levy Date: Thu, 19 Dec 2013 17:26:51 -0800 Subject: [PATCH 5/9] added anchor for linking to section --- docs/content/Tutorial:-Loading-Your-Data-Part-2.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md index 382dd53a6b9..42e8aac9dea 100644 --- a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md +++ b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md @@ -42,6 +42,7 @@ Streaming Event Ingestion With real-world data, we recommend having a message bus such as [Apache Kafka](http://kafka.apache.org/) sit between the data stream and the real-time node. The message bus provides higher availability for production environments. [Firehoses](Firehose.html) are the key abstraction for real-time ingestion. + #### Setting up Kafka [KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.40/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node. From d283693d785336e954ef586a83622e3d0d9fd91d Mon Sep 17 00:00:00 2001 From: Igal Levy Date: Fri, 20 Dec 2013 13:04:40 -0800 Subject: [PATCH 6/9] updated links and references since these two files contain the lists of firehoses and plumbers they refer to, and pointed to Realtime-ingestion for the example of the kafka firehose --- docs/content/Firehose.md | 10 +++++----- docs/content/Plumber.md | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/content/Firehose.md b/docs/content/Firehose.md index 62b19ac078d..5452f5103cb 100644 --- a/docs/content/Firehose.md +++ b/docs/content/Firehose.md @@ -3,15 +3,15 @@ layout: doc_page --- Firehoses describe the data stream source. They are pluggable and thus the configuration schema can and will vary based on the `type` of the firehose. -|Field|Type|Description|Required| -|-----|----|-----------|--------| -|type|String|Specifies the type of firehose. Each value will have its own configuration schema, firehoses packaged with Druid are described [here](https://github.com/metamx/druid/wiki/Firehose#available-firehoses)|yes| +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | Specifies the type of firehose. Each value will have its own configuration schema, firehoses packaged with Druid are described below. | yes | -We describe the configuration of the Kafka firehose from the example below, but check [here](https://github.com/metamx/druid/wiki/Firehose#available-firehoses) for more information about the various firehoses that are available in Druid. +We describe the configuration of the [Kafka firehose example](Realtime-ingestion.html#realtime-specfile), but there are other types available in Druid (see below). - `consumerProps` is a map of properties for the Kafka consumer. 
The JSON object is converted into a Properties object and passed along to the Kafka consumer. - `feed` is the feed that the Kafka consumer should read from. -- `parser` represents a parser that knows how to convert from String representations into the required `InputRow` representation that Druid uses. This is a potentially reusable piece that can be found in many of the firehoses that are based on text streams. The spec in the example describes a JSON feed (new-line delimited objects), with a timestamp column called "timestamp" in ISO8601 format and that it should not include the dimension "value" when processing. More information about the options available for the parser are available [here](https://github.com/metamx/druid/wiki/Firehose#parsing-data). +- `parser` represents a parser that knows how to convert from String representations into the required `InputRow` representation that Druid uses. This is a potentially reusable piece that can be found in many of the firehoses that are based on text streams. The spec in the example describes a JSON feed (new-line delimited objects), with a timestamp column called "timestamp" in ISO8601 format and that it should not include the dimension "value" when processing. More information about the options available for the parser are available below. Available Firehoses ------------------- diff --git a/docs/content/Plumber.md b/docs/content/Plumber.md index 115fd66e7f5..bf71a83be2c 100644 --- a/docs/content/Plumber.md +++ b/docs/content/Plumber.md @@ -5,7 +5,7 @@ The Plumber is the thing that handles generated segments both while they are bei |Field|Type|Description|Required| |-----|----|-----------|--------| -|type|String|Specifies the type of plumber. Each value will have its own configuration schema, plumbers packaged with Druid are described [here](https://github.com/metamx/druid/wiki/Plumber#available-plumbers)|yes| +|type|String|Specifies the type of plumber. Each value will have its own configuration schema, plumbers packaged with Druid are described below.|yes| We provide a brief description of the example to exemplify the types of things that are configured on the plumber. From 3d66f0fb8bd27c5b42a33bba7353ca4876b4abaf Mon Sep 17 00:00:00 2001 From: Igal Levy Date: Fri, 20 Dec 2013 13:08:25 -0800 Subject: [PATCH 7/9] added Realtime-ingestion under Data Ingestion (this is the content split from Realtime, which is now just about the node); moved Firehose and Plumber under Realtime-ingestion --- docs/content/toc.textile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/content/toc.textile b/docs/content/toc.textile index 802d94ef434..7e3ae9566ef 100644 --- a/docs/content/toc.textile +++ b/docs/content/toc.textile @@ -21,7 +21,9 @@ h2. Operations * "Booting a Production Cluster":./Booting-a-production-cluster.html h2. Data Ingestion -* "Realtime":./Realtime.html +* "Realtime":./Realtime-ingestion.html +** "Firehose":./Firehose.html +** "Plumber":./Plumber.html * "Batch":./Batch-ingestion.html * "Indexing Service":./Indexing-Service.html ** "Tasks":./Tasks.html @@ -52,8 +54,6 @@ h2. 
Architecture ** "Coordinator":./Coordinator.html *** "Rule Configuration":./Rule-Configuration.html ** "Realtime":./Realtime.html -*** "Firehose":./Firehose.html -*** "Plumber":./Plumber.html ** "Indexing Service":./Indexing-Service.html *** "Middle Manager":./Middlemanager.html *** "Peon":./Peons.html From 9b539122097f0e05488c674a566a53bc894f464c Mon Sep 17 00:00:00 2001 From: Igal Levy Date: Fri, 20 Dec 2013 13:42:22 -0800 Subject: [PATCH 8/9] Added titles --- docs/content/Firehose.md | 2 ++ docs/content/Plumber.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docs/content/Firehose.md b/docs/content/Firehose.md index 5452f5103cb..c87d0b792bf 100644 --- a/docs/content/Firehose.md +++ b/docs/content/Firehose.md @@ -1,6 +1,8 @@ --- layout: doc_page --- + +# Druid Firehoses Firehoses describe the data stream source. They are pluggable and thus the configuration schema can and will vary based on the `type` of the firehose. | Field | Type | Description | Required | diff --git a/docs/content/Plumber.md b/docs/content/Plumber.md index bf71a83be2c..dfbb3b6b3bf 100644 --- a/docs/content/Plumber.md +++ b/docs/content/Plumber.md @@ -1,6 +1,8 @@ --- layout: doc_page --- + +# Druid Plumbers The Plumber is the thing that handles generated segments both while they are being generated and when they are "done". This is also technically a pluggable interface and there are multiple implementations, but there are a lot of details handled by the plumber such that it is expected that there will only be a few implementations and only more advanced third-parties will implement their own. |Field|Type|Description|Required| From 39012c6dbed67c80e0a2006eb87a65104b00fe13 Mon Sep 17 00:00:00 2001 From: Igal Levy Date: Fri, 20 Dec 2013 13:43:55 -0800 Subject: [PATCH 9/9] Separated realitme ingestion from realtime node info; under Data Ingestion the Realtime link now points to the new realtime-ingestion page --- docs/content/Realtime-ingestion.md | 149 ++++++++++++++++++++++++++ docs/content/Realtime.md | 162 ++--------------------------- 2 files changed, 158 insertions(+), 153 deletions(-) create mode 100644 docs/content/Realtime-ingestion.md diff --git a/docs/content/Realtime-ingestion.md b/docs/content/Realtime-ingestion.md new file mode 100644 index 00000000000..f183ad06804 --- /dev/null +++ b/docs/content/Realtime-ingestion.md @@ -0,0 +1,149 @@ +--- +layout: doc_page +--- +Realtime Data Ingestion +======== + +Realtime data ingestion uses [Realtime nodes](Realtime.html) to index data and make it immediately available for querying. This data is periodically handed off (in the form of data segments) to [Historical](Historical.html) nodes, after which that data is forgotten by the Realtime nodes. This handoff, or "segment propagation," involves a series of interactions between various members of the Druid cluster. It is illustrated below. + +Much of the configuration governing Realtime nodes and the ingestion of data is set in the Realtime spec file, discussed on this page. + + +Segment Propagation +------------------- + +The segment propagation diagram for real-time data ingestion can be seen below: + +![Segment Propagation](../img/segmentPropagation.png "Segment Propagation") + +You can read about the various components shown in this diagram under the Architecture section (see the menu on the left). + + +## Realtime "specFile" + +The property `druid.realtime.specFile` has the path of a file (absolute or relative path and file name) with realtime specifications in it. 
This "specFile" should be a JSON Array of JSON objects like the following: + +```json +[ + { + "schema": { + "dataSource": "dataSourceName", + "aggregators": [ + { + "type": "count", + "name": "events" + }, + { + "type": "doubleSum", + "name": "outColumn", + "fieldName": "inColumn" + } + ], + "indexGranularity": "minute", + "shardSpec": { + "type": "none" + } + }, + "config": { + "maxRowsInMemory": 500000, + "intermediatePersistPeriod": "PT10m" + }, + "firehose": { + "type": "kafka-0.7.2", + "consumerProps": { + "zk.connect": "zk_connect_string", + "zk.connectiontimeout.ms": "15000", + "zk.sessiontimeout.ms": "15000", + "zk.synctime.ms": "5000", + "groupid": "consumer-group", + "fetch.size": "1048586", + "autooffset.reset": "largest", + "autocommit.enable": "false" + }, + "feed": "your_kafka_topic", + "parser": { + "timestampSpec": { + "column": "timestamp", + "format": "iso" + }, + "data": { + "format": "json" + }, + "dimensionExclusions": [ + "value" + ] + } + }, + "plumber": { + "type": "realtime", + "windowPeriod": "PT10m", + "segmentGranularity": "hour", + "basePersistDirectory": "\/tmp\/realtime\/basePersist" + } + } +] +``` + +This is a JSON Array so you can give more than one realtime stream to a given node. The number you can put in the same process depends on the exact configuration. In general, it is best to think of each realtime stream handler as requiring 2-threads: 1 thread for data consumption and aggregation, 1 thread for incremental persists and other background tasks. + +There are four parts to a realtime stream specification, `schema`, `config`, `firehose` and `plumber` which we will go into here. + +### Schema + +This describes the data schema for the output Druid segment. More information about concepts in Druid and querying can be found at [Concepts-and-Terminology](Concepts-and-Terminology.html) and [Querying](Querying.html). + +|Field|Type|Description|Required| +|-----|----|-----------|--------| +|aggregators|Array of Objects|The list of aggregators to use to aggregate colliding rows together.|yes| +|dataSource|String|The name of the dataSource that the segment belongs to.|yes| +|indexGranularity|String|The granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows.|yes| +|segmentGranularity|String|The granularity of the segment as a whole. This is generally larger than the index granularity and describes the rate at which the realtime server will push segments out for historical servers to take over.|yes| +|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a sharded fashion.|no| + +### Config + +This provides configuration for the data processing portion of the realtime stream processor. + +|Field|Type|Description|Required| +|-----|----|-----------|--------| +|intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|yes| +|maxRowsInMemory|Number|The number of rows to aggregate before persisting. 
This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size.|yes| + +### Firehose + +See [Firehose](Firehose.html). + +### Plumber + +See [Plumber](Plumber.html) + +Constraints +----------- + +The following table summarizes constraints between settings in the spec file for the Realtime subsystem. + +|Name|Effect|Minimum|Recommended| +|----|------|-------|-----------| +| windowPeriod| when reading an InputRow, events with timestamp older than now minus this window are discarded | time jitter tolerance | use this to reject outliers | +| segmentGranularity| time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than indexGranularity| +| indexGranularity| time granularity (minute, hour, day, week, month) of indexes | less than segmentGranularity| minute, hour, day, week, month | +| intermediatePersistPeriod| the max real time (ISO8601 Period) between flushes of InputRows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory | +| maxRowsInMemory| the max number of InputRows to hold in memory before a flush to disk | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod | + +The normal, expected use cases have the following overall constraints: `indexGranularity < intermediatePersistPeriod =< windowPeriod < segmentGranularity` + +If the RealtimeNode process runs out of heap, try adjusting druid.computation.buffer.size property which specifies a size in bytes that must fit into the heap. + + + +Extending the code +------------------ + +Realtime integration is intended to be extended in two ways: + +1. Connect to data streams from varied systems ([Firehose](https://github.com/druid-io/druid-api/blob/master/src/main/java/io/druid/data/input/FirehoseFactory.java)) +2. Adjust the publishing strategy to match your needs ([Plumber](https://github.com/metamx/druid/blob/master/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java)) + +The expectations are that the former will be very common and something that users of Druid will do on a fairly regular basis. Most users will probably never have to deal with the latter form of customization. Indeed, we hope that all potential use cases can be packaged up as part of Druid proper without requiring proprietary customization. + +Given those expectations, adding a firehose is straightforward and completely encapsulated inside of the interface. Adding a plumber is more involved and requires understanding of how the system works to get right, it’s not impossible, but it’s not intended that individuals new to Druid will be able to do it immediately. diff --git a/docs/content/Realtime.md b/docs/content/Realtime.md index 980e1dfb773..2cede5f9d6a 100644 --- a/docs/content/Realtime.md +++ b/docs/content/Realtime.md @@ -1,7 +1,7 @@ --- layout: doc_page --- -Realtime +Realtime Nodes ======== Realtime nodes provide a realtime index. Data indexed via these nodes is immediately available for querying. Realtime nodes will periodically build segments representing the data they’ve collected over some span of time and transfer these segments off to [Historical](Historical.html) nodes. 
They use ZooKeeper to monitor the transfer and MySQL to store metadata about the transfered segment. Once transfered, segments are forgotten by the Realtime nodes. @@ -32,6 +32,12 @@ druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.40"] druid.zk.service.host=localhost +# The realtime config file. +druid.realtime.specFile=/path/to/specFile + +# Choices: db (hand off segments), noop (do not hand off segments). +druid.publish.type=db + druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid druid.db.connector.user=druid druid.db.connector.password=diurd @@ -39,160 +45,10 @@ druid.db.connector.password=diurd druid.processing.buffer.sizeBytes=10000000 ``` -Note: This setup will not hand off segments to the rest of the cluster. +The realtime module also uses several of the default modules in [Configuration](Configuration.html). For more information on the realtime spec file (or configuration file), see [realtime ingestion](Realtime-ingestion.html) page. -JVM Configuration ------------------ - -The realtime module uses several of the default modules in [Configuration](Configuration.html) and has the following set of configurations as well: - -|Property|Description|Default| -|--------|-----------|-------| -|`druid.realtime.specFile`|The file with realtime specifications in it.|none| -|`druid.publish.type`|Choices:noop, db. After a real-time node completes building a segment after the window period, what does it do with it? For true handoff to occur, this should be set to "db".|db| - -### Realtime "specFile" - -The property `druid.realtime.specFile` has the path of a file (absolute or relative path and file name) with realtime specifications in it. This "specFile" should be a JSON Array of JSON objects like the following: - -```json -[ - { - "schema": { - "dataSource": "dataSourceName", - "aggregators": [ - { - "type": "count", - "name": "events" - }, - { - "type": "doubleSum", - "name": "outColumn", - "fieldName": "inColumn" - } - ], - "indexGranularity": "minute", - "shardSpec": { - "type": "none" - } - }, - "config": { - "maxRowsInMemory": 500000, - "intermediatePersistPeriod": "PT10m" - }, - "firehose": { - "type": "kafka-0.7.2", - "consumerProps": { - "zk.connect": "zk_connect_string", - "zk.connectiontimeout.ms": "15000", - "zk.sessiontimeout.ms": "15000", - "zk.synctime.ms": "5000", - "groupid": "consumer-group", - "fetch.size": "1048586", - "autooffset.reset": "largest", - "autocommit.enable": "false" - }, - "feed": "your_kafka_topic", - "parser": { - "timestampSpec": { - "column": "timestamp", - "format": "iso" - }, - "data": { - "format": "json" - }, - "dimensionExclusions": [ - "value" - ] - } - }, - "plumber": { - "type": "realtime", - "windowPeriod": "PT10m", - "segmentGranularity": "hour", - "basePersistDirectory": "\/tmp\/realtime\/basePersist" - } - } -] -``` - -This is a JSON Array so you can give more than one realtime stream to a given node. The number you can put in the same process depends on the exact configuration. In general, it is best to think of each realtime stream handler as requiring 2-threads: 1 thread for data consumption and aggregation, 1 thread for incremental persists and other background tasks. - -There are four parts to a realtime stream specification, `schema`, `config`, `firehose` and `plumber` which we will go into here. - -#### Schema - -This describes the data schema for the output Druid segment. 
More information about concepts in Druid and querying can be found at [Concepts-and-Terminology](Concepts-and-Terminology.html) and [Querying](Querying.html). - -|Field|Type|Description|Required| -|-----|----|-----------|--------| -|aggregators|Array of Objects|The list of aggregators to use to aggregate colliding rows together.|yes| -|dataSource|String|The name of the dataSource that the segment belongs to.|yes| -|indexGranularity|String|The granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows.|yes| -|segmentGranularity|String|The granularity of the segment as a whole. This is generally larger than the index granularity and describes the rate at which the realtime server will push segments out for historical servers to take over.|yes| -|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a sharded fashion.|no| - -### Config - -This provides configuration for the data processing portion of the realtime stream processor. - -|Field|Type|Description|Required| -|-----|----|-----------|--------| -|intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|yes| -|maxRowsInMemory|Number|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size.|yes| - -### Firehose - -See [Firehose](Firehose.html). - -### Plumber - -See [Plumber](Plumber.html) - -Constraints ------------ - -The following tables summarizes constraints between settings in the spec file for the Realtime subsystem. 
- -|Name|Effect|Minimum|Recommended| -|----|------|-------|-----------| -| windowPeriod| when reading an InputRow, events with timestamp older than now minus this window are discarded | time jitter tolerance | use this to reject outliers | -| segmentGranularity| time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than indexGranularity| -| indexGranularity| time granularity (minute, hour, day, week, month) of indexes | less than segmentGranularity| minute, hour, day, week, month | -| intermediatePersistPeriod| the max real time (ISO8601 Period) between flushes of InputRows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory | -| maxRowsInMemory| the max number of InputRows to hold in memory before a flush to disk | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod | - -The normal, expected use cases have the following overall constraints: `indexGranularity < intermediatePersistPeriod =< windowPeriod < segmentGranularity` - -If the RealtimeNode process runs out of heap, try adjusting druid.computation.buffer.size property which specifies a size in bytes that must fit into the heap. - -Running -------- - -``` -io.druid.cli.Main server realtime -``` - -Segment Propagation -------------------- - -The segment propagation diagram for real-time data ingestion can be seen below: - -![Segment Propagation](../img/segmentPropagation.png "Segment Propagation") Requirements ------------ -Realtime nodes currently require a Kafka cluster to sit in front of them and collect results. There’s more configuration required for these as well. - -Extending the code ------------------- - -Realtime integration is intended to be extended in two ways: - -1. Connect to data streams from varied systems ([Firehose](https://github.com/druid-io/druid-api/blob/master/src/main/java/io/druid/data/input/FirehoseFactory.java)) -2. Adjust the publishing strategy to match your needs ([Plumber](https://github.com/metamx/druid/blob/master/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java)) - -The expectations are that the former will be very common and something that users of Druid will do on a fairly regular basis. Most users will probably never have to deal with the latter form of customization. Indeed, we hope that all potential use cases can be packaged up as part of Druid proper without requiring proprietary customization. - -Given those expectations, adding a firehose is straightforward and completely encapsulated inside of the interface. Adding a plumber is more involved and requires understanding of how the system works to get right, it’s not impossible, but it’s not intended that individuals new to Druid will be able to do it immediately. +Realtime nodes currently require a Kafka cluster to sit in front of them and collect results. There’s [more configuration](Tutorial\:-Loading-Your-Data-Part-2.md#set-up-kafka) required for these as well.
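
As a quick reference for that setup, the sketch below strings together a local Kafka quickstart with the realtime node launch command used earlier in these docs. It is only a sketch: the ZooKeeper/Kafka script names and console-producer flags assume a stock Kafka 0.7.2 download, the `wikipedia` topic and `examples/indexing/wikipedia.spec` come from the loading tutorial, and the sample event fields are illustrative rather than the exact schema.

```bash
# Rough local walkthrough; run each step in its own terminal.

# 1. Start a local ZooKeeper and Kafka broker (from the Kafka install directory;
#    script names assume the stock Kafka 0.7.2 quickstart layout).
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# 2. Start a realtime node against the bundled example spec (from the Druid install directory).
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
  -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec \
  -classpath lib/*:config/realtime io.druid.cli.Main server realtime

# 3. Send one newline-delimited JSON event to the topic named in the spec's "feed" field
#    (from the Kafka install directory; producer flags assume Kafka 0.7.2).
#    The timestamp must fall inside the plumber's windowPeriod or the event is discarded,
#    so generate it at send time; the other fields stand in for the spec's dimensions and metrics.
NOW=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
echo "{\"timestamp\": \"${NOW}\", \"page\": \"Example_Page\", \"language\": \"en\", \"user\": \"editor\", \"added\": 57, \"deleted\": 200}" | \
  bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic wikipedia
```

Once the event is accepted, it should be queryable on the realtime node immediately, and the segment will be handed off to a historical node after the segment granularity period plus the window period elapses.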