diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index 5682138b49d..57150629f14 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
io.druiddruid
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOT
diff --git a/common/pom.xml b/common/pom.xml
index da437e99e1f..b4bbaf0f4f6 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
io.druiddruid
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOT
diff --git a/docs/content/Aggregations.md b/docs/content/Aggregations.md
index 29740a2858c..c8c79c6acf0 100644
--- a/docs/content/Aggregations.md
+++ b/docs/content/Aggregations.md
@@ -159,4 +159,29 @@ Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to
```json
{ "type" : "hyperUnique", "name" : , "fieldName" : }
-```
\ No newline at end of file
+```
+
+## Miscellaneous Aggregations
+
+### Filtered Aggregator
+
+A filtered aggregator wraps any given aggregator, but only aggregates the values for which the given dimension filter matches.
+
+This makes it possible to compute the results of a filtered and an unfiltered aggregation simultaneously, without having to issue multiple queries, and use both results as part of post-aggregations.
+
+*Limitations:* The filtered aggregator currently only supports 'selector' filters and their negation (a 'not' filter wrapping a single selector), i.e. matching a dimension against a single value.
+
+*Note:* If only the filtered results are required, consider putting the filter on the query itself, which will be much faster since it does not require scanning all the data.
+
+```json
+{
+ "type" : "filtered",
+ "name" : "aggMatching",
+ "filter" : {
+ "type" : "selector",
+ "dimension" : ,
+ "value" :
+  },
+ "aggregator" :
+}
+```
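+
+For example, the following spec (with hypothetical dimension, metric, and output names) wraps a `longSum` aggregator so that it only sums rows where the "device" dimension has the value "mobile":
+
+```json
+{
+  "type" : "filtered",
+  "name" : "mobileRevenue",
+  "filter" : {
+    "type" : "selector",
+    "dimension" : "device",
+    "value" : "mobile"
+  },
+  "aggregator" : {
+    "type" : "longSum",
+    "name" : "mobileRevenue",
+    "fieldName" : "revenue"
+  }
+}
+```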
diff --git a/docs/content/Batch-ingestion.md b/docs/content/Batch-ingestion.md
index 2f53eb48b4e..0326d1e1a66 100644
--- a/docs/content/Batch-ingestion.md
+++ b/docs/content/Batch-ingestion.md
@@ -162,37 +162,58 @@ The indexing process has the ability to roll data up as it processes the incomin
### Partitioning specification
-Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in some other way depending on partition type.
-Druid supports two types of partitions spec - singleDimension and hashed.
+Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in
+some other way depending on partition type. Druid supports two types of partitioning strategies: "hashed" (based on the
+hash of all dimensions in each row), and "dimension" (based on ranges of a single dimension).
-In SingleDimension partition type data is partitioned based on the values in that dimension.
-For example, data for a day may be split by the dimension "last\_name" into two segments: one with all values from A-M and one with all values from N-Z.
+Hashed partitioning is recommended in most cases, as it will improve indexing performance and create more uniformly
+sized data segments relative to single-dimension partitioning.
-In hashed partition type, the number of partitions is determined based on the targetPartitionSize and cardinality of input set and the data is partitioned based on the hashcode of the row.
-
-It is recommended to use Hashed partition as it is more efficient than singleDimension since it does not need to determine the dimension for creating partitions.
-Hashing also gives better distribution of data resulting in equal sized partitions and improving query performance
-
-To use this druid to automatically determine optimal partitions indexer must be given a target partition size. It can then find a good set of partition ranges on its own.
-
-#### Configuration for disabling auto-sharding and creating Fixed number of partitions
- Druid can be configured to NOT run determine partitions and create a fixed number of shards by specifying numShards in hashed partitionsSpec.
- e.g This configuration will skip determining optimal partitions and always create 4 shards for every segment granular interval
+#### Hash-based partitioning
```json
"partitionsSpec": {
- "type": "hashed"
- "numShards": 4
+ "type": "hashed",
+ "targetPartitionSize": 5000000
}
```
+Hashed partitioning works by first selecting a number of segments, and then partitioning rows across those segments
+according to the hash of all dimensions in each row. The number of segments is determined automatically based on the
+cardinality of the input set and a target partition size.
+
+The configuration options are:
+
|property|description|required?|
|--------|-----------|---------|
-|type|type of partitionSpec to be used |no, default : singleDimension|
-|targetPartitionSize|target number of rows to include in a partition, should be a number that targets segments of 700MB\~1GB.|yes|
+|type|type of partitionSpec to be used |"hashed"|
+|targetPartitionSize|target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards|
+|numShards|specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or targetPartitionSize|
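+
+For example, to skip the automatic selection of a partition count and always create a fixed number of shards (here, four) for every segment granular interval, specify `numShards` instead of `targetPartitionSize`:
+
+```json
+  "partitionsSpec": {
+    "type": "hashed",
+    "numShards": 4
+  }
+```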
+
+#### Single-dimension partitioning
+
+```json
+ "partitionsSpec": {
+ "type": "dimension",
+ "targetPartitionSize": 5000000
+ }
+```
+
+Single-dimension partitioning works by first selecting a dimension to partition on, and then separating that dimension
+into contiguous ranges. Each segment will contain all rows with values of that dimension in that range. For example,
+your segments may be partitioned on the dimension "host" using the ranges "a.example.com" to "f.example.com" and
+"f.example.com" to "z.example.com". By default, the dimension to use is determined automatically, although you can
+override it with a specific dimension.
+
+The configuration options are:
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|type of partitionSpec to be used |"dimension"|
+|targetPartitionSize|target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes|
+|maxPartitionSize|maximum number of rows to include in a partition. Defaults to 50% larger than the targetPartitionSize.|no|
|partitionDimension|the dimension to partition on. Leave blank to select a dimension automatically.|no|
-|assumeGrouped|assume input data has already been grouped on time and dimensions. This is faster, but can choose suboptimal partitions if the assumption is violated.|no|
-|numShards|provides a way to manually override druid-auto sharding and specify the number of shards to create for each segment granular interval.It is only supported by hashed partitionSpec and targetPartitionSize must be set to -1|no|
+|assumeGrouped|assume input data has already been grouped on time and dimensions. Ingestion will run faster, but can choose suboptimal partitions if the assumption is violated.|no|
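+
+For example, to partition on the "host" dimension explicitly instead of letting the indexer choose a dimension automatically:
+
+```json
+  "partitionsSpec": {
+    "type": "dimension",
+    "targetPartitionSize": 5000000,
+    "partitionDimension": "host"
+  }
+```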
### Updater job spec
diff --git a/docs/content/Best-Practices.md b/docs/content/Best-Practices.md
deleted file mode 100644
index f4cd8f32595..00000000000
--- a/docs/content/Best-Practices.md
+++ /dev/null
@@ -1,23 +0,0 @@
----
-layout: doc_page
----
-
-Best Practices
-==============
-
-# Use UTC Timezone
-
-We recommend using UTC timezone for all your events and across on your nodes, not just for Druid, but for all data infrastructure. This can greatly mitigate potential query problems with inconsistent timezones.
-
-# Use Lowercase Strings for Column Names
-
-Druid is not perfect in how it handles mix-cased dimension and metric names. This will hopefully change very soon but for the time being, lower casing your column names is recommended.
-
-# SSDs
-
-SSDs are highly recommended for historical and real-time nodes if you are not running a cluster that is entirely in memory. SSDs can greatly mitigate the time required to page data in and out of memory.
-
-# Provide Columns Names in Lexicographic Order for Best Results
-
-Although Druid supports schemaless ingestion of dimensions, because of https://github.com/metamx/druid/issues/658, you may sometimes get bigger segments than necessary. To ensure segments are as compact as possible, providing dimension names in lexicographic order is recommended. This may require some ETL processing on your data however.
-
diff --git a/docs/content/Coordinator.md b/docs/content/Coordinator.md
index 9021cbe1dff..42d1d17041d 100644
--- a/docs/content/Coordinator.md
+++ b/docs/content/Coordinator.md
@@ -20,9 +20,7 @@ io.druid.cli.Main server coordinator
Rules
-----
-Segments are loaded and dropped from the cluster based on a set of rules. Rules indicate how segments should be assigned to different historical node tiers and how many replicants of a segment should exist in each tier. Rules may also indicate when segments should be dropped entirely from the cluster. The coordinator loads a set of rules from the database. Rules may be specific to a certain datasource and/or a default set of rules can be configured. Rules are read in order and hence the ordering of rules is important. The coordinator will cycle through all available segments and match each segment with the first rule that applies. Each segment may only match a single rule.
-
-For more information on rules, see [Rule Configuration](Rule-Configuration.html).
+Segments can be automatically loaded and dropped from the cluster based on a set of rules. For more information on rules, see [Rule Configuration](Rule-Configuration.html).
Cleaning Up Segments
--------------------
diff --git a/docs/content/Examples.md b/docs/content/Examples.md
index dfd44ffe927..45c5d573892 100644
--- a/docs/content/Examples.md
+++ b/docs/content/Examples.md
@@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
-git checkout druid-0.6.159
+git checkout druid-0.6.160
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz
diff --git a/docs/content/Granularities.md b/docs/content/Granularities.md
index d4f3ea73141..e8a3e4fc2bf 100644
--- a/docs/content/Granularities.md
+++ b/docs/content/Granularities.md
@@ -21,13 +21,13 @@ Duration granularities are specified as an exact duration in milliseconds and ti
They also support specifying an optional origin, which defines where to start counting time buckets from (defaults to 1970-01-01T00:00:00Z).
-```
+```javascript
{"type": "duration", "duration": "7200000"}
```
This chunks up every 2 hours.
-```
+```javascript
{"type": "duration", "duration": "3600000", "origin": "2012-01-01T00:30:00Z"}
```
@@ -39,13 +39,13 @@ Period granularities are specified as arbitrary period combinations of years, mo
Time zone is optional (defaults to UTC). Origin is optional (defaults to 1970-01-01T00:00:00 in the given time zone).
-```
+```javascript
{"type": "period", "period": "P2D", "timeZone": "America/Los_Angeles"}
```
This will bucket by two-day chunks in the Pacific timezone.
-```
+```javascript
{"type": "period", "period": "P3M", "timeZone": "America/Los_Angeles", "origin": "2012-02-01T00:00:00-08:00"}
```
diff --git a/docs/content/Ingestion-FAQ.md b/docs/content/Ingestion-FAQ.md
index fc83907c47a..972e62a6a23 100644
--- a/docs/content/Ingestion-FAQ.md
+++ b/docs/content/Ingestion-FAQ.md
@@ -1,6 +1,11 @@
---
layout: doc_page
---
+
+## What types of data does Druid support?
+
+Druid can ingest JSON, CSV, TSV, and other delimited data out of the box. It supports both single-value and multi-value dimensions (arrays of strings), as well as long and float numeric columns.
+
## Where do my Druid segments end up after ingestion?
Depending on what `druid.storage.type` is set to, Druid will upload segments to some [Deep Storage](Deep-Storage.html). Local disk is used as the default deep storage.
@@ -21,6 +26,14 @@ druid.storage.bucket=druid
druid.storage.baseKey=sample
```
+Other common reasons that hand-off fails are as follows:
+
+1) Historical nodes are out of capacity and cannot download any more segments. You'll see exceptions in the coordinator logs if this occurs.
+
+2) Segments are corrupt and cannot be downloaded. You'll see exceptions in your historical node logs if this occurs.
+
+3) Deep storage is improperly configured. Make sure that your segment actually exists in deep storage and that the coordinator logs have no errors.
+
## How do I get HDFS to work?
Make sure to include the `druid-hdfs-storage` module as one of your extensions and set `druid.storage.type=hdfs`.
@@ -35,7 +48,7 @@ You can check the coordinator console located at `:/cluste
## My queries are returning empty results
-You can check `:/druid/v2/datasources/?interval=0/3000` for the dimensions and metrics that have been created for your datasource. Make sure that the name of the aggregators you use in your query match one of these metrics. Also make sure that the query interval you specify match a valid time range where data exists. Note: the broker endpoint will only return valid results on historical segments.
+You can check `:/druid/v2/datasources/?interval=0/3000` for the dimensions and metrics that have been created for your datasource. Make sure that the names of the aggregators you use in your query match one of these metrics. Also make sure that the query interval you specify matches a valid time range where data exists. Note: the broker endpoint will only return valid results for historical segments and not for segments served by real-time nodes.
## How can I Reindex existing data in Druid with schema changes?
@@ -50,6 +63,9 @@ To do this use the IngestSegmentFirehose and run an indexer task. The IngestSegm
Typically the above will be run as a batch job to say everyday feed in a chunk of data and aggregate it.
+## Real-time ingestion seems to be stuck
+
+There are a few ways this can occur. Druid will throttle ingestion to prevent out of memory problems if the intermediate persists are taking too long or if hand-off is taking too long. If your node logs indicate certain columns are taking a very long time to build (for example, if your segment granularity is hourly, but creating a single column takes 30 minutes), you should re-evaluate your configuration or scale up your real-time ingestion.
## More information
diff --git a/docs/content/Kafka-Eight.md b/docs/content/Kafka-Eight.md
index 5819a931b2a..175c71a65c6 100644
--- a/docs/content/Kafka-Eight.md
+++ b/docs/content/Kafka-Eight.md
@@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
- Update realtime node's configs for Kafka 8 extensions
- e.g.
- - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.159",...]`
+ - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.160",...]`
- becomes
- - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.159",...]`
+ - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.160",...]`
- Update realtime task config for changed keys
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
diff --git a/docs/content/Production-Cluster-Configuration.md b/docs/content/Production-Cluster-Configuration.md
index 6fc319712de..21190792663 100644
--- a/docs/content/Production-Cluster-Configuration.md
+++ b/docs/content/Production-Cluster-Configuration.md
@@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/overlord
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/middlemanager
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159","io.druid.extensions:druid-kafka-seven:0.6.159"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160","io.druid.extensions:druid-kafka-seven:0.6.160"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/historical
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
diff --git a/docs/content/Querying.md b/docs/content/Querying.md
index 895a1cf2239..dd556b2740f 100644
--- a/docs/content/Querying.md
+++ b/docs/content/Querying.md
@@ -66,7 +66,7 @@ The dataSource JSON field shown next identifies where to apply the query. In thi
"dataSource": "randSeq",
```
-The granularity JSON field specifies the bucket size for values. It could be a built-in time interval like "second", "minute", "fifteen_minute", "thirty_minute", "hour" or "day". It can also be an expression like `{"type": "period", "period":"PT6m"}` meaning "6 minute buckets". See [Granularities](Granularities.html) for more information on the different options for this field. In this example, it is set to the special value "all" which means [bucket all data points together into the same time bucket]()
+The granularity JSON field specifies the bucket size for values. It could be a built-in time interval like "second", "minute", "fifteen_minute", "thirty_minute", "hour" or "day". It can also be an expression like `{"type": "period", "period":"PT6m"}` meaning "6 minute buckets". See [Granularities](Granularities.html) for more information on the different options for this field. In this example, it is set to the special value "all" which means bucket all data points together into the same time bucket.
```javascript
"granularity": "all",
@@ -133,7 +133,7 @@ Properties shared by all query types
|search|searchDimensions|Dimensions to apply the search query to. If not specified, it will search through all dimensions.|no|
|search|query|The query portion of the search query. This is essentially a predicate that specifies if something matches.|yes|
-Query Context
+Query Context
-------------
|property |default | description |
diff --git a/docs/content/Realtime-Config.md b/docs/content/Realtime-Config.md
index 12ab55ff8e3..92ab9382f81 100644
--- a/docs/content/Realtime-Config.md
+++ b/docs/content/Realtime-Config.md
@@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.159"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.160"]
druid.zk.service.host=localhost
@@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159","io.druid.extensions:druid-kafka-seven:0.6.159"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160","io.druid.extensions:druid-kafka-seven:0.6.160"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
diff --git a/docs/content/Recommendations.md b/docs/content/Recommendations.md
new file mode 100644
index 00000000000..bf764ffe6c2
--- /dev/null
+++ b/docs/content/Recommendations.md
@@ -0,0 +1,35 @@
+---
+layout: doc_page
+---
+
+Recommendations
+===============
+
+# Use UTC Timezone
+
+We recommend using the UTC timezone for all your events and across all your nodes, not just for Druid, but for all data infrastructure. This can greatly mitigate potential query problems caused by inconsistent timezones.
+
+# Use Lowercase Strings for Column Names
+
+Druid is not perfect in how it handles mixed-case dimension and metric names. This will hopefully change very soon, but for the time being, lowercasing your column names is recommended.
+
+# SSDs
+
+SSDs are highly recommended for historical and real-time nodes if you are not running a cluster that is entirely in memory. SSDs can greatly mitigate the time required to page data in and out of memory.
+
+# Provide Column Names in Lexicographic Order
+
+Although Druid supports schema-less ingestion of dimensions, because of [https://github.com/metamx/druid/issues/658](https://github.com/metamx/druid/issues/658), you may sometimes get bigger segments than necessary. To ensure segments are as compact as possible, providing dimension names in lexicographic order is recommended.
+
+
+# Use Timeseries and TopN Queries Instead of GroupBy Where Possible
+
+Timeseries and TopN queries are much more optimized and significantly faster than groupBy queries for their designed use cases. Issuing multiple topN or timeseries queries from your application can potentially be more efficient than a single groupBy query.
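+
+As a rough illustration (with hypothetical datasource and column names), a topN query like the following returns the top five pages by edit count and can often replace a groupBy on the "page" dimension:
+
+```json
+{
+  "queryType": "topN",
+  "dataSource": "wikipedia",
+  "granularity": "all",
+  "dimension": "page",
+  "metric": "edits",
+  "threshold": 5,
+  "aggregations": [
+    { "type": "longSum", "name": "edits", "fieldName": "count" }
+  ],
+  "intervals": [ "2013-01-01/2013-01-02" ]
+}
+```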
+
+# Read FAQs
+
+You should read about common problems people run into here:
+
+1) [Ingestion-FAQ](Ingestion-FAQ.html)
+
+2) [Performance-FAQ](Performance-FAQ.html)
\ No newline at end of file
diff --git a/docs/content/Router.md b/docs/content/Router.md
index 73849164216..064bf1d646d 100644
--- a/docs/content/Router.md
+++ b/docs/content/Router.md
@@ -51,7 +51,7 @@ druid.service=druid/prod/router
druid.extensions.remoteRepositories=[]
druid.extensions.localRepository=lib
-druid.extensions.coordinates=["io.druid.extensions:druid-histogram:0.6.159"]
+druid.extensions.coordinates=["io.druid.extensions:druid-histogram:0.6.160"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -118,3 +118,16 @@ Including this strategy means all timeBoundary queries are always routed to the
```
Queries with a priority set to less than minPriority are routed to the lowest priority broker. Queries with priority set to greater than maxPriority are routed to the highest priority broker. By default, minPriority is 0 and maxPriority is 1. Using these default values, if a query with priority 0 (the default query priority is 0) is sent, the query skips the priority selection logic.
+
+### javascript
+
+Allows defining arbitrary routing rules using a JavaScript function. The function is passed the configuration and the query to be executed, and returns the tier it should be routed to, or null for the default tier.
+
+*Example*: a function that returns the highest priority broker unless the given query has more than two aggregators.
+
+```json
+{
+ "type" : "javascript",
+ "function" : "function (config, query) { if (config.getTierToBrokerMap().values().size() > 0 && query.getAggregatorSpecs && query.getAggregatorSpecs().size() <= 2) { return config.getTierToBrokerMap().values().toArray()[0] } else { return config.getDefaultBrokerServiceName() } }"
+}
+```
diff --git a/docs/content/Rule-Configuration.md b/docs/content/Rule-Configuration.md
index bf8b8a9792d..c25c9e62b70 100644
--- a/docs/content/Rule-Configuration.md
+++ b/docs/content/Rule-Configuration.md
@@ -2,12 +2,34 @@
layout: doc_page
---
# Configuring Rules for Coordinator Nodes
+
+Rules indicate how segments should be assigned to different historical node tiers and how many replicas of a segment should exist in each tier. Rules may also indicate when segments should be dropped entirely from the cluster. The coordinator loads a set of rules from the metadata storage. Rules may be specific to a certain datasource and/or a default set of rules can be configured. Rules are read in order and hence the ordering of rules is important. The coordinator will cycle through all available segments and match each segment with the first rule that applies. Each segment may only match a single rule.
+
Note: It is recommended that the coordinator console is used to configure rules. However, the coordinator node does have HTTP endpoints to programmatically configure rules.
+
Load Rules
----------
-Load rules indicate how many replicants of a segment should exist in a server tier.
+Load rules indicate how many replicas of a segment should exist in a server tier.
+
+### Forever Load Rule
+
+Forever load rules are of the form:
+
+```json
+{
+ "type" : "loadForever",
+ "tieredReplicants": {
+ "hot": 1,
+ "_default_tier" : 1
+ }
+}
+```
+
+* `type` - this should always be "loadForever"
+* `tieredReplicants` - A JSON Object where the keys are the tier names and values are the number of replicas for that tier.
+
### Interval Load Rule
@@ -16,14 +38,17 @@ Interval load rules are of the form:
```json
{
"type" : "loadByInterval",
- "interval" : "2012-01-01/2013-01-01",
- "tier" : "hot"
+ "interval": "2012-01-01/2013-01-01",
+ "tieredReplicants": {
+ "hot": 1,
+ "_default_tier" : 1
+ }
}
```
* `type` - this should always be "loadByInterval"
* `interval` - A JSON Object representing ISO-8601 Intervals
-* `tier` - the configured historical node tier
+* `tieredReplicants` - A JSON Object where the keys are the tier names and values are the number of replicas for that tier.
### Period Load Rule
@@ -33,13 +58,16 @@ Period load rules are of the form:
{
"type" : "loadByPeriod",
"period" : "P1M",
- "tier" : "hot"
+ "tieredReplicants": {
+ "hot": 1,
+ "_default_tier" : 1
+ }
}
```
* `type` - this should always be "loadByPeriod"
* `period` - A JSON Object representing ISO-8601 Periods
-* `tier` - the configured historical node tier
+* `tieredReplicants` - A JSON Object where the keys are the tier names and values are the number of replicas for that tier.
The interval of a segment will be compared against the specified period. The rule matches if the period overlaps the interval.
@@ -48,6 +76,21 @@ Drop Rules
Drop rules indicate when segments should be dropped from the cluster.
+### Forever Drop Rule
+
+Forever drop rules are of the form:
+
+```json
+{
+ "type" : "dropForever"
+}
+```
+
+* `type` - this should always be "dropForever"
+
+All segments that match this rule are dropped from the cluster.
+
+
### Interval Drop Rule
Interval drop rules are of the form:
diff --git a/docs/content/Simple-Cluster-Configuration.md b/docs/content/Simple-Cluster-Configuration.md
index 8bfade9ab9a..c25f349f0b0 100644
--- a/docs/content/Simple-Cluster-Configuration.md
+++ b/docs/content/Simple-Cluster-Configuration.md
@@ -28,7 +28,7 @@ Configuration:
-Ddruid.zk.service.host=localhost
--Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.159"]
+-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.160"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid
diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md
index 4a74b4f5b20..fadc7fbc963 100644
--- a/docs/content/Tutorial:-A-First-Look-at-Druid.md
+++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.159
+cd druid-services-0.6.160
```
You should see a bunch of files:
diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
index 7ef8648cc0f..eed3030c71e 100644
--- a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
+++ b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
@@ -91,7 +91,7 @@ druid.service=overlord
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.159"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.160"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid
diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md
index 3797746ca20..7b21b72aa69 100644
--- a/docs/content/Tutorial:-The-Druid-Cluster.md
+++ b/docs/content/Tutorial:-The-Druid-Cluster.md
@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz)
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz)
and untar the contents within by issuing:
@@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.159","io.druid.extensions:druid-kafka-seven:0.6.159"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.160","io.druid.extensions:druid-kafka-seven:0.6.160"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
diff --git a/docs/content/Tutorial:-Webstream.md b/docs/content/Tutorial:-Webstream.md
index 47ebec7e049..5739f625a5e 100644
--- a/docs/content/Tutorial:-Webstream.md
+++ b/docs/content/Tutorial:-Webstream.md
@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.159
+cd druid-services-0.6.160
```
You should see a bunch of files:
diff --git a/docs/content/Twitter-Tutorial.md b/docs/content/Twitter-Tutorial.md
index 95993732449..844788a33b2 100644
--- a/docs/content/Twitter-Tutorial.md
+++ b/docs/content/Twitter-Tutorial.md
@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
# Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz).
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz).
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:
diff --git a/docs/content/index.md b/docs/content/index.md
index 529a2325436..3c236cc81be 100644
--- a/docs/content/index.md
+++ b/docs/content/index.md
@@ -37,17 +37,6 @@ When Druid?
* You want to do your analysis on data as it’s happening (in real-time)
* You need a data store that is always available, 24x7x365, and years into the future.
-
-Not Druid?
-----------
-
-* The amount of data you have can easily be handled by MySQL
-* You're querying for individual entries or doing lookups (not analytics)
-* Batch ingestion is good enough
-* Canned queries are good enough
-* Downtime is no big deal
-
-
Druid vs…
----------
@@ -60,7 +49,7 @@ Druid vs…
About This Page
----------
-The data store world is vast, confusing and constantly in flux. This page is meant to help potential evaluators decide whether Druid is a good fit for the problem one needs to solve. If anything about it is incorrect please provide that feedback on the mailing list or via some other means so we can fix it.
+The data infrastructure world is vast, confusing and constantly in flux. This page is meant to help potential evaluators decide whether Druid is a good fit for the problem one needs to solve. If anything about it is incorrect please provide that feedback on the mailing list or via some other means so we can fix it.
diff --git a/docs/content/toc.textile b/docs/content/toc.textile
index 21f867bca36..29c08ab4d0e 100644
--- a/docs/content/toc.textile
+++ b/docs/content/toc.textile
@@ -19,7 +19,7 @@ h2. Booting a Druid Cluster
* "Production Cluster Configuration":Production-Cluster-Configuration.html
* "Production Hadoop Configuration":Hadoop-Configuration.html
* "Rolling Cluster Updates":Rolling-Updates.html
-* "Best Practices":Best-Practices.html
+* "Recommendations":Recommendations.html
h2. Configuration
* "Common Configuration":Configuration.html
diff --git a/examples/pom.xml b/examples/pom.xml
index fb2597326e7..33515fffe61 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
io.druiddruid
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOT
diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml
index 6cb8f84483e..7cdaea48728 100644
--- a/hdfs-storage/pom.xml
+++ b/hdfs-storage/pom.xml
@@ -28,7 +28,7 @@
io.druiddruid
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOT
diff --git a/histogram/pom.xml b/histogram/pom.xml
index a26ce5f5601..f0e11ea375e 100644
--- a/histogram/pom.xml
+++ b/histogram/pom.xml
@@ -27,7 +27,7 @@
io.druiddruid
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOT
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index cfd2e8da529..24373fc733f 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
io.druiddruid
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOT
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index e6402221e45..335c8c1858b 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -41,6 +41,7 @@ import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.granularity.QueryGranularity;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
@@ -172,6 +173,7 @@ public class HadoopDruidIndexerConfig
private volatile PathSpec pathSpec;
private volatile Map shardSpecLookups = Maps.newHashMap();
private volatile Map hadoopShardSpecLookup = Maps.newHashMap();
+ private final QueryGranularity rollupGran;
@JsonCreator
public HadoopDruidIndexerConfig(
@@ -203,6 +205,7 @@ public class HadoopDruidIndexerConfig
hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
}
}
+ this.rollupGran = schema.getDataSchema().getGranularitySpec().getQueryGranularity();
}
@JsonProperty
@@ -326,7 +329,7 @@ public class HadoopDruidIndexerConfig
return Optional.absent();
}
- final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(inputRow);
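+    // Truncate the row timestamp to the rollup granularity before consulting the shard spec, so that
+    // all rows which roll up together (same truncated timestamp and dimensions) map to the same shard.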
+ final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec);
return Optional.of(
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java
index b9947e81fe9..bb560e27656 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java
@@ -289,4 +289,4 @@ public class HadoopIngestionSpec extends IngestionSpec
+    List<HadoopyShardSpec> specs = Lists.newArrayList();
+ final int partitionCount = 10;
+ for (int i = 0; i < partitionCount; i++) {
+ specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, new DefaultObjectMapper()), i));
+ }
+ HadoopIngestionSpec spec = new HadoopIngestionSpec(
+ null, null, null,
+ "foo",
+ new TimestampSpec("timestamp", "auto"),
+ new JSONDataSpec(ImmutableList.of("foo"), null),
+ new UniformGranularitySpec(
+ Granularity.HOUR,
+ QueryGranularity.MINUTE,
+ ImmutableList.of(new Interval("2010-01-01/P1D")),
+ Granularity.HOUR
+ ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ false,
+ true,
+ ImmutableMap.of(new DateTime("2010-01-01T01:00:00"), specs),
+ false,
+ new DataRollupSpec(ImmutableList.of(), QueryGranularity.MINUTE),
+ null,
+ false,
+ ImmutableMap.of("foo", "bar"),
+ false,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(spec);
+ final List dims = Arrays.asList("diM1", "dIM2");
+ final ImmutableMap values = ImmutableMap.of(
+ "Dim1",
+ "1",
+ "DiM2",
+ "2",
+ "dim1",
+ "3",
+ "dim2",
+ "4"
+ );
+ final long timestamp = new DateTime("2010-01-01T01:00:01").getMillis();
+ final Bucket expectedBucket = config.getBucket(new MapBasedInputRow(timestamp, dims, values)).get();
+ final long nextBucketTimestamp = QueryGranularity.MINUTE.next(QueryGranularity.MINUTE.truncate(timestamp));
+ // check that all rows having same set of dims and truncated timestamp hash to same bucket
+ for (int i = 0; timestamp + i < nextBucketTimestamp; i++) {
+ Assert.assertEquals(
+ expectedBucket.partitionNum,
+ config.getBucket(new MapBasedInputRow(timestamp + i, dims, values)).get().partitionNum
+ );
+ }
+
+ }
}
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 5fe0ebae569..653a2f6c033 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
io.druiddruid
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOT
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index b7496c70782..f04736a66e3 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -407,14 +407,14 @@ public class IndexTask extends AbstractFixedIntervalTask
final int myRowFlushBoundary = rowFlushBoundary > 0
? rowFlushBoundary
: toolbox.getConfig().getDefaultRowFlushBoundary();
-
+ final QueryGranularity rollupGran = ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity();
try {
plumber.startJob();
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
- if (shouldIndex(shardSpec, interval, inputRow)) {
+ if (shouldIndex(shardSpec, interval, inputRow, rollupGran)) {
int numRows = plumber.add(inputRow);
if (numRows == -1) {
throw new ISE(
@@ -469,13 +469,15 @@ public class IndexTask extends AbstractFixedIntervalTask
*
* @return true or false
*/
- private boolean shouldIndex(
+ private static boolean shouldIndex(
final ShardSpec shardSpec,
final Interval interval,
- final InputRow inputRow
+ final InputRow inputRow,
+ final QueryGranularity rollupGran
)
{
- return interval.contains(inputRow.getTimestampFromEpoch()) && shardSpec.isInChunk(inputRow);
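+    // Compare the shard spec against the timestamp truncated to the rollup granularity, so that rows
+    // which roll up together are always assigned to the same shard.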
+ return interval.contains(inputRow.getTimestampFromEpoch())
+ && shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow);
}
public static class IndexIngestionSpec extends IngestionSpec
diff --git a/kafka-eight/pom.xml b/kafka-eight/pom.xml
index be87d5b4d79..79e28c016ec 100644
--- a/kafka-eight/pom.xml
+++ b/kafka-eight/pom.xml
@@ -28,7 +28,7 @@
io.druiddruid
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOT
diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml
index a26e940b725..21acf89e6a4 100644
--- a/kafka-seven/pom.xml
+++ b/kafka-seven/pom.xml
@@ -28,7 +28,7 @@
io.druiddruid
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOT
diff --git a/pom.xml b/pom.xml
index 0c6a4b34aa8..e5baf549052 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
io.druiddruidpom
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOTdruiddruid
diff --git a/processing/pom.xml b/processing/pom.xml
index 867b720ca57..0b562f2a2fa 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -28,7 +28,7 @@
io.druiddruid
- 0.6.160-SNAPSHOT
+ 0.6.161-SNAPSHOT
diff --git a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
index 908a17cb058..313ed85fc4c 100644
--- a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
@@ -26,6 +26,7 @@ import com.google.common.hash.Hashing;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.HistogramAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@@ -68,7 +69,8 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "javascript", value = JavaScriptAggregatorFactory.class),
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class),
- @JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class)
+ @JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class),
+ @JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class)
})
public static interface AggregatorFactoryMixin
{
diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
index 77ca1b32fe1..6b474799de0 100644
--- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
@@ -47,14 +47,14 @@ import java.util.concurrent.TimeoutException;
/**
* A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.
- *
+ *
* When using this, it is important to make sure that the list of QueryRunners provided is fully flattened.
* If, for example, you were to pass a list of a Chained QueryRunner (A) and a non-chained QueryRunner (B). Imagine
* A has 2 QueryRunner chained together (Aa and Ab), the fact that the Queryables are run in parallel on an
* executor would mean that the Queryables are actually processed in the order
- *
+ *
*
A -> B -> Aa -> Ab
- *
+ *
* That is, the two sub queryables for A would run *after* B is run, effectively meaning that the results for B
* must be fully cached in memory before the results for Aa and Ab are computed.
*/
@@ -113,6 +113,10 @@ public class ChainedExecutionQueryRunner implements QueryRunner
@Override
public ListenableFuture> apply(final QueryRunner input)
{
+ if (input == null) {
+ throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
+ }
+
return exec.submit(
new AbstractPrioritizedCallable>(priority)
{
@@ -120,10 +124,6 @@ public class ChainedExecutionQueryRunner implements QueryRunner
public Iterable call() throws Exception
{
try {
- if (input == null) {
- throw new ISE("Input is null?! How is this possible?!");
- }
-
Sequence result = input.run(query);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
@@ -155,7 +155,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner
queryWatcher.registerQuery(query, futures);
try {
- final Number timeout = query.getContextValue("timeout", (Number)null);
+ final Number timeout = query.getContextValue("timeout", (Number) null);
return new MergeIterable<>(
ordering.nullsFirst(),
timeout == null ?
@@ -168,10 +168,10 @@ public class ChainedExecutionQueryRunner implements QueryRunner
futures.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
- catch(CancellationException e) {
+ catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
- catch(TimeoutException e) {
+ catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java
index 1edf07b94d4..7e97ce03099 100644
--- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
@@ -81,9 +82,6 @@ public class GroupByParallelQueryRunner implements QueryRunner
final boolean bySegment = query.getContextBySegment(false);
final int priority = query.getContextPriority(0);
- if (Iterables.isEmpty(queryables)) {
- log.warn("No queryables found.");
- }
ListenableFuture> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
@@ -93,6 +91,10 @@ public class GroupByParallelQueryRunner implements QueryRunner
@Override
public ListenableFuture apply(final QueryRunner input)
{
+ if (input == null) {
+ throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
+ }
+
return exec.submit(
new AbstractPrioritizedCallable(priority)
{
diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregator.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregator.java
new file mode 100644
index 00000000000..2b78ed2cd22
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregator.java
@@ -0,0 +1,86 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.data.IndexedInts;
+
+import javax.annotation.Nullable;
+
+public class FilteredAggregator implements Aggregator
+{
+ private final String name;
+ private final DimensionSelector dimSelector;
+ private final Aggregator delegate;
+ private final IntPredicate predicate;
+
+ public FilteredAggregator(String name, DimensionSelector dimSelector, IntPredicate predicate, Aggregator delegate)
+ {
+ this.name = name;
+ this.dimSelector = dimSelector;
+ this.delegate = delegate;
+ this.predicate = predicate;
+ }
+
+ @Override
+ public void aggregate()
+ {
+ final IndexedInts row = dimSelector.getRow();
+ final int size = row.size();
+ for (int i = 0; i < size; ++i) {
+ if (predicate.apply(row.get(i))) {
+ delegate.aggregate();
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void reset()
+ {
+ delegate.reset();
+ }
+
+ @Override
+ public Object get()
+ {
+ return delegate.get();
+ }
+
+ @Override
+ public float getFloat()
+ {
+ return delegate.getFloat();
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public void close()
+ {
+ delegate.close();
+ }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java
new file mode 100644
index 00000000000..cd5685a83f7
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java
@@ -0,0 +1,216 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.metamx.common.ISE;
+import com.metamx.common.Pair;
+import io.druid.query.filter.DimFilter;
+import io.druid.query.filter.NotDimFilter;
+import io.druid.query.filter.SelectorDimFilter;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.DimensionSelector;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+
+public class FilteredAggregatorFactory implements AggregatorFactory
+{
+ private static final byte CACHE_TYPE_ID = 0x9;
+
+ private final String name;
+ private final AggregatorFactory delegate;
+ private final DimFilter filter;
+
+ public FilteredAggregatorFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("aggregator") AggregatorFactory delegate,
+ @JsonProperty("filter") DimFilter filter
+ )
+ {
+ Preconditions.checkNotNull(delegate);
+ Preconditions.checkNotNull(filter);
+ Preconditions.checkArgument(
+ filter instanceof SelectorDimFilter ||
+ (filter instanceof NotDimFilter && ((NotDimFilter) filter).getField() instanceof SelectorDimFilter),
+ "FilteredAggregator currently only supports filters of type 'selector' and their negation"
+ );
+
+ this.name = name;
+ this.delegate = delegate;
+ this.filter = filter;
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
+ {
+ final Aggregator aggregator = delegate.factorize(metricFactory);
+ final Pair selectorPredicatePair = makeFilterPredicate(
+ filter,
+ metricFactory
+ );
+ return new FilteredAggregator(name, selectorPredicatePair.lhs, selectorPredicatePair.rhs, aggregator);
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+ {
+ final BufferAggregator aggregator = delegate.factorizeBuffered(metricFactory);
+ final Pair selectorPredicatePair = makeFilterPredicate(
+ filter,
+ metricFactory
+ );
+ return new FilteredBufferAggregator(selectorPredicatePair.lhs, selectorPredicatePair.rhs, aggregator);
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return delegate.getComparator();
+ }
+
+ @Override
+ public Object combine(Object lhs, Object rhs)
+ {
+ return delegate.combine(lhs, rhs);
+ }
+
+ @Override
+ public AggregatorFactory getCombiningFactory()
+ {
+ return delegate.getCombiningFactory();
+ }
+
+ @Override
+ public Object deserialize(Object object)
+ {
+ return delegate.deserialize(object);
+ }
+
+ @Override
+ public Object finalizeComputation(Object object)
+ {
+ return delegate.finalizeComputation(object);
+ }
+
+ @JsonProperty
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public List requiredFields()
+ {
+ return delegate.requiredFields();
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ byte[] filterCacheKey = filter.getCacheKey();
+ byte[] aggregatorCacheKey = delegate.getCacheKey();
+ return ByteBuffer.allocate(1 + filterCacheKey.length + aggregatorCacheKey.length)
+ .put(CACHE_TYPE_ID)
+ .put(filterCacheKey)
+ .put(aggregatorCacheKey)
+ .array();
+ }
+
+ @Override
+ public String getTypeName()
+ {
+ return delegate.getTypeName();
+ }
+
+ @Override
+ public int getMaxIntermediateSize()
+ {
+ return delegate.getMaxIntermediateSize();
+ }
+
+ @Override
+ public Object getAggregatorStartValue()
+ {
+ return delegate.getAggregatorStartValue();
+ }
+
+ @JsonProperty
+ public AggregatorFactory getAggregator()
+ {
+ return delegate;
+ }
+
+ @JsonProperty
+ public DimFilter getFilter()
+ {
+ return filter;
+ }
+
+ @Override
+ public List getRequiredColumns()
+ {
+ return delegate.getRequiredColumns();
+ }
+
+ private static Pair makeFilterPredicate(
+ final DimFilter dimFilter,
+ final ColumnSelectorFactory metricFactory
+ )
+ {
+ final SelectorDimFilter selector;
+ if (dimFilter instanceof NotDimFilter) {
+ // we only support NotDimFilter with Selector filter
+ selector = (SelectorDimFilter) ((NotDimFilter) dimFilter).getField();
+ } else if (dimFilter instanceof SelectorDimFilter) {
+ selector = (SelectorDimFilter) dimFilter;
+ } else {
+      throw new ISE("Unsupported DimFilter type [%s]", dimFilter.getClass());
+ }
+
+ final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(selector.getDimension());
+ final int lookupId = dimSelector.lookupId(selector.getValue());
+ final IntPredicate predicate;
+ if (dimFilter instanceof NotDimFilter) {
+ predicate = new IntPredicate()
+ {
+ @Override
+ public boolean apply(int value)
+ {
+ return lookupId != value;
+ }
+ };
+ } else {
+ predicate = new IntPredicate()
+ {
+ @Override
+ public boolean apply(int value)
+ {
+ return lookupId == value;
+ }
+ };
+ }
+ return Pair.of(dimSelector, predicate);
+ }
+
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/FilteredBufferAggregator.java
new file mode 100644
index 00000000000..4d1eeda99ba
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/FilteredBufferAggregator.java
@@ -0,0 +1,79 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.data.IndexedInts;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class FilteredBufferAggregator implements BufferAggregator
+{
+ private final DimensionSelector dimSelector;
+ private final IntPredicate predicate;
+ private final BufferAggregator delegate;
+
+ public FilteredBufferAggregator(DimensionSelector dimSelector, IntPredicate predicate, BufferAggregator delegate)
+ {
+ this.dimSelector = dimSelector;
+ this.predicate = predicate;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ delegate.init(buf, position);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
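+ // Only delegate when at least one value in the (possibly multi-valued) dimension row matches the filter.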
+ final IndexedInts row = dimSelector.getRow();
+ final int size = row.size();
+ for (int i = 0; i < size; ++i) {
+ if (predicate.apply(row.get(i))) {
+ delegate.aggregate(buf, position);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ return delegate.get(buf, position);
+ }
+
+ @Override
+ public float getFloat(ByteBuffer buf, int position)
+ {
+ return delegate.getFloat(buf, position);
+ }
+
+ @Override
+ public void close()
+ {
+ delegate.close();
+ }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/IntPredicate.java b/processing/src/main/java/io/druid/query/aggregation/IntPredicate.java
new file mode 100644
index 00000000000..fd013004e91
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/IntPredicate.java
@@ -0,0 +1,29 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+/**
+ * A predicate over primitive int values. Can be replaced with java.util.function.IntPredicate
+ * (http://docs.oracle.com/javase/8/docs/api/java/util/function/IntPredicate.html) once Druid moves to Java 8.
+ */
+public interface IntPredicate
+{
+ boolean apply(int value);
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java
new file mode 100644
index 00000000000..b50ebb810f6
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java
@@ -0,0 +1,184 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.query.aggregation;
+
+import io.druid.query.filter.NotDimFilter;
+import io.druid.query.filter.SelectorDimFilter;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.FloatColumnSelector;
+import io.druid.segment.ObjectColumnSelector;
+import io.druid.segment.TimestampColumnSelector;
+import io.druid.segment.data.ArrayBasedIndexedInts;
+import io.druid.segment.data.IndexedInts;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FilteredAggregatorTest
+{
+ private void aggregate(TestFloatColumnSelector selector, FilteredAggregator agg)
+ {
+ agg.aggregate();
+ selector.increment();
+ }
+
+ @Test
+ public void testAggregate()
+ {
+ final float[] values = {0.15f, 0.27f};
+ final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
+
+ FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
+ "test",
+ new DoubleSumAggregatorFactory("billy", "value"),
+ new SelectorDimFilter("dim", "a")
+ );
+
+ FilteredAggregator agg = (FilteredAggregator) factory.factorize(
+ makeColumnSelector(selector)
+ );
+
+ Assert.assertEquals("test", agg.getName());
+
+ double expectedFirst = new Float(values[0]).doubleValue();
+ double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
+ double expectedThird = expectedSecond;
+
+ assertValues(agg, selector, expectedFirst, expectedSecond, expectedThird);
+ }
+
+ private ColumnSelectorFactory makeColumnSelector(final TestFloatColumnSelector selector)
+ {
+ return new ColumnSelectorFactory()
+ {
+ @Override
+ public TimestampColumnSelector makeTimestampColumnSelector()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DimensionSelector makeDimensionSelector(String dimensionName)
+ {
+ if (dimensionName.equals("dim")) {
+ return new DimensionSelector()
+ {
+ @Override
+ public IndexedInts getRow()
+ {
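+ // Every third row (index % 3 == 2) maps to dictionary id 1 ("b"); all other rows map to id 0 ("a").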
+ if (selector.getIndex() % 3 == 2) {
+ return new ArrayBasedIndexedInts(new int[]{1});
+ } else {
+ return new ArrayBasedIndexedInts(new int[]{0});
+ }
+ }
+
+ @Override
+ public int getValueCardinality()
+ {
+ return 2;
+ }
+
+ @Override
+ public String lookupName(int id)
+ {
+ switch (id) {
+ case 0:
+ return "a";
+ case 1:
+ return "b";
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ @Override
+ public int lookupId(String name)
+ {
+ switch (name) {
+ case "a":
+ return 0;
+ case "b":
+ return 1;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+ };
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ public FloatColumnSelector makeFloatColumnSelector(String columnName)
+ {
+ if (columnName.equals("value")) {
+ return selector;
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ public ObjectColumnSelector makeObjectColumnSelector(String columnName)
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private void assertValues(FilteredAggregator agg, TestFloatColumnSelector selector, double... expectedVals)
+ {
+ Assert.assertEquals(0.0d, agg.get());
+ Assert.assertEquals(0.0d, agg.get());
+ Assert.assertEquals(0.0d, agg.get());
+ for (double expectedVal : expectedVals) {
+ aggregate(selector, agg);
+ Assert.assertEquals(expectedVal, agg.get());
+ Assert.assertEquals(expectedVal, agg.get());
+ Assert.assertEquals(expectedVal, agg.get());
+ }
+ }
+
+ @Test
+ public void testAggregateWithNotFilter()
+ {
+ final float[] values = {0.15f, 0.27f};
+ final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
+
+ FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
+ "test",
+ new DoubleSumAggregatorFactory("billy", "value"),
+ new NotDimFilter(new SelectorDimFilter("dim", "b"))
+ );
+
+ FilteredAggregator agg = (FilteredAggregator) factory.factorize(
+ makeColumnSelector(selector)
+ );
+
+ Assert.assertEquals("test", agg.getName());
+
+ double expectedFirst = new Float(values[0]).doubleValue();
+ double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
+ double expectedThird = expectedSecond;
+ assertValues(agg, selector, expectedFirst, expectedSecond, expectedThird);
+ }
+
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/TestFloatColumnSelector.java b/processing/src/test/java/io/druid/query/aggregation/TestFloatColumnSelector.java
index 8aea1f41503..874ffb978a4 100644
--- a/processing/src/test/java/io/druid/query/aggregation/TestFloatColumnSelector.java
+++ b/processing/src/test/java/io/druid/query/aggregation/TestFloatColumnSelector.java
@@ -44,4 +44,9 @@ public class TestFloatColumnSelector implements FloatColumnSelector
{
++index;
}
+
+ public int getIndex()
+ {
+ return index;
+ }
}
diff --git a/rabbitmq/pom.xml b/rabbitmq/pom.xml
index 745f10e88ac..210771cd139 100644
--- a/rabbitmq/pom.xml
+++ b/rabbitmq/pom.xml
@@ -9,7 +9,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.160-SNAPSHOT</version>
+ <version>0.6.161-SNAPSHOT</version>
@@ -39,5 +39,11 @@
<artifactId>commons-cli</artifactId>
<scope>test</scope>
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
\ No newline at end of file
diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java
index 132fe3b6179..28f1d0d14d1 100644
--- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java
+++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/JacksonifiedConnectionFactory.java
@@ -19,12 +19,12 @@
package io.druid.firehose.rabbitmq;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.LongString;
-import java.net.URISyntaxException;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
import java.util.Map;
/**
@@ -33,140 +33,229 @@ import java.util.Map;
*/
public class JacksonifiedConnectionFactory extends ConnectionFactory
{
+ public static JacksonifiedConnectionFactory makeDefaultConnectionFactory() throws Exception
+ {
+ return new JacksonifiedConnectionFactory(null, 0, null, null, null, null, 0, 0, 0, 0, null);
+ }
+
+ private static Map<String, Object> getSerializableClientProperties(final Map<String, Object> clientProperties)
+ {
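+ // Render RabbitMQ LongString values as plain Strings so the client properties serialize cleanly as JSON.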
+ return Maps.transformEntries(
+ clientProperties,
+ new Maps.EntryTransformer<String, Object, Object>()
+ {
+ @Override
+ public Object transformEntry(String key, Object value)
+ {
+ if (value instanceof LongString) {
+ return value.toString();
+ }
+ return value;
+ }
+ }
+ );
+ }
+
+ private final String host;
+ private final int port;
+ private final String username;
+ private final String password;
+ private final String virtualHost;
+ private final String uri;
+ private final int requestedChannelMax;
+ private final int requestedFrameMax;
+ private final int requestedHeartbeat;
+ private final int connectionTimeout;
+ private final Map<String, Object> clientProperties;
+
+ @JsonCreator
+ public JacksonifiedConnectionFactory(
+ @JsonProperty("host") String host,
+ @JsonProperty("port") int port,
+ @JsonProperty("username") String username,
+ @JsonProperty("password") String password,
+ @JsonProperty("virtualHost") String virtualHost,
+ @JsonProperty("uri") String uri,
+ @JsonProperty("requestedChannelMax") int requestedChannelMax,
+ @JsonProperty("requestedFrameMax") int requestedFrameMax,
+ @JsonProperty("requestedHeartbeat") int requestedHeartbeat,
+ @JsonProperty("connectionTimeout") int connectionTimeout,
+ @JsonProperty("clientProperties") Map clientProperties
+ ) throws Exception
+ {
+ super();
+
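+ // For each property, null / 0 means "not set": keep the ConnectionFactory default and push the resolved value back into super below.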
+ this.host = host == null ? super.getHost() : host;
+ this.port = port == 0 ? super.getPort() : port;
+ this.username = username == null ? super.getUsername() : username;
+ this.password = password == null ? super.getPassword() : password;
+ this.virtualHost = virtualHost == null ? super.getVirtualHost() : virtualHost;
+ this.uri = uri;
+ this.requestedChannelMax = requestedChannelMax == 0 ? super.getRequestedChannelMax() : requestedChannelMax;
+ this.requestedFrameMax = requestedFrameMax == 0 ? super.getRequestedFrameMax() : requestedFrameMax;
+ this.requestedHeartbeat = requestedHeartbeat == 0 ? super.getRequestedHeartbeat() : requestedHeartbeat;
+ this.connectionTimeout = connectionTimeout == 0 ? super.getConnectionTimeout() : connectionTimeout;
+ this.clientProperties = clientProperties == null ? super.getClientProperties() : clientProperties;
+
+ super.setHost(this.host);
+ super.setPort(this.port);
+ super.setUsername(this.username);
+ super.setPassword(this.password);
+ super.setVirtualHost(this.virtualHost);
+ if (this.uri != null) {
+ super.setUri(this.uri);
+ }
+ super.setRequestedChannelMax(this.requestedChannelMax);
+ super.setRequestedFrameMax(this.requestedFrameMax);
+ super.setRequestedHeartbeat(this.requestedHeartbeat);
+ super.setConnectionTimeout(this.connectionTimeout);
+ super.setClientProperties(this.clientProperties);
+ }
+
@Override
@JsonProperty
public String getHost()
{
- return super.getHost();
- }
-
- @Override
- public void setHost(String host)
- {
- super.setHost(host);
+ return host;
}
@Override
@JsonProperty
public int getPort()
{
- return super.getPort();
+ return port;
}
- @Override
- public void setPort(int port)
- {
- super.setPort(port);
- }
@Override
@JsonProperty
public String getUsername()
{
- return super.getUsername();
- }
-
- @Override
- public void setUsername(String username)
- {
- super.setUsername(username);
+ return username;
}
@Override
@JsonProperty
public String getPassword()
{
- return super.getPassword();
- }
-
- @Override
- public void setPassword(String password)
- {
- super.setPassword(password);
+ return password;
}
@Override
@JsonProperty
public String getVirtualHost()
{
- return super.getVirtualHost();
+ return virtualHost;
}
- @Override
- public void setVirtualHost(String virtualHost)
- {
- super.setVirtualHost(virtualHost);
- }
-
- @Override
@JsonProperty
- public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
+ public String getUri()
{
- super.setUri(uriString);
+ return uri;
}
@Override
@JsonProperty
public int getRequestedChannelMax()
{
- return super.getRequestedChannelMax();
- }
-
- @Override
- public void setRequestedChannelMax(int requestedChannelMax)
- {
- super.setRequestedChannelMax(requestedChannelMax);
+ return requestedChannelMax;
}
@Override
@JsonProperty
public int getRequestedFrameMax()
{
- return super.getRequestedFrameMax();
- }
-
- @Override
- public void setRequestedFrameMax(int requestedFrameMax)
- {
- super.setRequestedFrameMax(requestedFrameMax);
+ return requestedFrameMax;
}
@Override
@JsonProperty
public int getRequestedHeartbeat()
{
- return super.getRequestedHeartbeat();
- }
-
- @Override
- public void setConnectionTimeout(int connectionTimeout)
- {
- super.setConnectionTimeout(connectionTimeout);
+ return requestedHeartbeat;
}
@Override
@JsonProperty
public int getConnectionTimeout()
{
- return super.getConnectionTimeout();
+ return connectionTimeout;
+ }
+
+ @JsonProperty("clientProperties")
+ public Map<String, Object> getSerializableClientProperties()
+ {
+ return getSerializableClientProperties(clientProperties);
}
@Override
- public void setRequestedHeartbeat(int requestedHeartbeat)
+ public boolean equals(Object o)
{
- super.setRequestedHeartbeat(requestedHeartbeat);
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ JacksonifiedConnectionFactory that = (JacksonifiedConnectionFactory) o;
+
+ if (connectionTimeout != that.connectionTimeout) {
+ return false;
+ }
+ if (port != that.port) {
+ return false;
+ }
+ if (requestedChannelMax != that.requestedChannelMax) {
+ return false;
+ }
+ if (requestedFrameMax != that.requestedFrameMax) {
+ return false;
+ }
+ if (requestedHeartbeat != that.requestedHeartbeat) {
+ return false;
+ }
+ if (clientProperties != null
+ ? !Maps.difference(
+ getSerializableClientProperties(clientProperties),
+ getSerializableClientProperties(that.clientProperties)
+ ).areEqual()
+ : that.clientProperties != null) {
+ return false;
+ }
+ if (host != null ? !host.equals(that.host) : that.host != null) {
+ return false;
+ }
+ if (password != null ? !password.equals(that.password) : that.password != null) {
+ return false;
+ }
+ if (uri != null ? !uri.equals(that.uri) : that.uri != null) {
+ return false;
+ }
+ if (username != null ? !username.equals(that.username) : that.username != null) {
+ return false;
+ }
+ if (virtualHost != null ? !virtualHost.equals(that.virtualHost) : that.virtualHost != null) {
+ return false;
+ }
+
+ return true;
}
@Override
- @JsonProperty
- public Map<String, Object> getClientProperties()
+ public int hashCode()
{
- return super.getClientProperties();
- }
-
- @Override
- public void setClientProperties(Map<String, Object> clientProperties)
- {
- super.setClientProperties(clientProperties);
+ int result = host != null ? host.hashCode() : 0;
+ result = 31 * result + port;
+ result = 31 * result + (username != null ? username.hashCode() : 0);
+ result = 31 * result + (password != null ? password.hashCode() : 0);
+ result = 31 * result + (virtualHost != null ? virtualHost.hashCode() : 0);
+ result = 31 * result + (uri != null ? uri.hashCode() : 0);
+ result = 31 * result + requestedChannelMax;
+ result = 31 * result + requestedFrameMax;
+ result = 31 * result + requestedHeartbeat;
+ result = 31 * result + connectionTimeout;
+ result = 31 * result + (clientProperties != null ? clientProperties.hashCode() : 0);
+ return result;
}
}
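A quick illustration (not part of the patch) of what the new `@JsonCreator` constructor enables: deserializing the connection factory from JSON with a plain Jackson `ObjectMapper`. The JSON keys come from the `@JsonProperty` annotations above; the host/credential values and the class name of the sketch are made up.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.firehose.rabbitmq.JacksonifiedConnectionFactory;

public class ConnectionFactoryJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    // Only a few properties are supplied; the constructor keeps the ConnectionFactory defaults for the rest.
    String json = "{\"host\": \"localhost\", \"port\": 5672, \"username\": \"guest\", \"password\": \"guest\"}";

    JacksonifiedConnectionFactory factory = new ObjectMapper().readValue(json, JacksonifiedConnectionFactory.class);

    System.out.println(factory.getHost() + ":" + factory.getPort());
  }
}
```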
diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java
index 548cbcc1d1a..8ca79bbea99 100644
--- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java
+++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQDruidModule.java
@@ -29,7 +29,7 @@ import java.util.List;
/**
*/
-public class RabbitMQDruidModule implements DruidModule
+public class RabbitMQDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java
index 7bae291c8a3..325bc41cd92 100644
--- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java
+++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseConfig.java
@@ -19,6 +19,7 @@
package io.druid.firehose.rabbitmq;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
@@ -26,17 +27,50 @@ import com.fasterxml.jackson.annotation.JsonProperty;
*/
public class RabbitMQFirehoseConfig
{
- private String queue = null;
- private String exchange = null;
- private String routingKey = null;
- private boolean durable = false;
- private boolean exclusive = false;
- private boolean autoDelete = false;
-
// Lyra (auto reconnect) properties
- private int maxRetries = 100;
- private int retryIntervalSeconds = 2;
- private long maxDurationSeconds = 5 * 60;
+ private static final int defaultMaxRetries = 100;
+ private static final int defaultRetryIntervalSeconds = 2;
+ private static final long defaultMaxDurationSeconds = 5 * 60;
+
+ public static RabbitMQFirehoseConfig makeDefaultConfig()
+ {
+ return new RabbitMQFirehoseConfig(null, null, null, false, false, false, 0, 0, 0);
+ }
+
+ private final String queue;
+ private final String exchange;
+ private final String routingKey;
+ private final boolean durable;
+ private final boolean exclusive;
+ private final boolean autoDelete;
+ private final int maxRetries;
+ private final int retryIntervalSeconds;
+ private final long maxDurationSeconds;
+
+ @JsonCreator
+ public RabbitMQFirehoseConfig(
+ @JsonProperty("queue") String queue,
+ @JsonProperty("exchange") String exchange,
+ @JsonProperty("routingKey") String routingKey,
+ @JsonProperty("durable") boolean durable,
+ @JsonProperty("exclusive") boolean exclusive,
+ @JsonProperty("autoDelete") boolean autoDelete,
+ @JsonProperty("maxRetries") int maxRetries,
+ @JsonProperty("retryIntervalSeconds") int retryIntervalSeconds,
+ @JsonProperty("maxDurationSeconds") long maxDurationSeconds
+ )
+ {
+ this.queue = queue;
+ this.exchange = exchange;
+ this.routingKey = routingKey;
+ this.durable = durable;
+ this.exclusive = exclusive;
+ this.autoDelete = autoDelete;
+
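+ // 0 means "not set": fall back to the default Lyra (auto reconnect) retry settings.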
+ this.maxRetries = maxRetries == 0 ? defaultMaxRetries : maxRetries;
+ this.retryIntervalSeconds = retryIntervalSeconds == 0 ? defaultRetryIntervalSeconds : retryIntervalSeconds;
+ this.maxDurationSeconds = maxDurationSeconds == 0 ? defaultMaxDurationSeconds : maxDurationSeconds;
+ }
@JsonProperty
public String getQueue()
@@ -44,90 +78,109 @@ public class RabbitMQFirehoseConfig
return queue;
}
- public void setQueue(String queue)
- {
- this.queue = queue;
- }
-
@JsonProperty
public String getExchange()
{
return exchange;
}
- public void setExchange(String exchange)
- {
- this.exchange = exchange;
- }
-
@JsonProperty
public String getRoutingKey()
{
return routingKey;
}
- public void setRoutingKey(String routingKey)
- {
- this.routingKey = routingKey;
- }
-
@JsonProperty
public boolean isDurable()
{
return durable;
}
- public void setDurable(boolean durable)
- {
- this.durable = durable;
- }
-
@JsonProperty
public boolean isExclusive()
{
return exclusive;
}
- public void setExclusive(boolean exclusive)
- {
- this.exclusive = exclusive;
- }
-
@JsonProperty
public boolean isAutoDelete()
{
return autoDelete;
}
- public void setAutoDelete(boolean autoDelete)
- {
- this.autoDelete = autoDelete;
- }
-
@JsonProperty
- public int getMaxRetries() {
+ public int getMaxRetries()
+ {
return maxRetries;
}
- public void setMaxRetries(int maxRetries) {
- this.maxRetries = maxRetries;
- }
-
@JsonProperty
- public int getRetryIntervalSeconds() {
+ public int getRetryIntervalSeconds()
+ {
return retryIntervalSeconds;
}
- public void setRetryIntervalSeconds(int retryIntervalSeconds) {
- this.retryIntervalSeconds = retryIntervalSeconds;
- }
-
@JsonProperty
- public long getMaxDurationSeconds() {
+ public long getMaxDurationSeconds()
+ {
return maxDurationSeconds;
}
- public void setMaxDurationSeconds(int maxDurationSeconds) {
- this.maxDurationSeconds = maxDurationSeconds;
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ RabbitMQFirehoseConfig that = (RabbitMQFirehoseConfig) o;
+
+ if (autoDelete != that.autoDelete) {
+ return false;
+ }
+ if (durable != that.durable) {
+ return false;
+ }
+ if (exclusive != that.exclusive) {
+ return false;
+ }
+ if (maxDurationSeconds != that.maxDurationSeconds) {
+ return false;
+ }
+ if (maxRetries != that.maxRetries) {
+ return false;
+ }
+ if (retryIntervalSeconds != that.retryIntervalSeconds) {
+ return false;
+ }
+ if (exchange != null ? !exchange.equals(that.exchange) : that.exchange != null) {
+ return false;
+ }
+ if (queue != null ? !queue.equals(that.queue) : that.queue != null) {
+ return false;
+ }
+ if (routingKey != null ? !routingKey.equals(that.routingKey) : that.routingKey != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = queue != null ? queue.hashCode() : 0;
+ result = 31 * result + (exchange != null ? exchange.hashCode() : 0);
+ result = 31 * result + (routingKey != null ? routingKey.hashCode() : 0);
+ result = 31 * result + (durable ? 1 : 0);
+ result = 31 * result + (exclusive ? 1 : 0);
+ result = 31 * result + (autoDelete ? 1 : 0);
+ result = 31 * result + maxRetries;
+ result = 31 * result + retryIntervalSeconds;
+ result = 31 * result + (int) (maxDurationSeconds ^ (maxDurationSeconds >>> 32));
+ return result;
}
}
diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java
index 8504c70ed9d..f38398bcc84 100644
--- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java
+++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java
@@ -26,7 +26,6 @@ import com.metamx.common.logger.Logger;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
@@ -50,14 +49,14 @@ import java.util.concurrent.LinkedBlockingQueue;
/**
* A FirehoseFactory for RabbitMQ.
- *
+ *
* It will receive its configuration through the realtime.spec file and expects to find a
* consumerProps element in the firehose definition with values for a number of configuration options.
* Below is a complete example for a RabbitMQ firehose configuration with some explanation. Options
* that have defaults can be skipped but options with no defaults must be specified with the exception
* of the URI property. If the URI property is set, it will override any other property that was also
* set.
- *
+ *
* File: realtime.spec
*