merge changes from master

This commit is contained in:
nishantmonu51 2014-03-20 20:23:58 +05:30
commit 69e9b7a6c5
91 changed files with 955 additions and 451 deletions

View File

@ -30,4 +30,4 @@ echo "For examples, see: "
echo " " echo " "
ls -1 examples/*/*sh ls -1 examples/*/*sh
echo " " echo " "
echo "See also http://druid.io/docs/0.6.63" echo "See also http://druid.io/docs/0.6.72"

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -26,6 +26,11 @@ druid.service=broker
druid.port=8080 druid.port=8080
druid.zk.service.host=localhost druid.zk.service.host=localhost
# Change these to make Druid faster
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
``` ```
Production Configs Production Configs
@ -101,7 +106,7 @@ The broker module uses several of the default modules in [Configuration](Configu
|Property|Description|Default| |Property|Description|Default|
|--------|-----------|-------| |--------|-----------|-------|
|`druid.broker.cache.sizeInBytes`|Maximum size of the cache. If this is zero, cache is disabled.|0| |`druid.broker.cache.sizeInBytes`|Maximum size of the cache. If this is zero, cache is disabled.|10485760 (10MB)|
|`druid.broker.cache.initialSize`|The initial size of the cache in bytes.|500000| |`druid.broker.cache.initialSize`|The initial size of the cache in bytes.|500000|
|`druid.broker.cache.logEvictionCount`|If this is non-zero, there will be an eviction of entries.|0| |`druid.broker.cache.logEvictionCount`|If this is non-zero, there will be an eviction of entries.|0|

View File

@ -193,7 +193,7 @@ This module is required by nodes that can serve queries.
|Property|Description|Default| |Property|Description|Default|
|--------|-----------|-------| |--------|-----------|-------|
|`druid.query.chunkPeriod`|Long interval queries may be broken into shorter interval queries.|P1M| |`druid.query.chunkPeriod`|Long interval queries may be broken into shorter interval queries.|0|
#### GroupBy Query Config #### GroupBy Query Config

View File

@ -20,7 +20,7 @@ io.druid.cli.Main server coordinator
Rules Rules
----- -----
Segments are loaded and dropped from the cluster based on a set of rules. Rules indicate how segments should be assigned to different historical node tiers and how many replicants of a segment should exist in each tier. Rules may also indicate when segments should be dropped entirely from the cluster. The coordinator loads a set of rules from the database. Rules may be specific to a certain datasource and/or a default set of rules can be configured. Rules are read in order and hence the ordering of rules is important. The coordinator will cycle through all available segments and match each segment with the first rule that applies. Each segment may only match a single rule Segments are loaded and dropped from the cluster based on a set of rules. Rules indicate how segments should be assigned to different historical node tiers and how many replicants of a segment should exist in each tier. Rules may also indicate when segments should be dropped entirely from the cluster. The coordinator loads a set of rules from the database. Rules may be specific to a certain datasource and/or a default set of rules can be configured. Rules are read in order and hence the ordering of rules is important. The coordinator will cycle through all available segments and match each segment with the first rule that applies. Each segment may only match a single rule.
For more information on rules, see [Rule Configuration](Rule-Configuration.html). For more information on rules, see [Rule Configuration](Rule-Configuration.html).

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid git clone https://github.com/metamx/druid.git druid
cd druid cd druid
git fetch --tags git fetch --tags
git checkout druid-0.6.63 git checkout druid-0.6.72
./build.sh ./build.sh
``` ```
### Downloading the DSK (Druid Standalone Kit) ### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.63-bin.tar.gz) a stand-alone tarball and run it: [Download](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz) a stand-alone tarball and run it:
``` bash ``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -2,13 +2,13 @@
layout: doc_page layout: doc_page
--- ---
# Aggregation Granularity # Aggregation Granularity
The granularity field determines how data gets bucketed across the time dimension, i.e how it gets aggregated by hour, day, minute, etc. The granularity field determines how data gets bucketed across the time dimension, or how it gets aggregated by hour, day, minute, etc.
It can be specified either as a string for simple granularities or as an object for arbitrary granularities. It can be specified either as a string for simple granularities or as an object for arbitrary granularities.
### Simple Granularities ### Simple Granularities
Simple granularities are specified as a string and bucket timestamps by their UTC time (i.e. days start at 00:00 UTC). Simple granularities are specified as a string and bucket timestamps by their UTC time (e.g., days start at 00:00 UTC).
Supported granularity strings are: `all`, `none`, `minute`, `fifteen_minute`, `thirty_minute`, `hour` and `day` Supported granularity strings are: `all`, `none`, `minute`, `fifteen_minute`, `thirty_minute`, `hour` and `day`
@ -35,25 +35,21 @@ This chunks up every hour on the half-hour.
### Period Granularities ### Period Granularities
Period granularities are specified as arbitrary period combinations of years, months, weeks, hours, minutes and seconds (e.g. P2W, P3M, PT1H30M, PT0.750S) in ISO8601 format. Period granularities are specified as arbitrary period combinations of years, months, weeks, hours, minutes and seconds (e.g. P2W, P3M, PT1H30M, PT0.750S) in ISO8601 format. They support specifying a time zone which determines where period boundaries start as well as the timezone of the returned timestamps. By default, years start on the first of January, months start on the first of the month and weeks start on Mondays unless an origin is specified.
They support specifying a time zone which determines where period boundaries start and also determines the timezone of the returned timestamps. Time zone is optional (defaults to UTC). Origin is optional (defaults to 1970-01-01T00:00:00 in the given time zone).
By default years start on the first of January, months start on the first of the month and weeks start on Mondays unless an origin is specified.
Time zone is optional (defaults to UTC)
Origin is optional (defaults to 1970-01-01T00:00:00 in the given time zone)
``` ```
{"type": "period", "period": "P2D", "timeZone": "America/Los_Angeles"} {"type": "period", "period": "P2D", "timeZone": "America/Los_Angeles"}
``` ```
This will bucket by two day chunks in the Pacific timezone. This will bucket by two-day chunks in the Pacific timezone.
``` ```
{"type": "period", "period": "P3M", "timeZone": "America/Los_Angeles", "origin": "2012-02-01T00:00:00-08:00"} {"type": "period", "period": "P3M", "timeZone": "America/Los_Angeles", "origin": "2012-02-01T00:00:00-08:00"}
``` ```
This will bucket by 3 month chunks in the Pacific timezone where the three-month quarters are defined as starting from February. This will bucket by 3-month chunks in the Pacific timezone where the three-month quarters are defined as starting from February.
Supported time zones: timezone support is provided by the [Joda Time library](http://www.joda.org), which uses the standard IANA time zones. [Joda Time supported timezones](http://joda-time.sourceforge.net/timezones.html) #### Supported Time Zones
Timezone support is provided by the [Joda Time library](http://www.joda.org), which uses the standard IANA time zones. See the [Joda Time supported timezones](http://joda-time.sourceforge.net/timezones.html).

View File

@ -2,7 +2,7 @@
layout: doc_page layout: doc_page
--- ---
# groupBy Queries # groupBy Queries
These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query. Note: If you only want to do straight aggreagates for some time range, we highly recommend using [TimeseriesQueries](TimeseriesQuery.html) instead. The performance will be substantially better. These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query. Note: If you only want to do straight aggregates for some time range, we highly recommend using [TimeseriesQueries](TimeseriesQuery.html) instead. The performance will be substantially better.
An example groupBy query object is shown below: An example groupBy query object is shown below:
``` json ``` json

View File

@ -66,7 +66,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/indexer druid.service=druid/prod/indexer
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.63"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod
@ -115,7 +115,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/worker druid.service=druid/prod/worker
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.63","io.druid.extensions:druid-kafka-seven:0.6.63"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod

View File

@ -22,7 +22,7 @@ druid.storage.baseKey=sample
``` ```
## I don't see my Druid segments on my historical nodes ## I don't see my Druid segments on my historical nodes
You can check the coordinator console located at <COORDINATOR_IP>:<PORT>/cluster.html. Make sure that your segments have actually loaded on [historical nodes](Historical.html). If your segments are not present, check the coordinator logs for messages about capacity of replication errors. One reason that segments are not downloaded is because historical nodes have maxSizes that are too small, making them incapable of downloading more data. You can change that with (for example): You can check the coordinator console located at `<COORDINATOR_IP>:<PORT>/cluster.html`. Make sure that your segments have actually loaded on [historical nodes](Historical.html). If your segments are not present, check the coordinator logs for messages about capacity of replication errors. One reason that segments are not downloaded is because historical nodes have maxSizes that are too small, making them incapable of downloading more data. You can change that with (for example):
``` ```
-Ddruid.segmentCache.locations=[{"path":"/tmp/druid/storageLocation","maxSize":"500000000000"}] -Ddruid.segmentCache.locations=[{"path":"/tmp/druid/storageLocation","maxSize":"500000000000"}]
@ -31,7 +31,7 @@ You can check the coordinator console located at <COORDINATOR_IP>:<PORT>/cluster
## My queries are returning empty results ## My queries are returning empty results
You can check <BROKER_IP>:<PORT>/druid/v2/datasources/<YOUR_DATASOURCE> for the dimensions and metrics that have been created for your datasource. Make sure that the name of the aggregators you use in your query match one of these metrics. Also make sure that the query interval you specify match a valid time range where data exists. You can check `<BROKER_IP>:<PORT>/druid/v2/datasources/<YOUR_DATASOURCE>` for the dimensions and metrics that have been created for your datasource. Make sure that the name of the aggregators you use in your query match one of these metrics. Also make sure that the query interval you specify match a valid time range where data exists.
## More information ## More information

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime druid.service=realtime
druid.port=8083 druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.63"] druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.72"]
druid.zk.service.host=localhost druid.zk.service.host=localhost
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/realtime druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.63","io.druid.extensions:druid-kafka-seven:0.6.63"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod

View File

@ -37,7 +37,7 @@ There are several main parts to a search query:
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes| |intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|searchDimensions|The dimensions to run the search over. Excluding this means the search is run over all dimensions.|no| |searchDimensions|The dimensions to run the search over. Excluding this means the search is run over all dimensions.|no|
|query|See [SearchQuerySpec](SearchQuerySpec.html).|yes| |query|See [SearchQuerySpec](SearchQuerySpec.html).|yes|
|sort|How the results of the search should sorted. Two possible types here are "lexicographic" and "strlen".|yes| |sort|How the results of the search should be sorted. Two possible types here are "lexicographic" and "strlen".|yes|
|context|An additional JSON Object which can be used to specify certain flags.|no| |context|An additional JSON Object which can be used to specify certain flags.|no|
The format of the result is: The format of the result is:

View File

@ -15,7 +15,7 @@ Segment metadata queries return per segment information about:
{ {
"queryType":"segmentMetadata", "queryType":"segmentMetadata",
"dataSource":"sample_datasource", "dataSource":"sample_datasource",
"intervals":["2013-01-01/2014-01-01"], "intervals":["2013-01-01/2014-01-01"]
} }
``` ```

View File

@ -9,7 +9,7 @@ There are several different types of tasks.
Segment Creation Tasks Segment Creation Tasks
---------------------- ----------------------
#### Index Task ### Index Task
The Index Task is a simpler variation of the Index Hadoop task that is designed to be used for smaller data sets. The task executes within the indexing service and does not require an external Hadoop setup to use. The grammar of the index task is as follows: The Index Task is a simpler variation of the Index Hadoop task that is designed to be used for smaller data sets. The task executes within the indexing service and does not require an external Hadoop setup to use. The grammar of the index task is as follows:
@ -51,15 +51,15 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
|--------|-----------|---------| |--------|-----------|---------|
|type|The task type, this should always be "index".|yes| |type|The task type, this should always be "index".|yes|
|id|The task ID.|no| |id|The task ID.|no|
|granularitySpec|See [granularitySpec](Tasks.html)|yes| |granularitySpec|Specifies the segment chunks that the task will process. `type` is always "uniform"; `gran` sets the granularity of the chunks ("DAY" means all segments containing timestamps in the same day, while `intervals` sets the interval that the chunks will cover.|yes|
|spatialDimensions|Dimensions to build spatial indexes over. See [Spatial-Indexing](Spatial-Indexing.html)|no| |spatialDimensions|Dimensions to build spatial indexes over. See [Geographic Queries](GeographicQueries.html).|no|
|aggregators|The metrics to aggregate in the data set. For more info, see [Aggregations](Aggregations.html)|yes| |aggregators|The metrics to aggregate in the data set. For more info, see [Aggregations](Aggregations.html)|yes|
|indexGranularity|The rollup granularity for timestamps.|no| |indexGranularity|The rollup granularity for timestamps.|no|
|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|no| |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|no|
|firehose|The input source of data. For more info, see [Firehose](Firehose.html)|yes| |firehose|The input source of data. For more info, see [Firehose](Firehose.html)|yes|
|rowFlushBoundary|Used in determining when intermediate persist should occur to disk.|no| |rowFlushBoundary|Used in determining when intermediate persist should occur to disk.|no|
#### Index Hadoop Task ### Index Hadoop Task
The Hadoop Index Task is used to index larger data sets that require the parallelization and processing power of a Hadoop cluster. The Hadoop Index Task is used to index larger data sets that require the parallelization and processing power of a Hadoop cluster.
@ -79,11 +79,11 @@ The Hadoop Index Task is used to index larger data sets that require the paralle
The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally. The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.
##### Using your own Hadoop distribution #### Using your own Hadoop distribution
Druid is compiled against Apache hadoop-core 1.0.3. However, if you happen to use a different flavor of hadoop that is API compatible with hadoop-core 1.0.3, you should only have to change the hadoopCoordinates property to point to the maven artifact used by your distribution. Druid is compiled against Apache hadoop-core 1.0.3. However, if you happen to use a different flavor of hadoop that is API compatible with hadoop-core 1.0.3, you should only have to change the hadoopCoordinates property to point to the maven artifact used by your distribution.
##### Resolving dependency conflicts running HadoopIndexTask #### Resolving dependency conflicts running HadoopIndexTask
Currently, the HadoopIndexTask creates a single classpath to run the HadoopDruidIndexerJob, which can lead to version conflicts between various dependencies of Druid, extension modules, and Hadoop's own dependencies. Currently, the HadoopIndexTask creates a single classpath to run the HadoopDruidIndexerJob, which can lead to version conflicts between various dependencies of Druid, extension modules, and Hadoop's own dependencies.
@ -91,7 +91,7 @@ The Hadoop index task will put Druid's dependencies first on the classpath, foll
If you are having trouble with any extensions in HadoopIndexTask, it may be the case that Druid, or one of its dependencies, depends on a different version of a library than what you are using as part of your extensions, but Druid's version overrides the one in your extension. In that case you probably want to build your own Druid version and override the offending library by adding an explicit dependency to the pom.xml of each druid sub-module that depends on it. If you are having trouble with any extensions in HadoopIndexTask, it may be the case that Druid, or one of its dependencies, depends on a different version of a library than what you are using as part of your extensions, but Druid's version overrides the one in your extension. In that case you probably want to build your own Druid version and override the offending library by adding an explicit dependency to the pom.xml of each druid sub-module that depends on it.
#### Realtime Index Task ### Realtime Index Task
The indexing service can also run real-time tasks. These tasks effectively transform a middle manager into a real-time node. We introduced real-time tasks as a way to programmatically add new real-time data sources without needing to manually add nodes. The grammar for the real-time task is as follows: The indexing service can also run real-time tasks. These tasks effectively transform a middle manager into a real-time node. We introduced real-time tasks as a way to programmatically add new real-time data sources without needing to manually add nodes. The grammar for the real-time task is as follows:
@ -152,10 +152,7 @@ The indexing service can also run real-time tasks. These tasks effectively trans
"intermediatePersistPeriod": "PT10m" "intermediatePersistPeriod": "PT10m"
}, },
"windowPeriod": "PT10m", "windowPeriod": "PT10m",
"segmentGranularity": "hour", "segmentGranularity": "hour"
"rejectionPolicy": {
"type": "messageTime"
}
} }
``` ```
@ -172,7 +169,7 @@ For schema, fireDepartmentConfig, windowPeriod, segmentGranularity, and rejectio
Segment Merging Tasks Segment Merging Tasks
--------------------- ---------------------
#### Append Task ### Append Task
Append tasks append a list of segments together into a single segment (one after the other). The grammar is: Append tasks append a list of segments together into a single segment (one after the other). The grammar is:
@ -184,7 +181,7 @@ Append tasks append a list of segments together into a single segment (one after
} }
``` ```
#### Merge Task ### Merge Task
Merge tasks merge a list of segments together. Any common timestamps are merged. The grammar is: Merge tasks merge a list of segments together. Any common timestamps are merged. The grammar is:
@ -199,7 +196,7 @@ Merge tasks merge a list of segments together. Any common timestamps are merged.
Segment Destroying Tasks Segment Destroying Tasks
------------------------ ------------------------
#### Delete Task ### Delete Task
Delete tasks create empty segments with no data. The grammar is: Delete tasks create empty segments with no data. The grammar is:
@ -211,7 +208,7 @@ Delete tasks create empty segments with no data. The grammar is:
} }
``` ```
#### Kill Task ### Kill Task
Kill tasks delete all information about a segment and removes it from deep storage. Killable segments must be disabled (used==0) in the Druid segment table. The available grammar is: Kill tasks delete all information about a segment and removes it from deep storage. Killable segments must be disabled (used==0) in the Druid segment table. The available grammar is:
@ -226,7 +223,7 @@ Kill tasks delete all information about a segment and removes it from deep stora
Misc. Tasks Misc. Tasks
----------- -----------
#### Version Converter Task ### Version Converter Task
These tasks convert segments from an existing older index version to the latest index version. The available grammar is: These tasks convert segments from an existing older index version to the latest index version. The available grammar is:
@ -240,7 +237,7 @@ These tasks convert segments from an existing older index version to the latest
} }
``` ```
#### Noop Task ### Noop Task
These tasks start, sleep for a time and are used only for testing. The available grammar is: These tasks start, sleep for a time and are used only for testing. The available grammar is:

View File

@ -9,6 +9,7 @@ TopN queries return a sorted set of results for the values in a given dimension
A topN query object looks like: A topN query object looks like:
```json ```json
{
"queryType": "topN", "queryType": "topN",
"dataSource": "sample_data", "dataSource": "sample_data",
"dimension": "sample_dim", "dimension": "sample_dim",

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball ### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.63-bin.tar.gz). Download this file to a directory of your choosing. We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory: Not too lost so far right? That's great! If you cd into the directory:
``` ```
cd druid-services-0.6.63 cd druid-services-0.6.72
``` ```
You should see a bunch of files: You should see a bunch of files:

View File

@ -160,13 +160,15 @@ You should be comfortable starting Druid nodes at this point. If not, it may be
"segmentGranularity": "hour", "segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist", "basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": { "rejectionPolicy": {
"type": "none" "type": "test"
} }
} }
} }
] ]
``` ```
Note: This config uses a "test" rejection policy which will accept all events and timely hand off, however, we strongly recommend you do not use this in production. Using this rejection policy, segments for events for the same time range will be overridden.
3. Let's copy and paste some data into the Kafka console producer 3. Let's copy and paste some data into the Kafka console producer
```json ```json

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first. If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.63-bin.tar.gz) You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz)
and untar the contents within by issuing: and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.63"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72"]
# Dummy read only AWS account (used to download example data) # Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.63","io.druid.extensions:druid-kafka-seven:0.6.63"] druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72"]
# Change this config to db to hand off to the rest of the Druid cluster # Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop druid.publish.type=noop
@ -251,6 +251,9 @@ druid.publish.type=noop
# druid.db.connector.password=diurd # druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=100000000 druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
``` ```
Next Steps Next Steps

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.63-bin.tar.gz) We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz)
Download this file to a directory of your choosing. Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory: Not too lost so far right? That's great! If you cd into the directory:
``` ```
cd druid-services-0.6.63 cd druid-services-0.6.72
``` ```
You should see a bunch of files: You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.63-bin.tar.gz. We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz.
Download this bad boy to a directory of your choosing. Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:

View File

@ -53,7 +53,7 @@
"segmentGranularity": "hour", "segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist", "basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": { "rejectionPolicy": {
"type": "messageTime" "type": "test"
} }
} }
} }

View File

@ -43,6 +43,6 @@
"windowPeriod" : "PT5m", "windowPeriod" : "PT5m",
"segmentGranularity":"hour", "segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist", "basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicy": { "type": "messageTime" } "rejectionPolicy": { "type": "test" }
} }
}] }]

View File

@ -3,3 +3,7 @@ druid.service=broker
druid.port=8080 druid.port=8080
druid.zk.service.host=localhost druid.zk.service.host=localhost
# Change these to make Druid faster
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1

View File

@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.63"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72"]
# Dummy read only AWS account (used to download example data) # Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.63","io.druid.extensions:druid-kafka-seven:0.6.63","io.druid.extensions:druid-rabbitmq:0.6.63"] druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72","io.druid.extensions:druid-rabbitmq:0.6.72"]
# Change this config to db to hand off to the rest of the Druid cluster # Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop druid.publish.type=noop
@ -16,3 +16,5 @@ druid.publish.type=noop
druid.processing.buffer.sizeBytes=100000000 druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1 druid.processing.numThreads=1
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -281,7 +281,7 @@ public class HadoopIndexTask extends AbstractTask
Jobby job = new HadoopDruidDetermineConfigurationJob(config); Jobby job = new HadoopDruidDetermineConfigurationJob(config);
log.info("Starting a hadoop index generator job..."); log.info("Starting a hadoop determine configuration job...");
if (job.run()) { if (job.run()) {
return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(HadoopDruidIndexerConfigBuilder.toSchema(config)); return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(HadoopDruidIndexerConfigBuilder.toSchema(config));
} }

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

17
pom.xml
View File

@ -23,14 +23,14 @@
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
<name>druid</name> <name>druid</name>
<description>druid</description> <description>druid</description>
<scm> <scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection> <connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection> <developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url> <url>http://www.github.com/metamx/druid</url>
<tag>${project.artifactId}-${project.version}</tag> <tag>druid-0.6.72-SNAPSHOT</tag>
</scm> </scm>
<prerequisites> <prerequisites>
@ -535,6 +535,19 @@
<artifactId>maven-assembly-plugin</artifactId> <artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version> <version>2.4</version>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>2.4.2</version>
<dependencies>
<dependency>
<groupId>org.apache.maven.scm</groupId>
<artifactId>maven-scm-provider-gitexe</artifactId>
<!-- This version is necessary for use with git version 1.8.5 and above -->
<version>1.8.1</version>
</dependency>
</dependencies>
</plugin>
</plugins> </plugins>
</pluginManagement> </pluginManagement>
</build> </build>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -25,6 +25,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
@ -84,11 +85,6 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
{ {
final int priority = Integer.parseInt(query.getContextValue("priority", "0")); final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
return Sequences.empty();
}
return new BaseSequence<T, Iterator<T>>( return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>() new BaseSequence.IteratorMaker<T, Iterator<T>>()
{ {
@ -111,6 +107,9 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public List<T> call() throws Exception public List<T> call() throws Exception
{ {
try { try {
if (input == null) {
throw new ISE("Input is null?! How is this possible?!");
}
return Sequences.toList(input.run(query), Lists.<T>newArrayList()); return Sequences.toList(input.run(query), Lists.<T>newArrayList());
} }
catch (Exception e) { catch (Exception e) {

View File

@ -31,7 +31,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes({ @JsonSubTypes({
@JsonSubTypes.Type(value = TableDataSource.class, name = "table"), @JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
@JsonSubTypes.Type(value = QueryDataSource.class, name = "query") @JsonSubTypes.Type(value = QueryDataSource.class, name = "query")
}) })
public interface DataSource public interface DataSource
{ {
public String getName();
} }

View File

@ -50,6 +50,10 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query)
{ {
if (period.getMillis() == 0) {
return baseRunner.run(query);
}
return Sequences.concat( return Sequences.concat(
FunctionalIterable FunctionalIterable
.create(query.getIntervals()) .create(query.getIntervals())

View File

@ -69,6 +69,11 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query)
{ {
final ServiceMetricEvent.Builder builder = builderFn.apply(query); final ServiceMetricEvent.Builder builder = builderFn.apply(query);
String queryId = query.getId();
if (queryId == null) {
queryId = "";
}
builder.setUser8(queryId);
return new Sequence<T>() return new Sequence<T>()
{ {

View File

@ -27,7 +27,7 @@ import org.joda.time.Period;
public class QueryConfig public class QueryConfig
{ {
@JsonProperty @JsonProperty
private Period chunkPeriod = Period.months(1); private Period chunkPeriod = new Period();
public Period getChunkPeriod() public Period getChunkPeriod()
{ {

View File

@ -37,6 +37,13 @@ public class QueryDataSource implements DataSource
this.query = query; this.query = query;
} }
@Override
public String getName()
{
return query.getDataSource().getName();
}
@JsonProperty
public Query getQuery() public Query getQuery()
{ {
return query; return query;
@ -47,12 +54,18 @@ public class QueryDataSource implements DataSource
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
QueryDataSource that = (QueryDataSource) o; QueryDataSource that = (QueryDataSource) o;
if (!query.equals(that.query)) return false; if (!query.equals(that.query)) {
return false;
}
return true; return true;
} }

View File

@ -36,6 +36,8 @@ public class TableDataSource implements DataSource
this.name = (name == null ? null : name.toLowerCase()); this.name = (name == null ? null : name.toLowerCase());
} }
@JsonProperty
@Override
public String getName() public String getName()
{ {
return name; return name;
@ -46,12 +48,18 @@ public class TableDataSource implements DataSource
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
if (this == o) return true; if (this == o) {
if (!(o instanceof TableDataSource)) return false; return true;
}
if (!(o instanceof TableDataSource)) {
return false;
}
TableDataSource that = (TableDataSource) o; TableDataSource that = (TableDataSource) o;
if (!name.equals(that.name)) return false; if (!name.equals(that.name)) {
return false;
}
return true; return true;
} }

View File

@ -21,6 +21,7 @@ package io.druid.query.aggregation.post;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
@ -38,11 +39,13 @@ public class ConstantPostAggregator implements PostAggregator
@JsonCreator @JsonCreator
public ConstantPostAggregator( public ConstantPostAggregator(
@JsonProperty("name") String name, @JsonProperty("name") String name,
@JsonProperty("value") Number constantValue @JsonProperty("value") Number constantValue,
@JsonProperty("constantValue") Number backwardsCompatibleValue
) )
{ {
this.name = name; this.name = name;
this.constantValue = constantValue; this.constantValue = constantValue == null ? backwardsCompatibleValue : constantValue;
Preconditions.checkNotNull(this.constantValue);
} }
@Override @Override
@ -77,7 +80,7 @@ public class ConstantPostAggregator implements PostAggregator
return name; return name;
} }
@JsonProperty @JsonProperty("value")
public Number getConstantValue() public Number getConstantValue()
{ {
return constantValue; return constantValue;
@ -95,20 +98,26 @@ public class ConstantPostAggregator implements PostAggregator
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ConstantPostAggregator that = (ConstantPostAggregator) o; ConstantPostAggregator that = (ConstantPostAggregator) o;
if (constantValue != null && that.constantValue != null) { if (constantValue != null && that.constantValue != null) {
if (constantValue.doubleValue() != that.constantValue.doubleValue()) if (constantValue.doubleValue() != that.constantValue.doubleValue()) {
return false; return false;
} }
else if (constantValue != that.constantValue) { } else if (constantValue != that.constantValue) {
return false; return false;
} }
if (name != null ? !name.equals(that.name) : that.name != null) return false; if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}
return true; return true;
} }
@ -120,4 +129,5 @@ public class ConstantPostAggregator implements PostAggregator
result = 31 * result + (constantValue != null ? constantValue.hashCode() : 0); result = 31 * result + (constantValue != null ? constantValue.hashCode() : 0);
return result; return result;
} }
} }

View File

@ -169,8 +169,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
.setUser5(Joiner.on(",").join(query.getIntervals())) .setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size())) .setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString());
.setUser10(query.getId());
} }
@Override @Override

View File

@ -151,8 +151,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
.setUser4(query.getType()) .setUser4(query.getType())
.setUser5(Joiner.on(",").join(query.getIntervals())) .setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString());
.setUser10(query.getId());
} }
@Override @Override

View File

@ -125,8 +125,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
.setUser4("search") .setUser4("search")
.setUser5(COMMA_JOIN.join(query.getIntervals())) .setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString());
.setUser10(query.getId());
} }
@Override @Override

View File

@ -127,8 +127,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
.setUser4("Select") .setUser4("Select")
.setUser5(COMMA_JOIN.join(query.getIntervals())) .setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString());
.setUser10(query.getId());
} }
@Override @Override

View File

@ -119,8 +119,7 @@ public class TimeBoundaryQueryQueryToolChest
return new ServiceMetricEvent.Builder() return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource().toString()) .setUser2(query.getDataSource().toString())
.setUser4(query.getType()) .setUser4(query.getType())
.setUser6("false") .setUser6("false");
.setUser10(query.getId());
} }
@Override @Override

View File

@ -128,8 +128,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
.setUser5(COMMA_JOIN.join(query.getIntervals())) .setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size())) .setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString());
.setUser10(query.getId());
} }
@Override @Override

View File

@ -79,7 +79,17 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
) )
{ {
final TopNResultBuilder singleMetricResultBuilder = makeResultBuilder(params); final TopNResultBuilder singleMetricResultBuilder = makeResultBuilder(params);
final String metric = ((NumericTopNMetricSpec) query.getTopNMetricSpec()).getMetric(); final String metric;
// ugly
TopNMetricSpec spec = query.getTopNMetricSpec();
if (spec instanceof InvertedTopNMetricSpec
&& ((InvertedTopNMetricSpec) spec).getDelegate() instanceof NumericTopNMetricSpec) {
metric = ((NumericTopNMetricSpec) ((InvertedTopNMetricSpec) spec).getDelegate()).getMetric();
} else if (spec instanceof NumericTopNMetricSpec) {
metric = ((NumericTopNMetricSpec) query.getTopNMetricSpec()).getMetric();
} else {
throw new ISE("WTF?! We are in AggregateTopNMetricFirstAlgorithm with a [%s] spec", spec.getClass().getName());
}
// Find either the aggregator or post aggregator to do the topN over // Find either the aggregator or post aggregator to do the topN over
List<AggregatorFactory> condensedAggs = Lists.newArrayList(); List<AggregatorFactory> condensedAggs = Lists.newArrayList();

View File

@ -78,8 +78,6 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
while (numProcessed < cardinality) { while (numProcessed < cardinality) {
final int numToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed); final int numToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
params.getCursor().reset();
DimValSelector theDimValSelector; DimValSelector theDimValSelector;
if (!hasDimValSelector) { if (!hasDimValSelector) {
theDimValSelector = makeDimValSelector(params, numProcessed, numToProcess); theDimValSelector = makeDimValSelector(params, numProcessed, numToProcess);
@ -96,6 +94,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
closeAggregators(aggregatesStore); closeAggregators(aggregatesStore);
numProcessed += numToProcess; numProcessed += numToProcess;
params.getCursor().reset();
} }
} }

View File

@ -133,8 +133,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
.setUser5(COMMA_JOIN.join(query.getIntervals())) .setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters())) .setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size())) .setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString()) .setUser9(Minutes.minutes(numMinutes).toString());
.setUser10(query.getId());
} }
@Override @Override

View File

@ -241,23 +241,22 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
if (numAdvanced == -1) { if (numAdvanced == -1) {
numAdvanced = 0; numAdvanced = 0;
} else {
Iterators.advance(baseIter, numAdvanced);
}
boolean foundMatched = false;
while (baseIter.hasNext()) { while (baseIter.hasNext()) {
currEntry.set(baseIter.next()); currEntry.set(baseIter.next());
if (filterMatcher.matches()) { if (filterMatcher.matches()) {
return; foundMatched = true;
break;
} }
numAdvanced++; numAdvanced++;
} }
} else {
Iterators.advance(baseIter, numAdvanced);
if (baseIter.hasNext()) {
currEntry.set(baseIter.next());
}
}
done = cursorMap.size() == 0 || !baseIter.hasNext();
done = !foundMatched && (cursorMap.size() == 0 || !baseIter.hasNext());
} }
@Override @Override

View File

@ -119,7 +119,7 @@ public class QueriesTest
"+", "+",
Arrays.asList( Arrays.asList(
new FieldAccessPostAggregator("idx", "idx"), new FieldAccessPostAggregator("idx", "idx"),
new ConstantPostAggregator("const", 1) new ConstantPostAggregator("const", 1, null)
) )
), ),
new ArithmeticPostAggregator( new ArithmeticPostAggregator(
@ -127,7 +127,7 @@ public class QueriesTest
"-", "-",
Arrays.asList( Arrays.asList(
new FieldAccessPostAggregator("rev", "rev"), new FieldAccessPostAggregator("rev", "rev"),
new ConstantPostAggregator("const", 1) new ConstantPostAggregator("const", 1, null)
) )
) )
) )
@ -173,7 +173,7 @@ public class QueriesTest
"+", "+",
Arrays.asList( Arrays.asList(
new FieldAccessPostAggregator("idx", "idx"), new FieldAccessPostAggregator("idx", "idx"),
new ConstantPostAggregator("const", 1) new ConstantPostAggregator("const", 1, null)
) )
), ),
new ArithmeticPostAggregator( new ArithmeticPostAggregator(
@ -181,7 +181,7 @@ public class QueriesTest
"-", "-",
Arrays.asList( Arrays.asList(
new FieldAccessPostAggregator("rev", "rev2"), new FieldAccessPostAggregator("rev", "rev2"),
new ConstantPostAggregator("const", 1) new ConstantPostAggregator("const", 1, null)
) )
) )
) )

View File

@ -67,7 +67,7 @@ public class QueryRunnerTestHelper
"uniques", "uniques",
"quality_uniques" "quality_uniques"
); );
public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L); public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
public static final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); public static final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
public static final ArithmeticPostAggregator addRowsIndexConstant = public static final ArithmeticPostAggregator addRowsIndexConstant =

View File

@ -48,7 +48,7 @@ public class ArithmeticPostAggregatorTest
List<PostAggregator> postAggregatorList = List<PostAggregator> postAggregatorList =
Lists.newArrayList( Lists.newArrayList(
new ConstantPostAggregator( new ConstantPostAggregator(
"roku", 6 "roku", 6, null
), ),
new FieldAccessPostAggregator( new FieldAccessPostAggregator(
"rows", "rows" "rows", "rows"
@ -79,7 +79,7 @@ public class ArithmeticPostAggregatorTest
List<PostAggregator> postAggregatorList = List<PostAggregator> postAggregatorList =
Lists.newArrayList( Lists.newArrayList(
new ConstantPostAggregator( new ConstantPostAggregator(
"roku", 6 "roku", 6, null
), ),
new FieldAccessPostAggregator( new FieldAccessPostAggregator(
"rows", "rows" "rows", "rows"

View File

@ -19,6 +19,7 @@
package io.druid.query.aggregation.post; package io.druid.query.aggregation.post;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -33,11 +34,11 @@ public class ConstantPostAggregatorTest
{ {
ConstantPostAggregator constantPostAggregator; ConstantPostAggregator constantPostAggregator;
constantPostAggregator = new ConstantPostAggregator("shichi", 7); constantPostAggregator = new ConstantPostAggregator("shichi", 7, null);
Assert.assertEquals(7, constantPostAggregator.compute(null)); Assert.assertEquals(7, constantPostAggregator.compute(null));
constantPostAggregator = new ConstantPostAggregator("rei", 0.0); constantPostAggregator = new ConstantPostAggregator("rei", 0.0, null);
Assert.assertEquals(0.0, constantPostAggregator.compute(null)); Assert.assertEquals(0.0, constantPostAggregator.compute(null));
constantPostAggregator = new ConstantPostAggregator("ichi", 1.0); constantPostAggregator = new ConstantPostAggregator("ichi", 1.0, null);
Assert.assertNotSame(1, constantPostAggregator.compute(null)); Assert.assertNotSame(1, constantPostAggregator.compute(null));
} }
@ -45,10 +46,35 @@ public class ConstantPostAggregatorTest
public void testComparator() public void testComparator()
{ {
ConstantPostAggregator constantPostAggregator = ConstantPostAggregator constantPostAggregator =
new ConstantPostAggregator("thistestbasicallydoesnothing unhappyface", 1); new ConstantPostAggregator("thistestbasicallydoesnothing unhappyface", 1, null);
Comparator comp = constantPostAggregator.getComparator(); Comparator comp = constantPostAggregator.getComparator();
Assert.assertEquals(0, comp.compare(0, constantPostAggregator.compute(null))); Assert.assertEquals(0, comp.compare(0, constantPostAggregator.compute(null)));
Assert.assertEquals(0, comp.compare(0, 1)); Assert.assertEquals(0, comp.compare(0, 1));
Assert.assertEquals(0, comp.compare(1, 0)); Assert.assertEquals(0, comp.compare(1, 0));
} }
@Test
public void testSerdeBackwardsCompatible() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
ConstantPostAggregator aggregator = mapper.readValue(
"{\"type\":\"constant\",\"name\":\"thistestbasicallydoesnothing unhappyface\",\"constantValue\":1}\n",
ConstantPostAggregator.class
);
Assert.assertEquals(new Integer(1), aggregator.getConstantValue());
}
@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
ConstantPostAggregator aggregator = new ConstantPostAggregator("aggregator", 2, null);
ConstantPostAggregator aggregator1 = mapper.readValue(
mapper.writeValueAsString(aggregator),
ConstantPostAggregator.class
);
Assert.assertEquals(aggregator, aggregator1);
}
} }

View File

@ -43,7 +43,7 @@ public class TimeseriesBinaryFnTest
{ {
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L); final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final ArithmeticPostAggregator addRowsIndexConstant = new ArithmeticPostAggregator( final ArithmeticPostAggregator addRowsIndexConstant = new ArithmeticPostAggregator(

View File

@ -47,7 +47,7 @@ public class TopNBinaryFnTest
{ {
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L); final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final ArithmeticPostAggregator addrowsindexconstant = new ArithmeticPostAggregator( final ArithmeticPostAggregator addrowsindexconstant = new ArithmeticPostAggregator(

View File

@ -22,6 +22,7 @@ package io.druid.segment.incremental;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
@ -30,6 +31,7 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow; import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -37,9 +39,15 @@ import io.druid.query.filter.DimFilters;
import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryEngine;
import junit.framework.Assert; import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryEngine;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.filter.SelectorFilter;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -115,6 +123,108 @@ public class IncrementalIndexStorageAdapterTest
Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent()); Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent());
} }
@Test
public void testResetSanity() {
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
);
DateTime t = DateTime.now();
Interval interval = new Interval(t.minusMinutes(1), t.plusMinutes(1));
index.add(
new MapBasedInputRow(
t.minus(1).getMillis(),
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "hi")
)
);
index.add(
new MapBasedInputRow(
t.minus(1).getMillis(),
Lists.newArrayList("sally"),
ImmutableMap.<String, Object>of("sally", "bo")
)
);
IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(index);
Iterable<Cursor> cursorIterable = adapter.makeCursors(new SelectorFilter("sally", "bo"),
interval,
QueryGranularity.NONE);
Cursor cursor = cursorIterable.iterator().next();
DimensionSelector dimSelector;
dimSelector = cursor.makeDimensionSelector("sally");
Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0)));
index.add(
new MapBasedInputRow(
t.minus(1).getMillis(),
Lists.newArrayList("sally"),
ImmutableMap.<String, Object>of("sally", "ah")
)
);
// Cursor reset should not be affected by out of order values
cursor.reset();
dimSelector = cursor.makeDimensionSelector("sally");
Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0)));
}
@Test
public void testSingleValueTopN()
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
);
DateTime t = DateTime.now();
index.add(
new MapBasedInputRow(
t.minus(1).getMillis(),
Lists.newArrayList("sally"),
ImmutableMap.<String, Object>of("sally", "bo")
)
);
TopNQueryEngine engine = new TopNQueryEngine(
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(50000);
}
}
)
);
final Iterable<Result<TopNResultValue>> results = engine.query(
new TopNQueryBuilder().dataSource("test")
.granularity(QueryGranularity.ALL)
.intervals(Lists.newArrayList(new Interval(0, new DateTime().getMillis())))
.dimension("sally")
.metric("cnt")
.threshold(10)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
new LongSumAggregatorFactory(
"cnt",
"cnt"
)
)
)
.build(),
new IncrementalIndexStorageAdapter(index)
);
Assert.assertEquals(1, Iterables.size(results));
Assert.assertEquals(1, results.iterator().next().getValue().getValue().size());
}
@Test @Test
public void testFilterByNull() throws Exception public void testFilterByNull() throws Exception
{ {

Binary file not shown.

View File

@ -5,6 +5,7 @@
\setmainfont[Ligatures={TeX}]{Times} \setmainfont[Ligatures={TeX}]{Times}
\usepackage{hyperref} \usepackage{hyperref}
\graphicspath{{figures/}} \graphicspath{{figures/}}
\usepackage{enumitem}
\hyphenation{metamarkets nelson} \hyphenation{metamarkets nelson}
@ -75,7 +76,7 @@ came to the conclusion that there was nothing in the open source world that
could be fully leveraged for our requirements. could be fully leveraged for our requirements.
We ended up creating Druid, an open-source, distributed, column-oriented, We ended up creating Druid, an open-source, distributed, column-oriented,
realtime analytical data store. In many ways, Druid shares similarities with real-time analytical data store. In many ways, Druid shares similarities with
other OLAP systems \cite{oehler2012ibm, schrader2009oracle, lachev2005applied}, other OLAP systems \cite{oehler2012ibm, schrader2009oracle, lachev2005applied},
interactive query systems \cite{melnik2010dremel}, main-memory databases interactive query systems \cite{melnik2010dremel}, main-memory databases
\cite{farber2012sap}, and widely-known distributed data stores \cite{farber2012sap}, and widely-known distributed data stores
@ -96,10 +97,10 @@ Section \ref{sec:problem-definition}. Next, we detail system architecture from
the point of view of how data flows through the system in Section the point of view of how data flows through the system in Section
\ref{sec:architecture}. We then discuss how and why data gets converted into a \ref{sec:architecture}. We then discuss how and why data gets converted into a
binary format in Section \ref{sec:storage-format}. We briefly describe the binary format in Section \ref{sec:storage-format}. We briefly describe the
query API in Section \ref{sec:query-api} and present our experimental results query API in Section \ref{sec:query-api} and present performance results
in Section \ref{sec:benchmarks}. Lastly, we leave off with what we've learned from in Section \ref{sec:benchmarks}. Lastly, we leave off with our lessons from
running Druid in production in Section \ref{sec:production}, related work running Druid in production in Section \ref{sec:production}, and related work
in Section \ref{sec:related}, and conclusions in Section \ref{sec:conclusions}. in Section \ref{sec:related}.
\section{Problem Definition} \section{Problem Definition}
\label{sec:problem-definition} \label{sec:problem-definition}
@ -121,8 +122,6 @@ edit.
\begin{table*} \begin{table*}
\centering \centering
\caption{Sample Druid data for edits that have occurred on Wikipedia.}
\label{tab:sample_data}
\begin{tabular}{| l | l | l | l | l | l | l | l |} \begin{tabular}{| l | l | l | l | l | l | l | l |}
\hline \hline
\textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline \textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline
@ -131,6 +130,8 @@ edit.
2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline 2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline
2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline 2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline
\end{tabular} \end{tabular}
\caption{Sample Druid data for edits that have occurred on Wikipedia.}
\label{tab:sample_data}
\end{table*} \end{table*}
Our goal is to rapidly compute drill-downs and aggregates over this data. We Our goal is to rapidly compute drill-downs and aggregates over this data. We
@ -159,7 +160,7 @@ determine business success or failure.
Finally, another key problem that Metamarkets faced in its early days was to Finally, another key problem that Metamarkets faced in its early days was to
allow users and alerting systems to be able to make business decisions in allow users and alerting systems to be able to make business decisions in
"real-time". The time from when an event is created to when that ``real-time". The time from when an event is created to when that
event is queryable determines how fast users and systems are able to react to event is queryable determines how fast users and systems are able to react to
potentially catastrophic occurrences in their systems. Popular open source data potentially catastrophic occurrences in their systems. Popular open source data
warehousing systems such as Hadoop were unable to provide the sub-second data ingestion warehousing systems such as Hadoop were unable to provide the sub-second data ingestion
@ -176,7 +177,7 @@ A Druid cluster consists of different types of nodes and each node type is
designed to perform a specific set of things. We believe this design separates designed to perform a specific set of things. We believe this design separates
concerns and simplifies the complexity of the system. The different node types concerns and simplifies the complexity of the system. The different node types
operate fairly independent of each other and there is minimal interaction operate fairly independent of each other and there is minimal interaction
between them. Hence, intra-cluster communication failures have minimal impact among them. Hence, intra-cluster communication failures have minimal impact
on data availability. To solve complex data analysis problems, the different on data availability. To solve complex data analysis problems, the different
node types come together to form a fully working system. The name Druid comes node types come together to form a fully working system. The name Druid comes
from the Druid class in many role-playing games: it is a shape-shifter, capable from the Druid class in many role-playing games: it is a shape-shifter, capable
@ -206,7 +207,7 @@ Zookeeper.
Real-time nodes maintain an in-memory index buffer for all incoming events. Real-time nodes maintain an in-memory index buffer for all incoming events.
These indexes are incrementally populated as new events are ingested and the These indexes are incrementally populated as new events are ingested and the
indexes are also directly queryable. Druid virtually behaves as a row store indexes are also directly queryable. Druid behaves as a row store
for queries on events that exist in this JVM heap-based buffer. To avoid heap for queries on events that exist in this JVM heap-based buffer. To avoid heap
overflow problems, real-time nodes persist their in-memory indexes to disk overflow problems, real-time nodes persist their in-memory indexes to disk
either periodically or after some maximum row limit is reached. This persist either periodically or after some maximum row limit is reached. This persist
@ -218,10 +219,10 @@ in \cite{o1996log} and is illustrated in Figure~\ref{fig:realtime_flow}.
\begin{figure} \begin{figure}
\centering \centering
\includegraphics[width = 2.8in]{realtime_flow} \includegraphics[width = 2.6in]{realtime_flow}
\caption{Real-time nodes first buffer events in memory. On a periodic basis, \caption{Real-time nodes first buffer events in memory. On a periodic basis,
the in-memory index is persisted to disk. On another periodic basis, all the in-memory index is persisted to disk. On another periodic basis, all
persisted indexes are merged together and handed off. Queries for data will hit the persisted indexes are merged together and handed off. Queries will hit the
in-memory index and the persisted indexes.} in-memory index and the persisted indexes.}
\label{fig:realtime_flow} \label{fig:realtime_flow}
\end{figure} \end{figure}
@ -230,38 +231,36 @@ On a periodic basis, each real-time node will schedule a background task that
searches for all locally persisted indexes. The task merges these indexes searches for all locally persisted indexes. The task merges these indexes
together and builds an immutable block of data that contains all the events together and builds an immutable block of data that contains all the events
that have ingested by a real-time node for some span of time. We refer to this that have ingested by a real-time node for some span of time. We refer to this
block of data as a "segment". During the handoff stage, a real-time node block of data as a ``segment". During the handoff stage, a real-time node
uploads this segment to a permanent backup storage, typically a distributed uploads this segment to a permanent backup storage, typically a distributed
file system such as S3 \cite{decandia2007dynamo} or HDFS file system such as S3 \cite{decandia2007dynamo} or HDFS
\cite{shvachko2010hadoop}, which Druid refers to as "deep storage". The ingest, \cite{shvachko2010hadoop}, which Druid refers to as ``deep storage". The ingest,
persist, merge, and handoff steps are fluid; there is no data loss during any persist, merge, and handoff steps are fluid; there is no data loss during any
of the processes. of the processes.
To better understand the flow of data through a real-time node, consider the Figure~\ref{fig:realtime_timeline} illustrates the operations of a real-time
following example. First, we start a real-time node at 13:37. The node will node. The node starts at 13:37 and will only accept events for the current hour
only accept events for the current hour or the next hour. When the node begins or the next hour. When events are ingested, the node announces that it is
ingesting events, it will announce that it is serving a segment of data for a serving a segment of data for an interval from 13:00 to 14:00. Every 10
time window from 13:00 to 14:00. Every 10 minutes (the persist period is minutes (the persist period is configurable), the node will flush and persist
configurable), the node will flush and persist its in-memory buffer to disk. its in-memory buffer to disk. Near the end of the hour, the node will likely
Near the end of the hour, the node will likely see events with timestamps from see events for 14:00 to 15:00. When this occurs, the node prepares to serve
14:00 to 15:00. When this occurs, the node prepares to serve data for the next data for the next hour and creates a new in-memory index. The node then
hour and creates a new in-memory index. The node then announces that it is also announces that it is also serving a segment from 14:00 to 15:00. The node does
serving a segment for data from 14:00 to 15:00. The node does not immediately not immediately merge persisted indexes from 13:00 to 14:00, instead it waits
merge the indexes it persisted from 13:00 to 14:00, instead it waits for a for a configurable window period for straggling events from 13:00 to 14:00 to
configurable window period for straggling events from 13:00 to 14:00 to come arrive. This window period minimizes the risk of data loss from delays in event
in. Having a window period minimizes the risk of data loss from delays in event delivery. At the end of the window period, the node merges all persisted
delivery. At the end of the window period, the real-time node merges all indexes from 13:00 to 14:00 into a single immutable segment and hands the
persisted indexes from 13:00 to 14:00 into a single immutable segment and hands segment off. Once this segment is loaded and queryable somewhere else in the
the segment off. Once this segment is loaded and queryable somewhere else in Druid cluster, the real-time node flushes all information about the data it
the Druid cluster, the real-time node flushes all information about the data it collected for 13:00 to 14:00 and unannounces it is serving this data.
collected for 13:00 to 14:00 and unannounces it is serving this data. This
process is shown in Figure~\ref{fig:realtime_timeline}.
\begin{figure*} \begin{figure*}
\centering \centering
\includegraphics[width = 4.5in]{realtime_timeline} \includegraphics[width = 4.5in]{realtime_timeline}
\caption{The node starts, ingests data, persists, and periodically hands data \caption{The node starts, ingests data, persists, and periodically hands data
off. This process repeats indefinitely. The time intervals between different off. This process repeats indefinitely. The time periods between different
real-time node operations are configurable.} real-time node operations are configurable.}
\label{fig:realtime_timeline} \label{fig:realtime_timeline}
\end{figure*} \end{figure*}
@ -284,26 +283,26 @@ milliseconds.
The purpose of the message bus in Figure~\ref{fig:realtime_pipeline} is The purpose of the message bus in Figure~\ref{fig:realtime_pipeline} is
two-fold. First, the message bus acts as a buffer for incoming events. A two-fold. First, the message bus acts as a buffer for incoming events. A
message bus such as Kafka maintains offsets indicating the position in an event message bus such as Kafka maintains positional offsets indicating how far a
stream that a consumer (a real-time node) has read up to and consumers can consumer (a real-time node) has read in an event stream. Consumers can
programmatically update these offsets. Typically, real-time nodes update this programmatically update these offsets. Real-time nodes update this offset each
offset each time they persist their in-memory buffers to disk. In a fail and time they persist their in-memory buffers to disk. In a fail and recover
recover scenario, if a node has not lost disk, it can reload all persisted scenario, if a node has not lost disk, it can reload all persisted indexes from
indexes from disk and continue reading events from the last offset it disk and continue reading events from the last offset it committed. Ingesting
committed. Ingesting events from a recently committed offset greatly reduces a events from a recently committed offset greatly reduces a node's recovery time.
node's recovery time. In practice, we see real-time nodes recover from such In practice, we see nodes recover from such failure scenarios in a
failure scenarios in an order of seconds. few seconds.
The second purpose of the message bus is to act as a single endpoint from which The second purpose of the message bus is to act as a single endpoint from which
multiple real-time nodes can read events. Multiple real-time nodes can ingest multiple real-time nodes can read events. Multiple real-time nodes can ingest
the same set of events from the bus, thus creating a replication of events. In the same set of events from the bus, creating a replication of events. In a
a scenario where a node completely fails and does not recover, replicated scenario where a node completely fails and loses disk, replicated streams
streams ensure that no data is lost. A single ingestion endpoint also allows ensure that no data is lost. A single ingestion endpoint also allows for data
for data streams for be partitioned such that multiple real-time nodes each streams for be partitioned such that multiple real-time nodes each ingest a
ingest a portion of a stream. This allows additional real-time nodes to be portion of a stream. This allows additional real-time nodes to be seamlessly
seamlessly added. In practice, this model has allowed one of the largest added. In practice, this model has allowed one of the largest production Druid
production Druid clusters to be able to consume raw data at approximately 500 clusters to be able to consume raw data at approximately 500 MB/s (150,000
MB/s (150,000 events/s or 2 TB/hour). events/s or 2 TB/hour).
\subsection{Historical Nodes} \subsection{Historical Nodes}
Historical nodes encapsulate the functionality to load and serve the immutable Historical nodes encapsulate the functionality to load and serve the immutable
@ -414,7 +413,7 @@ distribution on historical nodes. The coordinator nodes tell historical nodes
to load new data, drop outdated data, replicate data, and move data to load to load new data, drop outdated data, replicate data, and move data to load
balance. Druid uses a multi-version concurrency control swapping protocol for balance. Druid uses a multi-version concurrency control swapping protocol for
managing immutable segments in order to maintain stable views. If any managing immutable segments in order to maintain stable views. If any
immutable segment contains data that is wholly obseleted by newer segments, the immutable segment contains data that is wholly obsoleted by newer segments, the
outdated segment is dropped from the cluster. Coordinator nodes undergo a outdated segment is dropped from the cluster. Coordinator nodes undergo a
leader-election process that determines a single node that runs the coordinator leader-election process that determines a single node that runs the coordinator
functionality. The remaining coordinator nodes act as redundant backups. functionality. The remaining coordinator nodes act as redundant backups.
@ -437,8 +436,8 @@ Rules indicate how segments should be assigned to different historical node
tiers and how many replicates of a segment should exist in each tier. Rules may tiers and how many replicates of a segment should exist in each tier. Rules may
also indicate when segments should be dropped entirely from the cluster. Rules also indicate when segments should be dropped entirely from the cluster. Rules
are usually set for a period of time. For example, a user may use rules to are usually set for a period of time. For example, a user may use rules to
load the most recent one month's worth of segments into a "hot" cluster, the load the most recent one month's worth of segments into a ``hot" cluster, the
most recent one year's worth of segments into a "cold" cluster, and drop any most recent one year's worth of segments into a ``cold" cluster, and drop any
segments that are older. segments that are older.
The coordinator nodes load a set of rules from a rule table in the MySQL The coordinator nodes load a set of rules from a rule table in the MySQL
@ -506,9 +505,7 @@ Druid always requires a timestamp column as a method of simplifying data
distribution policies, data retention policies, and first-level query pruning. distribution policies, data retention policies, and first-level query pruning.
Druid partitions its data sources into well-defined time intervals, typically Druid partitions its data sources into well-defined time intervals, typically
an hour or a day, and may further partition on values from other columns to an hour or a day, and may further partition on values from other columns to
achieve the desired segment size. For example, partitioning the data in achieve the desired segment size. The time granularity
Table~\ref{tab:sample_data} by hour results in two segments for 2011-01-01, and
partitioning the data by day results in a single segment. The time granularity
to partition segments is a function of data volume and time range. A data set to partition segments is a function of data volume and time range. A data set
with timestamps spread over a year is better partitioned by day, and a data set with timestamps spread over a year is better partitioned by day, and a data set
with timestamps spread over a day is better partitioned by hour. with timestamps spread over a day is better partitioned by hour.
@ -541,17 +538,17 @@ method to compress data and has been used in other data stores such as
PowerDrill \cite{hall2012processing}. In the example in PowerDrill \cite{hall2012processing}. In the example in
Table~\ref{tab:sample_data}, we can map each page to an unique integer Table~\ref{tab:sample_data}, we can map each page to an unique integer
identifier. identifier.
\begin{verbatim} {\small\begin{verbatim}
Justin Bieber -> 0 Justin Bieber -> 0
Ke$ha -> 1 Ke$ha -> 1
\end{verbatim} \end{verbatim}}
This mapping allows us to represent the page column as an integer This mapping allows us to represent the page column as an integer
array where the array indices correspond to the rows of the original array where the array indices correspond to the rows of the original
data set. For the page column, we can represent the unique data set. For the page column, we can represent the unique
pages as follows: pages as follows:
\begin{verbatim} {\small\begin{verbatim}
[0, 0, 1, 1] [0, 0, 1, 1]
\end{verbatim} \end{verbatim}}
The resulting integer array lends itself very well to The resulting integer array lends itself very well to
compression methods. Generic compression algorithms on top of encodings are compression methods. Generic compression algorithms on top of encodings are
@ -562,17 +559,17 @@ Similar compression methods can be applied to numeric
columns. For example, the characters added and characters removed columns in columns. For example, the characters added and characters removed columns in
Table~\ref{tab:sample_data} can also be expressed as individual Table~\ref{tab:sample_data} can also be expressed as individual
arrays. arrays.
\begin{verbatim} {\small\begin{verbatim}
Characters Added -> [1800, 2912, 1953, 3194] Characters Added -> [1800, 2912, 1953, 3194]
Characters Removed -> [25, 42, 17, 170] Characters Removed -> [25, 42, 17, 170]
\end{verbatim} \end{verbatim}}
In this case, we compress the raw values as opposed to their dictionary In this case, we compress the raw values as opposed to their dictionary
representations. representations.
\subsection{Indices for Filtering Data} \subsection{Indices for Filtering Data}
In many real world OLAP workflows, queries are issued for the aggregated In many real world OLAP workflows, queries are issued for the aggregated
results of some set of metrics where some set of dimension specifications are results of some set of metrics where some set of dimension specifications are
met. An example query is: "How many Wikipedia edits were done by users in met. An example query is: ``How many Wikipedia edits were done by users in
San Francisco who are also male?". This query is filtering the Wikipedia data San Francisco who are also male?". This query is filtering the Wikipedia data
set in Table~\ref{tab:sample_data} based on a Boolean expression of dimension set in Table~\ref{tab:sample_data} based on a Boolean expression of dimension
values. In many real world data sets, dimension columns contain strings and values. In many real world data sets, dimension columns contain strings and
@ -587,22 +584,22 @@ indicating in which table rows a particular page is seen. We can
store this information in a binary array where the array indices store this information in a binary array where the array indices
represent our rows. If a particular page is seen in a certain represent our rows. If a particular page is seen in a certain
row, that array index is marked as \texttt{1}. For example: row, that array index is marked as \texttt{1}. For example:
\begin{verbatim} {\small\begin{verbatim}
Justin Bieber -> rows [0, 1] -> [1][1][0][0] Justin Bieber -> rows [0, 1] -> [1][1][0][0]
Ke$ha -> rows [2, 3] -> [0][0][1][1] Ke$ha -> rows [2, 3] -> [0][0][1][1]
\end{verbatim} \end{verbatim}}
\texttt{Justin Bieber} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values \texttt{Justin Bieber} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values
to row indices forms an inverted index \cite{tomasic1993performance}. To know which to row indices forms an inverted index \cite{tomasic1993performance}. To know which
rows contain {\ttfamily Justin Bieber} or {\ttfamily Ke\$ha}, we can \texttt{OR} together rows contain {\ttfamily Justin Bieber} or {\ttfamily Ke\$ha}, we can \texttt{OR} together
the two arrays. the two arrays.
\begin{verbatim} {\small\begin{verbatim}
[0][1][0][1] OR [1][0][1][0] = [1][1][1][1] [0][1][0][1] OR [1][0][1][0] = [1][1][1][1]
\end{verbatim} \end{verbatim}}
\begin{figure} \begin{figure}
\centering \centering
\includegraphics[width = 3in]{concise_plot} \includegraphics[width = 2.8in]{concise_plot}
\caption{Integer array size versus Concise set size.} \caption{Integer array size versus Concise set size.}
\label{fig:concise_plot} \label{fig:concise_plot}
\end{figure} \end{figure}
@ -610,31 +607,23 @@ the two arrays.
This approach of performing Boolean operations on large bitmap sets is commonly This approach of performing Boolean operations on large bitmap sets is commonly
used in search engines. Bitmap indices for OLAP workloads is described in used in search engines. Bitmap indices for OLAP workloads is described in
detail in \cite{o1997improved}. Bitmap compression algorithms are a detail in \cite{o1997improved}. Bitmap compression algorithms are a
well-defined area of research and often utilize run-length encoding. Popular well-defined area of research \cite{antoshenkov1995byte, wu2006optimizing,
algorithms include Byte-aligned Bitmap Code \cite{antoshenkov1995byte}, van2011memory} and often utilize run-length encoding. Druid opted to use the
Word-Aligned Hybrid (WAH) code \cite{wu2006optimizing}, and Partitioned Concise algorithm \cite{colantonio2010concise}. Figure~\ref{fig:concise_plot}
Word-Aligned Hybrid (PWAH) compression \cite{van2011memory}. Druid opted to use illustrates the number of bytes using Concise compression versus using an
the Concise algorithm \cite{colantonio2010concise} as it can outperform WAH by integer array. The results were generated on a \texttt{cc2.8xlarge} system with
reducing the size of the compressed bitmaps by up to 50\%. a single thread, 2G heap, 512m young gen, and a forced GC between each run. The
Figure~\ref{fig:concise_plot} illustrates the number of bytes using Concise data set is a single days worth of data collected from the Twitter garden hose
compression versus using an integer array. The results were generated on a \cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12
cc2.8xlarge system with a single thread, 2G heap, 512m young gen, and a forced dimensions of varying cardinality. As an additional comparison, we also
GC between each run. The data set is a single days worth of data collected resorted the data set rows to maximize compression.
from the Twitter garden hose \cite{twitter2013} data stream. The data set
contains 2,272,295 rows and 12 dimensions of varying cardinality. As an
additional comparison, we also resorted the data set rows to maximize
compression.
In the unsorted case, the total Concise size was 53,451,144 bytes and the total In the unsorted case, the total Concise size was 53,451,144 bytes and the total
integer array size was 127,248,520 bytes. Overall, Concise compressed sets are integer array size was 127,248,520 bytes. Overall, Concise compressed sets are
about 42\% smaller than integer arrays. In the sorted case, the total Concise about 42\% smaller than integer arrays. In the sorted case, the total Concise
compressed size was 43,832,884 bytes and the total integer array size was compressed size was 43,832,884 bytes and the total integer array size was
127,248,520 bytes. What is interesting to note is that after sorting, global 127,248,520 bytes. What is interesting to note is that after sorting, global
compression only increased minimally. The total Concise set size to total compression only increased minimally.
integer array size is 34\%. It is also interesting to note that as the
cardinality of a dimension approaches the total number of rows in a data set,
integer arrays require less space than Concise sets and become a better
alternative.
\subsection{Storage Engine} \subsection{Storage Engine}
Druids persistence components allows for different storage engines to be Druids persistence components allows for different storage engines to be
@ -675,8 +664,7 @@ into data at any depth.
The exact query syntax depends on the query type and the information requested. The exact query syntax depends on the query type and the information requested.
A sample count query over a week of data is as follows: A sample count query over a week of data is as follows:
\newpage {\scriptsize\begin{verbatim}
\begin{verbatim}
{ {
"queryType" : "timeseries", "queryType" : "timeseries",
"dataSource" : "wikipedia", "dataSource" : "wikipedia",
@ -687,75 +675,93 @@ A sample count query over a week of data is as follows:
"value" : "Ke$ha" "value" : "Ke$ha"
}, },
"granularity" : "day", "granularity" : "day",
"aggregations" : [ { "aggregations" : [{"type":"count", "name":"rows"}]
"type" : "count",
"name" : "rows"
} ]
} }
\end{verbatim} \end{verbatim}}
The query shown above will return a count of the number of rows in the Wikipedia datasource The query shown above will return a count of the number of rows in the Wikipedia data source
from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the "page" dimension is from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the ``page" dimension is
equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form: equal to ``Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
\begin{verbatim} {\scriptsize\begin{verbatim}
[ { [ {
"timestamp": "2012-01-01T00:00:00.000Z", "timestamp": "2012-01-01T00:00:00.000Z",
"result": { "result": {"rows":393298}
"rows": 393298
}
}, },
{ {
"timestamp": "2012-01-02T00:00:00.000Z", "timestamp": "2012-01-02T00:00:00.000Z",
"result": { "result": {"rows":382932}
"rows": 382932
}
}, },
... ...
{ {
"timestamp": "2012-01-07T00:00:00.000Z", "timestamp": "2012-01-07T00:00:00.000Z",
"result": { "result": {"rows": 1337}
"rows": 1337
}
} ] } ]
\end{verbatim} \end{verbatim}}
Druid supports many types of aggregations including double sums, long sums, Druid supports many types of aggregations including double sums, long sums,
minimums, maximums, and several others. Druid also supports complex aggregations minimums, maximums, and complex aggregations such as cardinality estimation and
such as cardinality estimation and approximate quantile estimation. The approximate quantile estimation. The results of aggregations can be combined
results of aggregations can be combined in mathematical expressions to form in mathematical expressions to form other aggregations. It is beyond the scope
other aggregations. The query API is highly customizable and can be extended to of this paper to fully describe the query API but more information can be found
filter and group results based on almost any arbitrary condition. It is beyond
the scope of this paper to fully describe the query API but more information
can be found
online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}. online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}.
At the time of writing, the query language does not support joins. Although the As of this writing, a join query for Druid is not yet implemented. This has
storage format is able to support joins, we've targeted Druid at user-facing been a function of engineering resource allocation and use case decisions more
workloads that must return in a matter of seconds, and as such, we've chosen to than a decision driven by technical merit. Indeed, Druid's storage format
not spend the time to implement joins as it has been our experience that would allow for the implementation of joins (there is no loss of fidelity for
requiring joins on your queries often limits the performance you can achieve. columns included as dimensions) and the implementation of them has been a
conversation that we have every few months. To date, we have made the choice
that the implementation cost is not worth the investment for our organization.
The reasons for this decision are generally two-fold.
\begin{enumerate}
\item Scaling join queries has been, in our professional experience, a constant bottleneck of working with distributed databases.
\item The incremental gains in functionality are perceived to be of less value than the anticipated problems with managing highly concurrent, join-heavy workloads.
\end{enumerate}
A join query is essentially the merging of two or more streams of data based on
a shared set of keys. The primary high-level strategies for join queries the
authors are aware of are a hash-based strategy or a sorted-merge strategy. The
hash-based strategy requires that all but one data set be available as
something that looks like a hash table, a lookup operation is then performed on
this hash table for every row in the ``primary" stream. The sorted-merge
strategy assumes that each stream is sorted by the join key and thus allows for
the incremental joining of the streams. Each of these strategies, however,
requires the materialization of some number of the streams either in sorted
order or in a hash table form.
When all sides of the join are significantly large tables (> 1 billion records),
materializing the pre-join streams requires complex distributed memory
management. The complexity of the memory management is only amplified by
the fact that we are targeting highly concurrent, multitenant workloads.
This is, as far as the authors are aware, an active academic research
problem that we would be more than willing to engage with the academic
community to help resolving in a scalable manner.
\newpage
\section{Performance} \section{Performance}
\label{sec:benchmarks} \label{sec:benchmarks}
Druid runs in production at several organizations, and to demonstrate its Druid runs in production at several organizations, and to demonstrate its
performance, we've chosen to share some real world numbers of the production performance, we have chosen to share some real world numbers for the main production
cluster at Metamarkets. The date range of the data is for Feburary 2014. cluster running at Metamarkets in early 2014. For comparison with other databases
we also include results from synthetic workloads on TPC-H data.
\subsection{Query Performance} \subsection{Query Performance in Production}
Druid query performance can vary signficantly depending on the actual query Druid query performance can vary signficantly depending on the query
being issued. For example, determining the approximate cardinality of a given being issued. For example, sorting the values of a high cardinality dimension
dimension is a much more expensive operation than a simple sum of a metric based on a given metric is much more expensive than a simple count over a time
column. Similarily, sorting the values of a high cardinality dimension based on range. To showcase the average query latencies in a production Druid cluster,
a given metric is much more expensive than a simple count over a time range. we selected 8 of our most queried data sources, described in Table~\ref{tab:datasources}.
Furthermore, the time range of a query and the number of metric aggregators in
the query will contribute to query latency. Instead of going into full detail Approximately 30\% of the queries are standard
about every query issued in our production cluster, we've instead chosen to aggregates involving different types of metrics and filters, 60\% of queries
showcase a higher level view of average latencies in our cluster. We selected 8 are ordered group bys over one or more dimensions with aggregates, and 10\% of
of our most queried data sources, described in Table~\ref{tab:datasources}. queries are search queries and metadata retrieval queries. The number of
columns scanned in aggregate queries roughly follows an exponential
distribution. Queries involving a single column are very frequent, and queries
involving all columns are very rare.
\begin{table} \begin{table}
\centering \centering
\caption{Dimensions and metrics of the 8 most queried Druid data sources in production.}
\label{tab:datasources}
\begin{tabular}{| l | l | l |} \begin{tabular}{| l | l | l |}
\hline \hline
\textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline \textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline
@ -768,121 +774,131 @@ of our most queried data sources, described in Table~\ref{tab:datasources}.
\texttt{g} & 26 & 18 \\ \hline \texttt{g} & 26 & 18 \\ \hline
\texttt{h} & 78 & 14 \\ \hline \texttt{h} & 78 & 14 \\ \hline
\end{tabular} \end{tabular}
\caption{Characteristics of production data sources.}
\label{tab:datasources}
\end{table} \end{table}
Some more details of our results: A few notes about our results:
\begin{itemize}[leftmargin=*,beginpenalty=5000,topsep=0pt]
\item The results are from a ``hot" tier in our production cluster. There were
approximately 50 data sources in the tier and several hundred users issuing
queries.
\begin{itemize} \item There was approximately 10.5TB of RAM available in the ``hot" tier and
\item The results are from a "hot" tier in our production cluster. We run several tiers of varying performance in production. approximately 10TB of segments loaded. Collectively,
\item There is approximately 10.5TB of RAM available in the "hot" tier and approximately 10TB of segments loaded (including replication). Collectively, there are about 50 billion Druid rows in this tier. Results for every data source are not shown. there are about 50 billion Druid rows in this tier. Results for
\item The hot tier uses Xeon E5-2670 processors and consists of 1302 processing threads and 672 total cores (hyperthreaded). every data source are not shown.
\item A memory-mapped storage engine was used (the machine was configured to memory map the data
instead of loading it into the Java heap.) \item The hot tier uses Xeon E5-2670 processors and consists of 1302 processing
threads and 672 total cores (hyperthreaded).
\item A memory-mapped storage engine was used (the machine was configured to
memory map the data instead of loading it into the Java heap.)
\end{itemize} \end{itemize}
The average query latency is shown in Figure~\ref{fig:avg_query_latency} and Query latencies are shown in Figure~\ref{fig:query_latency} and the queries per
the queries per minute is shown in Figure~\ref{fig:queries_per_min}. We can see minute are shown in Figure~\ref{fig:queries_per_min}. Across all the various
that across the various data sources, the average query latency is approximately data sources, average query latency is approximately 550 milliseconds, with
540ms. The 90th percentile query latency across these data sources is < 1s, the 90\% of queries returning in less than 1 second, 95\% in under 2 seconds, and
95th percentile is < 2s, and the 99th percentile is < 10s. The percentiles are 99\% of queries returning in less than 10 seconds. Occasionally we observe
shown in Figure~\ref{fig:query_percentiles}. It is very possible to possible to spikes in latency, as observed on February 19, in which case network issues on
decrease query latencies by adding additional hardware, but we have not chosen the Memcached instances were compounded by very high query load on one of our
to do so because infrastructure cost is still a consideration. largest datasources.
\begin{figure} \begin{figure}
\centering \centering
\includegraphics[width = 2.8in]{avg_query_latency} \includegraphics[width = 2.3in]{avg_query_latency}
\caption{Druid production cluster average query latencies for multiple data sources.} \includegraphics[width = 2.3in]{query_percentiles}
\label{fig:avg_query_latency} \caption{Query latencies of production data sources.}
\label{fig:query_latency}
\end{figure} \end{figure}
\begin{figure} \begin{figure}
\centering \centering
\includegraphics[width = 2.8in]{queries_per_min} \includegraphics[width = 2.8in]{queries_per_min}
\caption{Druid production cluster queries per minute for multiple data sources.} \caption{Queries per minute of production data sources.}
\label{fig:queries_per_min} \label{fig:queries_per_min}
\end{figure} \end{figure}
\begin{figure} \subsection{Query Benchmarks on TPC-H Data}
\centering We also present Druid benchmarks on TPC-H data.
\includegraphics[width = 2.8in]{query_percentiles} Most TPC-H queries do not directly apply to Druid, so we
\caption{Druid production cluster 90th, 95th, and 99th query latency percentiles for the 8 most queried data sources.} selected queries more typical of Druid's workload to demonstrate query performance. As a
\label{fig:query_percentiles} comparison, we also provide the results of the same queries using MySQL using the
\end{figure} MyISAM engine (InnoDB was slower in our experiments).
We also present Druid benchmarks with TPC-H data. Our setup used Amazon EC2 We selected MySQL to benchmark
m3.2xlarge (CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz) instances for
historical nodes. Most TPC-H queries do not directly apply to Druid, so we
selected similiar queries to demonstrate Druid's query performance. As a
comparison, we also provide the results of the same queries using MySQL with
MyISAM (InnoDB was slower in our experiments). Our MySQL setup was an Amazon
RDS instance that also ran on an m3.2xlarge node.We selected MySQL to benchmark
against because of its universal popularity. We choose not to select another against because of its universal popularity. We choose not to select another
open source column store because we were not confident we could correctly tune open source column store because we were not confident we could correctly tune
it for optimal performance. The results for the 1 GB TPC-H data set are shown it for optimal performance.
Our Druid setup used Amazon EC2
\texttt{m3.2xlarge} (Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz) instances for
historical nodes and \texttt{c3.2xlarge} (Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz) instances for broker
nodes. Our MySQL setup was an Amazon RDS instance that ran on the same \texttt{m3.2xlarge} instance type.
The results for the 1 GB TPC-H data set are shown
in Figure~\ref{fig:tpch_1gb} and the results of the 100 GB data set are shown in Figure~\ref{fig:tpch_1gb} and the results of the 100 GB data set are shown
in Figure~\ref{fig:tpch_100gb}. We benchmarked Druid's scan rate at in Figure~\ref{fig:tpch_100gb}. We benchmarked Druid's scan rate at
53,539,211.1 rows/second/core for count(*) over a given interval and 36,246,530 53,539,211 rows/second/core for \texttt{select count(*)} equivalent query over a given time interval
rows/second/core for an aggregation involving floats. and 36,246,530 rows/second/core for a \texttt{select sum(float)} type query.
\begin{figure} \begin{figure}
\centering \centering
\includegraphics[width = 2.8in]{tpch_1gb} \includegraphics[width = 2.3in]{tpch_1gb}
\caption{Druid and MySQL (MyISAM) benchmarks with the TPC-H 1 GB data set.} \caption{Druid \& MySQL benchmarks -- 1GB TPC-H data.}
\label{fig:tpch_1gb} \label{fig:tpch_1gb}
\end{figure} \end{figure}
\begin{figure} \begin{figure}
\centering \centering
\includegraphics[width = 2.8in]{tpch_100gb} \includegraphics[width = 2.3in]{tpch_100gb}
\caption{Druid and MySQL (MyISAM) benchmarks with the TPC-H 100 GB data set.} \caption{Druid \& MySQL benchmarks -- 100GB TPC-H data.}
\label{fig:tpch_100gb} \label{fig:tpch_100gb}
\end{figure} \end{figure}
Finally, we present our results of scaling Druid to meet increasing data Finally, we present our results of scaling Druid to meet increasing data
volumes with the TPC-H 100 GB data set. Our distributed cluster used Amazon EC2 volumes with the TPC-H 100 GB data set. We observe that when we
c3.2xlarge (Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz) instances for broker increased the number of cores from 8 to 48, not all types of queries
nodes. We observe that when we increased the number of cores from 8 to 48, we achieve linear scaling, but the simpler aggregation queries do,
do not always display linear scaling. The increase in speed of a parallel as shown in Figure~\ref{fig:tpch_scaling}.
computing system is often limited by the time needed for the sequential
operations of the system, in accordance with Amdahl's law The increase in speed of a parallel computing system is often limited by the
\cite{amdahl1967validity}. Our query results and query speedup are shown in time needed for the sequential operations of the system. In this case, queries
Figure~\ref{fig:tpch_scaling}. requiring a substantial amount of work at the broker level do not parallelize as
well.
\begin{figure} \begin{figure}
\centering \centering
\includegraphics[width = 2.8in]{tpch_scaling} \includegraphics[width = 2.3in]{tpch_scaling}
\caption{Scaling a Druid cluster with the TPC-H 100 GB data set.} \caption{Druid scaling benchmarks -- 100GB TPC-H data.}
\label{fig:tpch_scaling} \label{fig:tpch_scaling}
\end{figure} \end{figure}
\subsection{Data Ingestion Performance} \subsection{Data Ingestion Performance}
To showcase Druid's data ingestion latency, we selected several production To showcase Druid's data ingestion latency, we selected several production
datasources of varying dimensions, metrics, and event volumes. Our production datasources of varying dimensions, metrics, and event volumes. Our production
ingestion setup is as follows: ingestion setup consists of 6 nodes, totalling 360GB of RAM and 96 cores
(12 x Intel Xeon E5-2670).
\begin{itemize} Note that in this setup, several other data sources were being ingested and
\item Total RAM: 360 GB many other Druid related ingestion tasks were running concurrently on the machines.
\item Total CPU: 12 x Intel Xeon E5-2670 (96 cores)
\item Note: In this setup, several other data sources were being ingested and many other Druid related ingestion tasks were running across these machines.
\end{itemize}
Druid's data ingestion latency is heavily dependent on the complexity of the Druid's data ingestion latency is heavily dependent on the complexity of the
data set being ingested. The data complexity is determined by the number of data set being ingested. The data complexity is determined by the number of
dimensions in each event, the number of metrics in each event, and the types of dimensions in each event, the number of metrics in each event, and the types of
aggregations we want to perform on those metrics. With the most basic data set aggregations we want to perform on those metrics. With the most basic data set
(one that only has a timestamp column), our setup can ingest data at a rate of (one that only has a timestamp column), our setup can ingest data at a rate of
800,000 events/sec/core, which is really just a measurement of how fast we can 800,000 events/second/core, which is really just a measurement of how fast we can
deserialize events. Real world data sets are never this simple. A description deserialize events. Real world data sets are never this simple.
of the data sources we selected is shown in Table~\ref{tab:ingest_datasources}. Table~\ref{tab:ingest_datasources} shows a selection of data sources and their
characteristics.
\begin{table} \begin{table}
\centering \centering
\caption{Dimensions, metrics, and peak throughputs of various ingested data sources.}
\label{tab:ingest_datasources}
\begin{tabular}{| l | l | l | l |} \begin{tabular}{| l | l | l | l |}
\hline \hline
\textbf{Data Source} & \textbf{Dims} & \textbf{Mets} & \textbf{Peak Throughput (events/sec)} \\ \hline \scriptsize\textbf{Data Source} & \scriptsize\textbf{Dimensions} & \scriptsize\textbf{Metrics} & \scriptsize\textbf{Peak events/s} \\ \hline
\texttt{s} & 7 & 2 & 28334.60 \\ \hline \texttt{s} & 7 & 2 & 28334.60 \\ \hline
\texttt{t} & 10 & 7 & 68808.70 \\ \hline \texttt{t} & 10 & 7 & 68808.70 \\ \hline
\texttt{u} & 5 & 1 & 49933.93 \\ \hline \texttt{u} & 5 & 1 & 49933.93 \\ \hline
@ -892,62 +908,79 @@ of the data sources we selected is shown in Table~\ref{tab:ingest_datasources}.
\texttt{y} & 33 & 24 & 162462.41 \\ \hline \texttt{y} & 33 & 24 & 162462.41 \\ \hline
\texttt{z} & 33 & 24 & 95747.74 \\ \hline \texttt{z} & 33 & 24 & 95747.74 \\ \hline
\end{tabular} \end{tabular}
\caption{Ingestion characteristics of various data sources.}
\label{tab:ingest_datasources}
\end{table} \end{table}
We can see that based on the descriptions in We can see that, based on the descriptions in
Table~\ref{tab:ingest_datasources}, latencies vary significantly and the Table~\ref{tab:ingest_datasources}, latencies vary significantly and the
ingestion latency is not always a factor of the number of dimensions and ingestion latency is not always a factor of the number of dimensions and
metrics. We see some lower latencies on simple data sets because that was the rate that the metrics. We see some lower latencies on simple data sets because that was the
data producer was delivering data. The results are shown in Figure~\ref{fig:ingestion_rate}. rate that the data producer was delivering data. The results are shown in
Figure~\ref{fig:ingestion_rate}.
We define throughput as the number of events a
real-time node can ingest and also make queryable. If too many events are sent
to the real-time node, those events are blocked until the real-time node has
capacity to accept them. The peak ingestion latency we measured in production
was 22914.43 events/second/core on a datasource with 30 dimensions and 19 metrics,
running an Amazon \texttt{cc2.8xlarge} instance.
\begin{figure} \begin{figure}
\centering \centering
\includegraphics[width = 2.8in]{ingestion_rate} \includegraphics[width = 2.8in]{ingestion_rate}
\caption{Druid production cluster ingestion rates for multiple data sources.} \caption{Combined cluster ingestion rates.}
\label{fig:ingestion_rate} \label{fig:ingestion_rate}
\end{figure} \end{figure}
The peak ingestion latency we measured in production was 23,000 events/sec/core The latency measurements we presented are sufficient to address the our stated
on an Amazon EC2 cc2.8xlarge. The data source had 30 dimensions and 19 metrics. problems of interactivity. We would prefer the variability in the latencies to
be less. It is still very possible to possible to decrease latencies by adding
additional hardware, but we have not chosen to do so because infrastructure
costs are still a consideration to us.
\section{Druid in Production} \section{Druid in Production}\label{sec:production}
\label{sec:production} Over the last few years, we have gained tremendous knowledge about handling
Over the last few years, we've gained tremendous production workloads with Druid and have made a couple of interesting observations.
knowledge about handling production workloads, setting up correct operational
monitoring, integrating Druid with other products as part of a more
sophisticated data analytics stack, and distributing data to handle entire data
center outages. One of the most important lessons we've learned is that no
amount of testing can accurately simulate a production environment, and failures
will occur for every imaginable and unimaginable reason. Interestingly, most of
our most severe crashes were due to misunderstanding the impacts a
seemingly small feature would have on the overall system.
Some of our more interesting observations include: \paragraph{Query Patterns}
\begin{itemize} Druid is often used to explore data and generate reports on data. In the
\item Druid is often used in production to power exploratory dashboards. Many explore use case, the number of queries issued by a single user is much higher
users of exploratory dashboards are not from technical backgrounds, and they than in the reporting use case. Exploratory queries often involve progressively
often issue queries without understanding the impacts to the underlying system. adding filters for the same time range to narrow down results. Users tend to
For example, some users become impatient that their queries for terabytes of explore short time intervals of recent data. In the generate report use case,
data do not return in milliseconds and continously refresh their dashboard users query for much longer data intervals, but users also already know the
view, generating heavy load to Druid. This type of usage forced Druid to defend queries they want to issue.
itself against expensive repetitive queries.
\item Cluster query performance benefits from multitenancy. Hosting every \paragraph{Multitenancy}
production datasource in the same cluster leads to better data parallelization Expensive concurrent queries can be problematic in a multitenant
as additional nodes are added. environment. Queries for large data sources may end up hitting every historical
node in a cluster and consume all cluster resources. Smaller, cheaper queries
may be blocked from executing in such cases. We introduced query prioritization
to address these issues. Each historical node is able to prioritize which
segments it needs to scan. Proper query planning is critical for production
workloads. Thankfully, queries for a significant amount of data tend to be for
reporting use cases and can be deprioritized. Users do not expect the same level of
interactivity in this use case as when they are exploring data.
\item Even if you provide users with the ability to arbitrarily explore data, \paragraph{Node failures}
they often only have a few questions in mind. Caching is extremely important in Single node failures are common in distributed environments, but many nodes
this case, and we see a very high cache hit rates. failing at once are not. If historical nodes completely fail and do not
recover, their segments need to reassigned, which means we need excess cluster
capacity to load this data. The amount of additional capacity to have at any
time contributes to the cost of running a cluster. From our experiences, it is
extremely rare to see more than 2 nodes completely fail at once and hence, we
leave enough capacity in our cluster to completely reassign the data from 2
historical nodes.
\item When using a memory mapped storage engine, even a small amount of paging \paragraph{Data Center Outages}
data from disk can severely impact query performance. SSDs greatly mitigate Complete cluster failures are possible, but extremely rare. If Druid is
this problem. only deployed in a single data center, it is possible for the entire data
center to fail. In such cases, new machines need to be provisioned. As long as
\item Leveraging approximate algorithms can greatly reduce data storage costs and deep storage is still available, cluster recovery time is network bound as
improve query performance. Many users do not care about exact answers to their historical nodes simply need to redownload every segment from deep storage. We
questions and are comfortable with a few percentage points of error. have experienced such failures in the past, and the recovery time was around
\end{itemize} several hours in the AWS ecosystem for several TBs of data.
\subsection{Operational Monitoring} \subsection{Operational Monitoring}
Proper monitoring is critical to run a large scale distributed cluster. Proper monitoring is critical to run a large scale distributed cluster.
@ -955,24 +988,22 @@ Each Druid node is designed to periodically emit a set of operational metrics.
These metrics may include system level data such as CPU usage, available These metrics may include system level data such as CPU usage, available
memory, and disk capacity, JVM statistics such as garbage collection time, and memory, and disk capacity, JVM statistics such as garbage collection time, and
heap usage, or node specific metrics such as segment scan time, cache heap usage, or node specific metrics such as segment scan time, cache
hit rates, and data ingestion latencies. For each query, Druid nodes can also hit rates, and data ingestion latencies. Druid also emits per query metrics.
emit metrics about the details of the query such as the number of filters
applied, or the interval of data requested.
Metrics can be emitted from a production Druid cluster into a dedicated metrics We emit metrics from a production Druid cluster and load them into a dedicated
Druid cluster. Queries can be made to the metrics Druid cluster to explore metrics Druid cluster. The metrics Druid cluster is used to explore the
production cluster performance and stability. Leveraging a dedicated metrics performance and stability of the production cluster. This dedicated metrics
cluster has allowed us to find numerous production problems, such as gradual cluster has allowed us to find numerous production problems, such as gradual
query speed degregations, less than optimally tuned hardware, and various other query speed degregations, less than optimally tuned hardware, and various other
system bottlenecks. We also use a metrics cluster to analyze what queries are system bottlenecks. We also use a metrics cluster to analyze what queries are
made in production. This analysis allows us to determine what our users are made in production and what users are most interested in.
most often doing and we use this information to drive our road map.
\subsection{Pairing Druid with a Stream Processor} \subsection{Pairing Druid with a Stream Processor}
At the time of writing, Druid can only understand fully denormalized data At the time of writing, Druid can only understand fully denormalized data
streams. In order to provide full business logic in production, Druid can be streams. In order to provide full business logic in production, Druid can be
paired with a stream processor such as Apache Storm \cite{marz2013storm}. A paired with a stream processor such as Apache Storm \cite{marz2013storm}.
Storm topology consumes events from a data stream, retains only those that are
A Storm topology consumes events from a data stream, retains only those that are
“on-time”, and applies any relevant business logic. This could range from “on-time”, and applies any relevant business logic. This could range from
simple transformations, such as id to name lookups, up to complex operations simple transformations, such as id to name lookups, up to complex operations
such as multi-stream joins. The Storm topology forwards the processed event such as multi-stream joins. The Storm topology forwards the processed event
@ -1039,13 +1070,12 @@ of functionality as Druid, some of Druids optimization techniques such as usi
inverted indices to perform fast filters are also used in other data inverted indices to perform fast filters are also used in other data
stores \cite{macnicol2004sybase}. stores \cite{macnicol2004sybase}.
\newpage
\section{Conclusions} \section{Conclusions}
\label{sec:conclusions} \label{sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented, real-time In this paper, we presented Druid, a distributed, column-oriented, real-time
analytical data store. Druid is designed to power high performance applications analytical data store. Druid is designed to power high performance applications
and is optimized for low query latencies. Druid supports streaming data and is optimized for low query latencies. Druid supports streaming data
ingestion and is fault-tolerant. We discussed how Druid benchmarks and ingestion and is fault-tolerant. We discussed Druid benchmarks and
summarized key architecture aspects such summarized key architecture aspects such
as the storage format, query language, and general execution. as the storage format, query language, and general execution.

View File

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -212,12 +212,12 @@ unaryExpression returns [PostAggregator p]
if($e.p instanceof ConstantPostAggregator) { if($e.p instanceof ConstantPostAggregator) {
ConstantPostAggregator c = (ConstantPostAggregator)$e.p; ConstantPostAggregator c = (ConstantPostAggregator)$e.p;
double v = c.getConstantValue().doubleValue() * -1; double v = c.getConstantValue().doubleValue() * -1;
$p = new ConstantPostAggregator(Double.toString(v), v); $p = new ConstantPostAggregator(Double.toString(v), v, null);
} else { } else {
$p = new ArithmeticPostAggregator( $p = new ArithmeticPostAggregator(
"-"+$e.p.getName(), "-"+$e.p.getName(),
"*", "*",
Lists.newArrayList($e.p, new ConstantPostAggregator("-1", -1.0)) Lists.newArrayList($e.p, new ConstantPostAggregator("-1", -1.0, null))
); );
} }
} }
@ -240,7 +240,7 @@ aggregate returns [AggregatorFactory agg]
; ;
constant returns [ConstantPostAggregator c] constant returns [ConstantPostAggregator c]
: value=NUMBER { double v = Double.parseDouble($value.text); $c = new ConstantPostAggregator(Double.toString(v), v); } : value=NUMBER { double v = Double.parseDouble($value.text); $c = new ConstantPostAggregator(Double.toString(v), v, null); }
; ;
/* time filters must be top level filters */ /* time filters must be top level filters */

View File

@ -22,9 +22,11 @@ package io.druid.client.cache;
import java.util.AbstractQueue; import java.util.AbstractQueue;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.Queue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -36,17 +38,18 @@ import java.util.concurrent.locks.ReentrantLock;
*/ */
public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
{ {
private final LinkedList<E> delegate; private final Queue<E> delegate;
private final AtomicLong currentSize = new AtomicLong(0); private final AtomicLong currentSize = new AtomicLong(0);
private final Lock putLock = new ReentrantLock(); private final Lock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition(); private final Condition notFull = putLock.newCondition();
private final Lock takeLock = new ReentrantLock(); private final Lock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition(); private final Condition notEmpty = takeLock.newCondition();
private final AtomicInteger elementCount = new AtomicInteger(0);
private long capacity; private long capacity;
public BytesBoundedLinkedQueue(long capacity) public BytesBoundedLinkedQueue(long capacity)
{ {
delegate = new LinkedList<>(); delegate = new ConcurrentLinkedQueue<>();
this.capacity = capacity; this.capacity = capacity;
} }
@ -71,11 +74,13 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
public void elementAdded(E e) public void elementAdded(E e)
{ {
currentSize.addAndGet(getBytesSize(e)); currentSize.addAndGet(getBytesSize(e));
elementCount.getAndIncrement();
} }
public void elementRemoved(E e) public void elementRemoved(E e)
{ {
currentSize.addAndGet(-1 * getBytesSize(e)); currentSize.addAndGet(-1 * getBytesSize(e));
elementCount.getAndDecrement();
} }
private void fullyUnlock() private void fullyUnlock()
@ -115,7 +120,7 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
@Override @Override
public int size() public int size()
{ {
return delegate.size(); return elementCount.get();
} }
@Override @Override
@ -163,7 +168,7 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
E e; E e;
takeLock.lockInterruptibly(); takeLock.lockInterruptibly();
try { try {
while (delegate.size() == 0) { while (elementCount.get() == 0) {
notEmpty.await(); notEmpty.await();
} }
e = delegate.remove(); e = delegate.remove();
@ -181,8 +186,16 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
@Override @Override
public int remainingCapacity() public int remainingCapacity()
{ {
int delegateSize = delegate.size(); int delegateSize;
long currentByteSize = currentSize.get(); long currentByteSize;
fullyLock();
try {
delegateSize = elementCount.get();
currentByteSize = currentSize.get();
}
finally {
fullyUnlock();
}
// return approximate remaining capacity based on current data // return approximate remaining capacity based on current data
if (delegateSize == 0) { if (delegateSize == 0) {
return (int) Math.min(capacity, Integer.MAX_VALUE); return (int) Math.min(capacity, Integer.MAX_VALUE);
@ -214,13 +227,13 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
int n = 0; int n = 0;
takeLock.lock(); takeLock.lock();
try { try {
n = Math.min(maxElements, delegate.size()); // elementCount.get provides visibility to first n Nodes
n = Math.min(maxElements, elementCount.get());
if (n < 0) { if (n < 0) {
return 0; return 0;
} }
// count.get provides visibility to first n Nodes
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
E e = delegate.remove(0); E e = delegate.remove();
elementRemoved(e); elementRemoved(e);
c.add(e); c.add(e);
} }
@ -287,7 +300,7 @@ public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implem
E e = null; E e = null;
takeLock.lockInterruptibly(); takeLock.lockInterruptibly();
try { try {
while (delegate.size() == 0) { while (elementCount.get() == 0) {
if (nanos <= 0) { if (nanos <= 0) {
return null; return null;
} }

View File

@ -29,10 +29,12 @@ public class LocalCacheProvider extends CacheConfig implements CacheProvider
{ {
@JsonProperty @JsonProperty
@Min(0) @Min(0)
private long sizeInBytes = 0; private long sizeInBytes = 10485760;
@JsonProperty @JsonProperty
@Min(0) @Min(0)
private int initialSize = 500000; private int initialSize = 500000;
@JsonProperty @JsonProperty
@Min(0) @Min(0)
private int logEvictionCount = 0; private int logEvictionCount = 0;

View File

@ -32,6 +32,8 @@ import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.internal.BulkFuture; import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.ops.LinkedOperationQueueFactory;
import net.spy.memcached.ops.OperationQueueFactory;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -56,7 +58,15 @@ public class MemcachedCache implements Cache
// always use compression // always use compression
transcoder.setCompressionThreshold(0); transcoder.setCompressionThreshold(0);
MemcachedOperationQueueFactory queueFactory = new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize());
OperationQueueFactory opQueueFactory;
long maxQueueBytes = config.getMaxOperationQueueSize();
if(maxQueueBytes > 0) {
opQueueFactory = new MemcachedOperationQueueFactory(maxQueueBytes);
} else {
opQueueFactory = new LinkedOperationQueueFactory();
}
return new MemcachedCache( return new MemcachedCache(
new MemcachedClient( new MemcachedClient(
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
@ -68,7 +78,8 @@ public class MemcachedCache implements Cache
.setShouldOptimize(true) .setShouldOptimize(true)
.setOpQueueMaxBlockTime(config.getTimeout()) .setOpQueueMaxBlockTime(config.getTimeout())
.setOpTimeout(config.getTimeout()) .setOpTimeout(config.getTimeout())
.setOpQueueFactory(queueFactory) .setReadBufferSize(config.getReadBufferSize())
.setOpQueueFactory(opQueueFactory)
.build(), .build(),
AddrUtil.getAddresses(config.getHosts()) AddrUtil.getAddresses(config.getHosts())
), ),

View File

@ -20,24 +20,38 @@
package io.druid.client.cache; package io.druid.client.cache;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import net.spy.memcached.DefaultConnectionFactory;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
public class MemcachedCacheConfig extends CacheConfig public class MemcachedCacheConfig extends CacheConfig
{ {
// default to 30 day expiration for cache entries
// values greater than 30 days are interpreted by memcached as absolute POSIX timestamps instead of duration
@JsonProperty @JsonProperty
private int expiration = 2592000; // What is this number? private int expiration = 30 * 24 * 3600;
@JsonProperty @JsonProperty
private int timeout = 500; private int timeout = 500;
// comma delimited list of memcached servers, given as host:port combination
@JsonProperty @JsonProperty
@NotNull @NotNull
private String hosts; private String hosts;
@JsonProperty @JsonProperty
private int maxObjectSize = 50 * 1024 * 1024; private int maxObjectSize = 50 * 1024 * 1024;
// memcached client read buffer size, -1 uses the spymemcached library default
@JsonProperty
private int readBufferSize = DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE;
@JsonProperty @JsonProperty
private String memcachedPrefix = "druid"; private String memcachedPrefix = "druid";
// maximum size in bytes of memcached client operation queue. 0 means unbounded
@JsonProperty @JsonProperty
private long maxOperationQueueSize = 256 * 1024 * 1024L; // 256 MB private long maxOperationQueueSize = 0;
public int getExpiration() public int getExpiration()
{ {
@ -68,4 +82,9 @@ public class MemcachedCacheConfig extends CacheConfig
{ {
return maxOperationQueueSize; return maxOperationQueueSize;
} }
public int getReadBufferSize()
{
return readBufferSize;
}
} }

View File

@ -0,0 +1,46 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import org.joda.time.Interval;
/**
*/
public class CustomVersioningPolicy implements VersioningPolicy
{
private final String version;
@JsonCreator
public CustomVersioningPolicy(
@JsonProperty("version") String version
)
{
this.version = version == null ? new DateTime().toString() : version;
}
@Override
public String getVersion(Interval interval)
{
return version;
}
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.plumber; package io.druid.segment.realtime.plumber;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;

View File

@ -578,9 +578,12 @@ public class RealtimePlumber implements Plumber
log.info("Starting merge and push."); log.info("Starting merge and push.");
long minTimestamp = segmentGranularity.truncate( DateTime minTimestampAsDate = segmentGranularity.truncate(
rejectionPolicy.getCurrMaxTime().minus(windowMillis) rejectionPolicy.getCurrMaxTime().minus(windowMillis)
).getMillis(); );
long minTimestamp = minTimestampAsDate.getMillis();
log.info("Found [%,d] sinks. minTimestamp [%s]", sinks.size(), minTimestampAsDate);
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList(); List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) { for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
@ -588,9 +591,13 @@ public class RealtimePlumber implements Plumber
if (intervalStart < minTimestamp) { if (intervalStart < minTimestamp) {
log.info("Adding entry[%s] for merge and push.", entry); log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry); sinksToPush.add(entry);
} else {
log.warn("[%s] < [%s] Skipping persist and merge.", new DateTime(intervalStart), minTimestampAsDate);
} }
} }
log.info("Found [%,d] sinks to persist and merge", sinksToPush.size());
for (final Map.Entry<Long, Sink> entry : sinksToPush) { for (final Map.Entry<Long, Sink> entry : sinksToPush) {
persistAndMerge(entry.getKey(), entry.getValue()); persistAndMerge(entry.getKey(), entry.getValue());
} }

View File

@ -27,7 +27,8 @@ import org.joda.time.Period;
@JsonSubTypes(value = { @JsonSubTypes(value = {
@JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class), @JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class), @JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "none", value = NoopRejectionPolicyFactory.class) @JsonSubTypes.Type(name = "none", value = NoopRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "test", value = TestRejectionPolicyFactory.class)
}) })
public interface RejectionPolicyFactory public interface RejectionPolicyFactory
{ {

View File

@ -0,0 +1,49 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.plumber;
import org.joda.time.DateTime;
import org.joda.time.Period;
/**
*/
public class TestRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(Period windowPeriod)
{
return new RejectionPolicy()
{
private final DateTime max = new DateTime(Long.MAX_VALUE);
@Override
public DateTime getCurrMaxTime()
{
return max;
}
@Override
public boolean accept(long timestamp)
{
return true;
}
};
}
}

View File

@ -25,7 +25,9 @@ import org.joda.time.Interval;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = { @JsonSubTypes(value = {
@JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class) @JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class),
@JsonSubTypes.Type(name = "custom", value = CustomVersioningPolicy.class)
}) })
public interface VersioningPolicy public interface VersioningPolicy
{ {

View File

@ -35,7 +35,9 @@ import org.joda.time.DateTime;
@JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class), @JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class),
@JsonSubTypes.Type(name = "loadForever", value = ForeverLoadRule.class), @JsonSubTypes.Type(name = "loadForever", value = ForeverLoadRule.class),
@JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class), @JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class) @JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
@JsonSubTypes.Type(name = "dropForever", value = ForeverDropRule.class)
}) })
public interface Rule public interface Rule
{ {

View File

@ -126,7 +126,7 @@ public class DatasourcesResource
@GET @GET
@Path("/{dataSourceName}") @Path("/{dataSourceName}")
@Consumes("application/json") @Produces("application/json")
public Response getTheDataSource( public Response getTheDataSource(
@PathParam("dataSourceName") final String dataSourceName @PathParam("dataSourceName") final String dataSourceName
) )

View File

@ -461,6 +461,22 @@ public class InfoResource
).build(); ).build();
} }
@GET
@Path("/datasources/{dataSourceName}")
@Produces("application/json")
public Response getTheDataSource(
@PathParam("dataSourceName") final String dataSourceName
)
{
DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.ok(dataSource).build();
}
@DELETE @DELETE
@Path("/datasources/{dataSourceName}") @Path("/datasources/{dataSourceName}")
public Response deleteDataSource( public Response deleteDataSource(

View File

@ -24,6 +24,8 @@ import com.metamx.common.ISE;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -33,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
public class BytesBoundedLinkedQueueTest public class BytesBoundedLinkedQueueTest
@ -171,11 +174,86 @@ public class BytesBoundedLinkedQueueTest
} }
} }
@Test public void testAddedObjectExceedsCapacity() throws Exception { @Test
public void testAddedObjectExceedsCapacity() throws Exception
{
BlockingQueue<TestObject> q = getQueue(4); BlockingQueue<TestObject> q = getQueue(4);
Assert.assertTrue(q.offer(new TestObject(3))); Assert.assertTrue(q.offer(new TestObject(3)));
Assert.assertFalse(q.offer(new TestObject(2))); Assert.assertFalse(q.offer(new TestObject(2)));
Assert.assertFalse(q.offer(new TestObject(2),delayMS, TimeUnit.MILLISECONDS)); Assert.assertFalse(q.offer(new TestObject(2), delayMS, TimeUnit.MILLISECONDS));
}
// @Test
public void testConcurrentOperations() throws Exception
{
final BlockingQueue<TestObject> q = getQueue(Integer.MAX_VALUE);
long duration = TimeUnit.SECONDS.toMillis(10);
ExecutorService executor = Executors.newCachedThreadPool();
final AtomicBoolean stopTest = new AtomicBoolean(false);
List<Future> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
futures.add(
executor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call()
{
while (!stopTest.get()) {
q.add(new TestObject(1));
q.add(new TestObject(2));
}
return true;
}
}
)
);
}
for (int i = 0; i < 10; i++) {
futures.add(
executor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws InterruptedException
{
while (!stopTest.get()) {
q.poll(100,TimeUnit.MILLISECONDS);
q.offer(new TestObject(2));
}
return true;
}
}
)
);
}
for (int i = 0; i < 5; i++) {
futures.add(
executor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call()
{
while (!stopTest.get()) {
System.out
.println("drained elements : " + q.drainTo(new ArrayList<TestObject>(), Integer.MAX_VALUE));
}
return true;
}
}
)
);
}
Thread.sleep(duration);
stopTest.set(true);
for (Future<Boolean> future : futures) {
Assert.assertTrue(future.get());
}
} }
public static class TestObject public static class TestObject

View File

@ -27,7 +27,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.64-SNAPSHOT</version> <version>0.6.73-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -54,7 +54,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "broker", name = "broker",
description = "Runs a broker node, see http://druid.io/docs/0.6.63/Broker.html for a description" description = "Runs a broker node, see http://druid.io/docs/0.6.72/Broker.html for a description"
) )
public class CliBroker extends ServerRunnable public class CliBroker extends ServerRunnable
{ {

View File

@ -66,7 +66,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "coordinator", name = "coordinator",
description = "Runs the Coordinator, see http://druid.io/docs/0.6.63/Coordinator.html for a description." description = "Runs the Coordinator, see http://druid.io/docs/0.6.72/Coordinator.html for a description."
) )
public class CliCoordinator extends ServerRunnable public class CliCoordinator extends ServerRunnable
{ {

View File

@ -41,7 +41,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "hadoop", name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.63/Batch-ingestion.html for a description." description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.72/Batch-ingestion.html for a description."
) )
public class CliHadoopIndexer implements Runnable public class CliHadoopIndexer implements Runnable
{ {

View File

@ -45,7 +45,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "historical", name = "historical",
description = "Runs a Historical node, see http://druid.io/docs/0.6.63/Historical.html for a description" description = "Runs a Historical node, see http://druid.io/docs/0.6.72/Historical.html for a description"
) )
public class CliHistorical extends ServerRunnable public class CliHistorical extends ServerRunnable
{ {

View File

@ -20,14 +20,20 @@
package io.druid.cli; package io.druid.cli;
import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.api.client.util.Lists;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments; import io.airlift.command.Arguments;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder; import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob; import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.JobHelper;
import io.druid.indexer.Jobby;
import java.io.File; import java.io.File;
import java.util.ArrayList;
import java.util.List;
/** /**
*/ */
@ -37,17 +43,20 @@ import java.io.File;
) )
public class CliInternalHadoopIndexer implements Runnable public class CliInternalHadoopIndexer implements Runnable
{ {
private static final Logger log = new Logger(CliHadoopIndexer.class);
@Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true) @Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true)
private String argumentSpec; private String argumentSpec;
private static final Logger log = new Logger(CliHadoopIndexer.class);
@Override @Override
public void run() public void run()
{ {
try { try {
final HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(getHadoopDruidIndexerConfig()); HadoopDruidIndexerConfig config = getHadoopDruidIndexerConfig();
job.run(); List<Jobby> jobs = Lists.newArrayList();
jobs.add(new HadoopDruidDetermineConfigurationJob(config));
jobs.add(new HadoopDruidIndexerJob(config));
JobHelper.runJobs(jobs, config);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -93,7 +93,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "overlord", name = "overlord",
description = "Runs an Overlord node, see http://druid.io/docs/0.6.63/Indexing-Service.html for a description" description = "Runs an Overlord node, see http://druid.io/docs/0.6.72/Indexing-Service.html for a description"
) )
public class CliOverlord extends ServerRunnable public class CliOverlord extends ServerRunnable
{ {

View File

@ -30,7 +30,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "realtime", name = "realtime",
description = "Runs a realtime node, see http://druid.io/docs/0.6.63/Realtime.html for a description" description = "Runs a realtime node, see http://druid.io/docs/0.6.72/Realtime.html for a description"
) )
public class CliRealtime extends ServerRunnable public class CliRealtime extends ServerRunnable
{ {

View File

@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
*/ */
@Command( @Command(
name = "realtime", name = "realtime",
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.63/Realtime.html for a description" description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.72/Realtime.html for a description"
) )
public class CliRealtimeExample extends ServerRunnable public class CliRealtimeExample extends ServerRunnable
{ {