merge changes from druid-0.7.x

This commit is contained in:
nishantmonu51 2014-08-12 15:47:42 +05:30
commit c6712739dc
276 changed files with 6420 additions and 2095 deletions

BIN
CodeStyle.jar Normal file

Binary file not shown.

7
README
View File

@ -1,7 +0,0 @@
The best place for more Druid resources is at: http://www.druid.io
Looking for docs? http://druid.io/docs/latest/
Build with build.sh
Want to get started? http://druid.io/docs/latest/Tutorial:-A-First-Look-at-Druid.html

22
README.md Normal file
View File

@ -0,0 +1,22 @@
## Druid
Druid is a distributed, column-oriented, real-time analytics data store that is
commonly used to power exploratory dashboards in multi-tenant environments.
Druid excels as a data warehousing solution for fast aggregate queries on
petabyte-sized data sets. Druid supports a variety of flexible filters, exact
calculations, approximate algorithms, and other useful calculations. Druid can
load both streaming and batch data and integrates with Storm and Hadoop.
### More Information
Much more information about Druid can be found on our [website](http://www.druid.io).
### Documentation
We host documentation on our [website](http://druid.io/docs/latest/). If you want to contribute documentation changes, please submit a pull request to this repository.
### Tutorials
We have a series of tutorials to get started with Druid, starting with this [one](http://druid.io/docs/latest/Tutorial:-A-First-Look-at-Druid.html).
### Support
Report any bugs using [GitHub issues](https://github.com/metamx/druid/issues).
Contact us through our [forum](https://groups.google.com/forum/#!forum/druid-development) or on IRC in `#druid-dev` on `irc.freenode.net`.

View File

@ -25,9 +25,3 @@ echo " "
echo " The following self-contained jars (and more) have been built:"
echo " "
find . -name '*-selfcontained.jar'
echo " "
echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
echo "See also http://druid.io/docs/latest"

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -1,43 +1,55 @@
#! /bin/bash -e
SCRIPT_DIR=$(cd $(dirname "$0") && pwd)
if [ -z ${1} ]; then
VERSION=$(cat $SCRIPT_DIR/../../pom.xml | grep version | head -4 | tail -1 | sed 's_.*<version>\([^<]*\)</version>.*_\1_')
if [ -z "$1" ]; then
version="latest"
else
VERSION=${1}
version=$1
fi
#if [ -z "$(git tag -l "druid-$VERSION")" ]
if [ -z "$(git tag -l "druid-$VERSION")" ] && [ "$VERSION" != "latest" ]; then
echo "Version tag does not exist: druid-$VERSION"
docs=$(git -C "$(dirname "$0")" rev-parse --show-toplevel)/docs
if [ -n "$(git -C "$docs" status --porcelain --untracked-files=no content)" ]; then
echo "Docs directory is not clean, aborting"
exit 1
fi
if [ -z "$(git tag -l "druid-$version")" ] && [ "$version" != "latest" ]; then
echo "Version tag does not exist: druid-$version"
exit 1;
fi
WORKING_DIR=$(mktemp -d -t druid-docs-deploy)
tmp=$(mktemp -d -t druid-docs-deploy)
echo Using Version [${VERSION}]
echo Script in [${SCRIPT_DIR}]
echo Deploying to [${WORKING_DIR}]
echo "Using Version [$version]"
echo "Working directory [$tmp]"
if [ -d ${WORKING_DIR} ]; then
echo DELETING ${WORKING_DIR}
rm -rf ${WORKING_DIR}
git clone git@github.com:druid-io/druid-io.github.io.git "$tmp"
target=$tmp/docs/$version
mkdir -p $target
rsync -a --delete "$docs/content/" $target
branch=update-docs-$version
git -C $tmp checkout -b $branch
git -C $tmp add -A .
git -C $tmp commit -m "Update $version docs"
git -C $tmp push origin $branch
if [ -n "$GIT_TOKEN" ]; then
curl -u "$GIT_TOKEN:x-oauth-basic" -XPOST -d@- \
https://api.github.com/repos/druid-io/druid-io.github.io/pulls <<EOF
{
"title" : "Update $version docs",
"head" : "$branch",
"base" : "master"
}
EOF
else
echo "GitHub personal token not provided, not submitting pull request"
echo "Please go to https://github.com/druid-io/druid-io.github.io and submit a pull request from the \`$branch\` branch"
fi
git clone git@github.com:druid-io/druid-io.github.io.git ${WORKING_DIR}
DOC_DIR=${WORKING_DIR}/docs/${VERSION}/
mkdir -p ${DOC_DIR}
cp -r ${SCRIPT_DIR}/../content/* ${DOC_DIR}
BRANCH=docs-${VERSION}
pushd ${WORKING_DIR}
git checkout -b ${BRANCH}
git add .
git commit -m "Deploy ${VERSION} docs"
git push origin ${BRANCH}
popd
rm -rf ${WORKING_DIR}
rm -rf $tmp

View File

@ -1,48 +1,10 @@
<!-- Start page_footer include -->
<div class="container">
<footer>
<div class="container">
<hr>
<div class="row">
<div class="col-md-4">
<address>
<strong>CONTACT US</strong>
<a href="mailto:info@druid.io">info@druid.io</a>
</address>
<address>
<div class="soc">
<a href="https://twitter.com/druidio"></a>
<a href="https://github.com/metamx/druid" class="github"></a>
<a href="http://www.meetup.com/Open-Druid/" class="meet"></a>
<a href="http://druid.io/feed/" class="rss" target="_blank"></a>
</div>
</div>
<ul class="col-md-4 list-unstyled">
<li><a href="/"><strong>DRUID</strong></a></li>
<li><a href="/druid.html">What is Druid?</a></li>
<li><a href="/downloads.html">Downloads</a></li>
<li><a target="_blank" href="Home.html">Documentation</a></li>
</ul>
<ul class="col-md-4 list-unstyled">
<li><a href="/community.html"><strong>SUPPORT</strong></a></li>
<li><a href="/community.html">Community</a></li>
<li><a href="/faq.html">FAQ</a></li>
<li><a href="/licensing.html">Licensing</a></li>
<li><a href="/blog"><strong>BLOG</strong></a></li>
</ul>
</div>
</div>
</footer>
</footer>
</div>
<script type="text/javascript">
var gaJsHost = (("https:" == document.location.protocol) ? "https://ssl." : "http://www.");
document.write(unescape("%3Cscript src='" + gaJsHost + "google-analytics.com/ga.js' type='text/javascript'%3E%3C/script%3E"));
</script>
<script type="text/javascript">
try {
var pageTracker = _gat._getTracker("UA-40280432-1");
pageTracker._trackPageview();
} catch(err) {}
<script src="http://code.jquery.com/jquery.min.js"></script>
</script>
<!-- stop page_footer include -->

View File

@ -1,25 +1,16 @@
<!-- Start page_header include -->
<div class="navbar navbar-inverse navbar-static-top">
<div class="container druid-navbar">
<div class="navbar navbar-inverse navbar-static-top druid-nav">
<div class="container">
<div class="navbar-header">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="navbar-brand" href="/">Druid</a>
</div>
<div class="navbar-collapse collapse">
<ul class="nav navbar-nav">
<li {% if page.id == 'home' %} class="active"{% endif %}><a href="/">Home</a></li>
<li {% if page.sectionid == 'druid' %} class="active"{% endif %}><a href="/druid.html">What is Druid?</a></li>
<li {% if page.sectionid == 'downloads' %} class="active"{% endif %}><a href="/downloads.html">Downloads</a></li>
<li {% if page.sectionid == 'docs' %} class="active"{% endif %}><a href="https://github.com/metamx/druid/wiki">Documentation</a></li>
<li {% if page.sectionid == 'community' %} class="active"{% endif %}><a href="/community.html">Community</a></li>
<li {% if page.sectionid == 'faq' %} class="active"{% endif %}><a href="/faq.html">FAQ</a></li>
<li {% if page.sectionid == 'blog' %} class="active"{% endif %}><a href="/blog">Blog</a></li>
<li class="divider"></li>
</ul>
</div>
</div>

View File

@ -5,15 +5,13 @@
<title>Druid | {{page.title}}</title>
<!-- Latest compiled and minified CSS -->
<link rel="stylesheet" href="http://netdna.bootstrapcdn.com/bootstrap/3.0.0/css/bootstrap.css">
<link rel="stylesheet" href="//maxcdn.bootstrapcdn.com/bootstrap/3.2.0/css/bootstrap.min.css">
<link href='http://fonts.googleapis.com/css?family=Open+Sans:400,600,300,700,800' rel='stylesheet' type='text/css'>
<link href='http://fonts.googleapis.com/css?family=Open+Sans+Condensed:300,700,300italic|Open+Sans:300italic,400italic,600italic,400,300,600' rel='stylesheet' type='text/css'>
<link rel="alternate" type="application/atom+xml" href="http://druid.io/feed">
<link rel="stylesheet" href="//druid.io/css/main.css">
<link rel="stylesheet" href="//druid.io/css/header.css">
<link rel="stylesheet" href="//druid.io/css/footer.css">
<link rel="stylesheet" href="//druid.io/css/syntax.css">
<link rel="stylesheet" href="//druid.io/css/docs.css">

View File

@ -2,35 +2,30 @@
<html lang="en">
<head>
{% include site_head.html %}
<link rel="stylesheet" href="css/docs.css">
</head>
<body>
{% include page_header.html %}
<div class="container">
<div class="page-header">
<h1>Documentation</h1>
<div class="druid-header">
<div class="container">
<h1>Documentation</h1>
<h4></h4>
</div>
</div>
<div class="container">
<div class="row">
<div class="col-md-3 toc" id="toc">
</div>
<div class="col-md-9 doc-content">
{{ content }}
</div>
<div class="col-md-3 toc" id="toc">
</div>
</div>
</div>
{% include page_footer.html %}
<script src="http://code.jquery.com/jquery.js"></script>
<script src="http://netdna.bootstrapcdn.com/bootstrap/3.0.0/js/bootstrap.min.js"></script>
<script>
$(function(){
$("#toc").load("toc.html");
});
</script>
<script>$(function() { $(".toc").load("toc.html"); });</script>
</body>
</html>

View File

@ -1,3 +1,6 @@
---
layout: doc_page
---
### ApproxHistogram aggregator
This aggregator uses the streaming histogram algorithm described in [http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf](http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf) to compute approximate histograms.
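As a rough sketch (the exact field names and defaults should be checked against the `druid-histogram` module for your Druid version), an aggregation spec of this type generally looks like:
```json
{
  "type": "approxHistogram",
  "name": "<output_name>",
  "fieldName": "<column_to_aggregate>",
  "resolution": 50,
  "numBuckets": 7,
  "lowerLimit": 0.0,
  "upperLimit": 100.0
}
```
Here `resolution` controls the number of centroids kept per histogram (higher is more accurate but slower), and `lowerLimit`/`upperLimit` clamp the histogram's range.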

View File

@ -83,7 +83,7 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
"type": "hashed"
"targetPartitionSize": 5000000
},
"metadataUpdateSpec": {
"updaterJobSpec": {
"type": "db",
"connectURI": "jdbc:mysql:\/\/localhost:7980\/test_db",
"user": "username",
@ -272,7 +272,7 @@ The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Co
|config|A Hadoop Index Config (see above).|yes|
|hadoopCoordinates|The Maven `<groupId>:<artifactId>:<version>` of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no|
The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `metadataUpdateSpec`. The Indexing Service takes care of setting these fields internally.
The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.
To run the task:

View File

@ -12,7 +12,7 @@ You can provision individual servers, loading Druid onto each machine (or buildi
[Apache Whirr](http://whirr.apache.org/) is a set of libraries for launching cloud services. For Druid, Whirr serves as an easy way to launch a cluster in Amazon AWS by using simple commands and configuration files (called *recipes*).
**NOTE:** Whirr will install Druid 0.6.121. Also, it doesn't work with JDK1.7.0_55. JDK1.7.0_45 recommended.
**NOTE:** Whirr will install Druid 0.6.115 (an older version of Druid). Also, it doesn't work with JDK1.7.0_55. JDK1.7.0_45 recommended.
You'll need an AWS account, S3 Bucket and an EC2 key pair from that account so that Whirr can connect to the cloud via the EC2 API. If you haven't generated a key pair, see the [AWS documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) or see this [Whirr FAQ](http://whirr.apache.org/faq.html#how-do-i-find-my-cloud-credentials).

View File

@ -12,8 +12,12 @@ The broker module uses several of the default modules in [Configuration](Configu
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.broker.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to historical nodes. `random` chooses a node randomly, `connectionCount` picks the node with the fewest active connections.|`random`|
|`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`|
|`druid.broker.select.tier.custom.priorities`|`An array of integer priorities.`|Select servers in tiers with a custom priority list.|None|
|`druid.broker.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
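Putting these properties together, a broker `runtime.properties` fragment might look like the following (values are illustrative, not recommendations):
```
# Illustrative broker cache/balancer settings; tune for your cluster
druid.broker.cache.type=memcached
druid.broker.balancer.type=connectionCount
druid.broker.select.tier=highestPriority
druid.broker.cache.unCacheable=["groupBy", "select"]
```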
#### Local Cache

View File

@ -51,7 +51,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|Property|Description|Default|
|--------|-----------|-------|
|`millisToWaitBeforeDeleting`|How long does the coordinator need to be active before it can start deleting segments.|90000 (15 mins)|
|`millisToWaitBeforeDeleting`|How long does the coordinator need to be active before it can start removing (marking unused) segments in metadata storage.|900000 (15 mins)|
|`mergeBytesLimit`|The maximum number of bytes to merge (for segments).|524288000L|
|`mergeSegmentsLimit`|The maximum number of segments that can be in a single [merge task](Tasks.html).|100|
|`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5|

View File

@ -4,7 +4,7 @@ layout: doc_page
Data Formats for Ingestion
==========================
Druid can ingest data in JSON, CSV, or TSV. While most examples in the documentation use data in JSON format, it is not difficult to configure Druid to ingest CSV or TSV data.
Druid can ingest data in JSON, CSV, or custom delimited data such as TSV. While most examples in the documentation use data in JSON format, it is not difficult to configure Druid to ingest CSV or other delimited data.
## Formatting the Data
The following are three samples of the data used in the [Wikipedia example](Tutorial:-Loading-Your-Data-Part-1.html).
@ -41,8 +41,8 @@ _TSV_
Note that the CSV and TSV data do not contain column heads. This becomes important when you specify the data for ingesting.
## Configuring Ingestion For the Indexing Service
If you use the [indexing service](Indexing-Service.html) for ingesting the data, a [task](Tasks.html) must be configured and submitted. Tasks are configured with a JSON object which, among other things, specifies the data source and type. In the Wikipedia example, JSON data was read from a local file. The task spec contains a firehose element to specify this:
## Configuration
All forms of Druid ingestion require some form of schema object. An example JSON blob describing the data format might look like this:
```json
"firehose" : {

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.121
git checkout druid-0.6.143
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -36,10 +36,40 @@ See [Examples](Examples.html). This firehose connects directly to the twitter sp
See [Examples](Examples.html). This firehose creates a stream of random numbers.
#### RabbitMqFirehouse
#### RabbitMqFirehose
This firehose ingests events from a defined rabbit-mq queue.
#### IngestSegmentFirehose
This firehose can be used to read data from existing Druid segments.
It can be used to re-ingest existing segments with a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
A sample ingest firehose spec is shown below:
```json
{
"type" : "ingestSegment",
"dataSource" : "wikipedia",
"interval" : "2013-01-01/2013-01-02",
"dimensions":[],
"metrics":[]
}
```
|property|description|required?|
|--------|-----------|---------|
|type|ingestSegment. Type of firehose|yes|
|dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes|
|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes|
|dimensions|The list of dimensions to select. If left empty, all dimensions are selected.|no|
|metrics|The list of metrics to select. If left empty, all metrics are returned.|no|
|filter| See [Filters](Filters.html)|yes|
Parsing Data
------------

View File

@ -22,7 +22,7 @@ The following configs only apply if the overlord is running in remote mode:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a middle manager before throwing an error.|PT5M|
|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |none|
|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |"0"|
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the overlord should expect middle managers to compress Znodes.|false|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
@ -80,7 +80,6 @@ Issuing a GET request at the same URL will return the current worker setup spec
|Property|Description|Default|
|--------|-----------|-------|
|`minVersion`|The coordinator only assigns tasks to workers with a version greater than the minVersion. If this is not specified, the minVersion will be druid.indexer.runner.minWorkerVersion.|none|
|`minNumWorkers`|The minimum number of workers that can be in the cluster at any given time.|0|
|`maxNumWorkers`|The maximum number of workers that can be in the cluster at any given time.|0|
|`nodeData`|A JSON object that describes how to launch new nodes. Currently, only EC2 is supported.|none; required|

View File

@ -37,6 +37,11 @@ You can check the coordinator console located at `<COORDINATOR_IP>:<PORT>/cluste
You can check `<BROKER_IP>:<PORT>/druid/v2/datasources/<YOUR_DATASOURCE>?interval=0/3000` for the dimensions and metrics that have been created for your datasource. Make sure that the names of the aggregators you use in your query match one of these metrics. Also make sure that the query interval you specify matches a valid time range where data exists. Note: the broker endpoint will only return valid results on historical segments.
## How can I reindex existing data in Druid with schema changes?
You can use the IngestSegmentFirehose with the index task to re-ingest existing Druid segments with a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
See [Firehose](Firehose.html) for more details on IngestSegmentFirehose.
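For illustration only (the dimension, metric, and filter values are borrowed from the Wikipedia example and are not prescriptive), a firehose that re-ingests a filtered subset of an existing datasource might look like:
```json
{
  "type" : "ingestSegment",
  "dataSource" : "wikipedia",
  "interval" : "2013-01-01/2013-01-02",
  "dimensions" : ["page", "language"],
  "metrics" : ["added", "deleted"],
  "filter" : { "type": "selector", "dimension": "namespace", "value": "article" }
}
```
This spec is then placed in the firehose section of an index task, which defines the new schema for the re-ingested data.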
## More information
Getting data into Druid can definitely be difficult for first time users. Please don't hesitate to ask questions in our IRC channel or on our [google groups page](https://groups.google.com/forum/#!forum/druid-development).

View File

@ -0,0 +1,54 @@
---
layout: doc_page
---
Ingesting from Kafka 8
----------------------
The previous examples are for Kafka 7. To support Kafka 8, a couple changes need to be made:
- Update realtime node's configs for Kafka 8 extensions
- e.g.
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.143",...]`
- becomes
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.143",...]`
- Update realtime task config for changed keys
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
```json
"firehose" : {
"type" : "kafka-0.8",
"consumerProps" : {
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms": "15000",
"zookeeper.session.timeout.ms": "15000",
"zookeeper.sync.time.ms": "5000",
"group.id": "topic-pixel-local",
"fetch.message.max.bytes": "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed" : "druidtest",
"parser" : {
"timestampSpec" : {
"column" : "utcdt",
"format" : "iso"
},
"data" : {
"format" : "json"
},
"dimensionExclusions" : [
"wp"
]
}
},
"plumber" : {
"type" : "realtime",
"windowPeriod" : "PT10m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicyFactory": {
"type": "messageTime"
}
}
```

39
docs/content/Logging.md Normal file
View File

@ -0,0 +1,39 @@
---
layout: doc_page
---
Logging
==========================
Druid nodes emit logs to the console that are useful for debugging. Druid nodes also emit periodic metrics about their state. For more about metrics, see [Configuration](Configuration.html). Metric logs are printed to the console by default, and can be disabled with `-Ddruid.emitter.logging.logLevel=debug`.
Druid uses [log4j](http://logging.apache.org/log4j/2.x/) for logging, and console logs can be configured by adding a log4j.xml file. Add this XML file to your classpath if you want to override the default Druid log configuration.
An example log4j.xml file is shown below:
```
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="ConsoleAppender" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{ISO8601} %-5p [%t] %c - %m%n"/>
</layout>
</appender>
<!-- ServerView-related stuff is way too chatty -->
<logger name="io.druid.client.BatchServerInventoryView">
<level value="warn"/>
</logger>
<logger name="io.druid.curator.inventory.CuratorInventoryManager">
<level value="warn"/>
</logger>
<root>
<priority value="info" />
<appender-ref ref="ConsoleAppender"/>
</root>
</log4j:configuration>
```
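For example (the node type and paths here are only illustrative), you can place the file in a config directory that is included on the classpath when starting a node:
```sh
# Assumes log4j.xml lives in config/historical; adjust paths and node type to your setup
java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
  -classpath "config/historical:lib/*" \
  io.druid.cli.Main server historical
```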

View File

@ -48,7 +48,6 @@ Middle managers pass their configurations down to their child peons. The middle
|`druid.worker.capacity`|Maximum number of tasks the middle manager can accept.|Number of available processors - 1|
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the middle managers should compress Znodes.|false|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|`druid.indexer.runner.taskDir`|Temporary intermediate directory used during task execution.|/tmp/persistent|
|`druid.indexer.runner.javaCommand`|Command required to execute java.|java|
|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM.|""|
|`druid.indexer.runner.classpath`|Java classpath for the peon.|System.getProperty("java.class.path")|

View File

@ -0,0 +1,17 @@
---
layout: doc_page
---
Working with different versions of Hadoop may require a bit of extra work for the time being. We will make changes to support different Hadoop versions in the near future. If you have problems outside of these instructions, please feel free to contact us in IRC or on the [forum](https://groups.google.com/forum/#!forum/druid-development).
Working with Hadoop 2.x
-----------------------
The default version of Hadoop bundled with Druid is 2.3. This should work out of the box.
To override the default Hadoop version, both the Hadoop Index Task and the standalone Hadoop indexer support the parameter `hadoopDependencyCoordinates`. You can pass another set of Hadoop coordinates through this parameter (e.g. you can specify coordinates for Hadoop 2.4.0 as `["org.apache.hadoop:hadoop-client:2.4.0"]`).
The Hadoop Index Task takes this parameter as part of the task JSON and the standalone Hadoop indexer takes this parameter as a command line argument.
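As a sketch (the surrounding task fields follow the [Tasks](Tasks.html) page and may differ in detail for your version), passing the coordinates in a Hadoop Index Task could look like:
```json
{
  "type" : "index_hadoop",
  "hadoopDependencyCoordinates" : ["org.apache.hadoop:hadoop-client:2.4.0"],
  "config" : <hadoop_index_config>
}
```
For the standalone indexer, the same coordinates are passed as a command line argument instead.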
Working with Hadoop 1.x and older
---------------------------------
We recommend recompiling Druid with your particular version of Hadoop by changing the dependencies in Druid's pom.xml files. Make sure to also either override the default `hadoopDependencyCoordinates` in the code or pass your Hadoop version in as part of indexing.

View File

@ -4,6 +4,8 @@ layout: doc_page
Production Cluster Configuration
================================
__This configuration is an example of what a production cluster could look like. Many other hardware combinations are possible! Cheaper hardware is absolutely possible.__
This production Druid cluster assumes that MySQL and Zookeeper are already set up. The deep storage used in the examples is S3, and memcached is used as a distributed cache.
The nodes that respond to queries (Historical, Broker, and Middle manager nodes) will use as many cores as are available, depending on usage, so it is best to keep these on dedicated machines. The upper limit of effectively utilized cores is not well characterized yet and would depend on the types of queries, query load, and the schema. Historical daemons should have a heap size of at least 1GB per core for normal usage, but could be squeezed into a smaller heap for testing. Since in-memory caching is essential for good performance, even more RAM is better. Broker nodes will use RAM for caching, so they do more than just route queries. SSDs are highly recommended for Historical nodes when not all data is loaded in available memory.
@ -55,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/overlord
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@ -137,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/middlemanager
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@ -154,8 +156,7 @@ druid.indexer.logs.s3Prefix=prod/logs/v1
# Dedicate more resources to peons
druid.indexer.runner.javaOpts=-server -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
druid.indexer.runner.taskDir=/mnt/persistent/task/
druid.indexer.task.taskDir=/mnt/persistent/task/
druid.indexer.task.baseTaskDir=/mnt/persistent/task/
druid.indexer.task.chathandler.type=announce
druid.indexer.fork.property.druid.indexer.hadoopWorkingPath=/tmp/druid-indexing
@ -285,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/historical
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@ -1,12 +1,17 @@
---
layout: doc_page
---
Querying
========
Queries are made using an HTTP REST style request to a [Broker](Broker.html), [Historical](Historical.html), or [Realtime](Realtime.html) node. The query is expressed in JSON and each of these node types expose the same REST query interface.
Queries are made using an HTTP REST style request to a [Broker](Broker.html),
[Historical](Historical.html), or [Realtime](Realtime.html) node. The
query is expressed in JSON and each of these node types expose the same
REST query interface.
We start by describing an example query with additional comments that mention possible variations. Query operators are also summarized in a table below.
We start by describing an example query with additional comments that mention
possible variations. Query operators are also summarized in a table below.
Example Query "rand"
--------------------
@ -108,21 +113,52 @@ Query Operators
The following table summarizes query properties.
|query types|property|description|required?|
|-----------|--------|-----------|---------|
|timeseries, groupBy, search, timeBoundary|dataSource|query is applied to this data source|yes|
|timeseries, groupBy, search|intervals|range of time series to include in query|yes|
|timeseries, groupBy, search, timeBoundary|context|This is a key-value map that can allow the query to alter some of the behavior of a query. It is primarily used for debugging, for example if you include `"bySegment":true` in the map, you will get results associated with the data segment they came from.|no|
|timeseries, groupBy, search|filter|Specifies the filter (the "WHERE" clause in SQL) for the query. See [Filters](Filters.html)|no|
|timeseries, groupBy, search|granularity|the timestamp granularity to bucket results into (i.e. "hour"). See [Granularities](Granularities.html) for more information.|no|
Properties shared by all query types
|property |description|required?|
|----------|-----------|---------|
|dataSource|query is applied to this data source|yes|
|intervals |range of time series to include in query|yes|
|context |This is a key-value map used to alter some of the behavior of a query. See [Query Context](#query-context) below|no|
|query type|property |description|required?|
|----------|----------|-----------|---------|
|timeseries, topN, groupBy, search|filter|Specifies the filter (the "WHERE" clause in SQL) for the query. See [Filters](Filters.html)|no|
|timeseries, topN, groupBy, search|granularity|the timestamp granularity to bucket results into (i.e. "hour"). See [Granularities](Granularities.html) for more information.|no|
|timeseries, topN, groupBy|aggregations|aggregations that combine values in a bucket. See [Aggregations](Aggregations.html).|yes|
|timeseries, topN, groupBy|postAggregations|aggregations of aggregations. See [Post Aggregations](Post Aggregations.html).|yes|
|groupBy|dimensions|constrains the groupings; if empty, then one value per time granularity bucket|yes|
|timeseries, groupBy|aggregations|aggregations that combine values in a bucket. See [Aggregations](Aggregations.html).|yes|
|timeseries, groupBy|postAggregations|aggregations of aggregations. See [Post Aggregations](Post Aggregations.html).|yes|
|search|limit|maximum number of results (default is 1000), a system-level maximum can also be set via `com.metamx.query.search.maxSearchLimit`|no|
|search|searchDimensions|Dimensions to apply the search query to. If not specified, it will search through all dimensions.|no|
|search|query|The query portion of the search query. This is essentially a predicate that specifies if something matches.|yes|
Additional Information about Query Types
----------------------------------------
Query Context
-------------
[TimeseriesQuery](TimeseriesQuery.html)
|property |default | description |
|--------------|---------------------|----------------------|
|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled |
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overridden in the broker or historical node configuration |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
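For example, a context block attached to a query (values are illustrative) might look like:
```json
"context" : {
  "timeout" : 60000,
  "priority" : 1,
  "queryId" : "abc123",
  "useCache" : false
}
```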
Query Cancellation
------------------
Queries can be cancelled explicitly using their unique identifier. If the
query identifier is set at the time of query, or is otherwise known, the following
endpoint can be used on the broker or router to cancel the query.
```sh
DELETE /druid/v2/{queryId}
```
For example, if the query ID is `abc123`, the query can be cancelled as follows:
```sh
curl -X DELETE "http://host:port/druid/v2/abc123"
```

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"]
druid.zk.service.host=localhost
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@ -0,0 +1,46 @@
---
layout: doc_page
---
Rolling Updates
===============
For rolling Druid cluster updates with no downtime, we recommend updating Druid nodes in the following order:
1. Historical Nodes
2. Indexing Service/Real-time Nodes
3. Broker Nodes
4. Coordinator Nodes
## Historical Nodes
Historical nodes can be updated one at a time. Each historical node needs time at startup to memory-map all the segments it was serving before the update. The startup time typically ranges from a few seconds to a few minutes, depending on the hardware of the node. As long as each historical node is updated with a sufficient delay (greater than the time required to start a single node), you can perform a rolling update of the entire historical cluster.
## Standalone Real-time nodes
Standalone real-time nodes can be updated one at a time in a rolling fashion.
## Indexing Service
### With Autoscaling
Overlord nodes will try to launch new middle manager nodes and terminate old ones without dropping data. This process is based on the configuration `druid.indexer.runner.minWorkerVersion=#{VERSION}`. Each time you update your overlord node, the `VERSION` value should be increased.
The config `druid.indexer.autoscale.workerVersion=#{VERSION}` also needs to be set.
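For instance (the version value is only a placeholder), the two settings might be bumped together to the same new value on each update:
```
# Both properties should move to the same new version string
druid.indexer.runner.minWorkerVersion=#{NEW_VERSION}
druid.indexer.autoscale.workerVersion=#{NEW_VERSION}
```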
### Without Autoscaling
Middle managers can be updated in a rolling fashion based on API.
To prepare a middle manager for update, send a POST request to `<MiddleManager_IP:PORT>/druid/worker/v1/disable`. The overlord will then no longer send tasks to this middle manager.
Current tasks will still try to complete. To view all existing tasks, send a GET request to `<MiddleManager_IP:PORT>/druid/worker/v1/tasks`. When this list is empty, the middle manager can be updated. After the middle manager is updated, it is automatically enabled again. You can also manually enable middle managers by POSTing to `<MiddleManager_IP:PORT>/druid/worker/v1/enable`, as sketched below.
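A minimal sketch of that sequence with curl (host and port are illustrative):
```sh
# Tell the overlord to stop assigning new tasks to this middle manager
curl -X POST http://MIDDLEMANAGER_IP:8080/druid/worker/v1/disable

# Poll until the list of running tasks is empty
curl http://MIDDLEMANAGER_IP:8080/druid/worker/v1/tasks

# After updating, re-enable the middle manager (optional; it is re-enabled automatically)
curl -X POST http://MIDDLEMANAGER_IP:8080/druid/worker/v1/enable
```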
## Broker Nodes
Broker nodes can be updated one at a time in a rolling fashion. There needs to be some delay between updating each node as brokers must load the entire state of the cluster before they return valid results.
## Coordinator Nodes
Coordinator nodes can be updated in a rolling fashion.

View File

@ -28,7 +28,7 @@ Configuration:
-Ddruid.zk.service.host=localhost
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid

View File

@ -77,7 +77,7 @@ The Hadoop Index Task is used to index larger data sets that require the paralle
|hadoopCoordinates|The Maven \<groupId\>:\<artifactId\>:\<version\> of Hadoop to use. The default is "org.apache.hadoop:hadoop-client:2.3.0".|no|
The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `metadataUpdateSpec`. The Indexing Service takes care of setting these fields internally.
The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.
#### Using your own Hadoop distribution
@ -220,7 +220,7 @@ Kill tasks delete all information about a segment and removes it from deep stora
"type": "kill",
"id": <task_id>,
"dataSource": <task_datasource>,
"segments": <JSON list of DataSegment objects to append>
"interval" : <all_segments_in_this_interval_will_die!>
}
```
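Like other indexing service tasks, a kill task is submitted by POSTing its JSON (with the placeholders above filled in) to the overlord; the host and port below are illustrative:
```sh
curl -X POST -H "Content-Type: application/json" \
  -d @kill_task.json \
  http://OVERLORD_IP:8080/druid/indexer/v1/task
```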

View File

@ -7,7 +7,8 @@ Time boundary queries return the earliest and latest data points of a data set.
```json
{
"queryType" : "timeBoundary",
"dataSource": "sample_datasource"
"dataSource": "sample_datasource",
"bound" : < "maxTime" | "minTime" > # optional, defaults to returning both timestamps if not set
}
```
@ -17,6 +18,7 @@ There are 3 main parts to a time boundary query:
|--------|-----------|---------|
|queryType|This String should always be "timeBoundary"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String defining the data source to query, very similar to a table in a relational database|yes|
|bound | Optional, set to `maxTime` or `minTime` to return only the latest or earliest timestamp. Defaults to returning both if not set| no |
|context|An additional JSON Object which can be used to specify certain flags.|no|
The format of the result is:

View File

@ -60,3 +60,19 @@ See [http://www.davekoelle.com/alphanum.html](http://www.davekoelle.com/alphanum
|--------|-----------|---------|
|type|this indicates an alpha-numeric sort|yes|
|previousStop|the starting point of the alpha-numeric sort. For example, if a previousStop value is 'b', all values before 'b' are discarded. This field can be used to paginate through all the dimension values.|no|
## Inverted TopNMetricSpec
Sort dimension values in inverted order; that is, it inverts the order of the delegate metric spec. It can be used to sort the values in descending order.
```json
"metric": {
"type": "inverted",
"metric": <delegate_top_n_metric_spec>
}
```
|property|description|required?|
|--------|-----------|---------|
|type|this indicates an inverted sort|yes|
|metric|the delegate metric spec. |yes|
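For example (the metric name here is borrowed from the Wikipedia tutorial), wrapping a numeric delegate spec might look like:
```json
"metric": {
  "type": "inverted",
  "metric": { "type": "numeric", "metric": "edit_count" }
}
```
The inverted spec simply reverses whatever ordering the delegate would produce.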

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.121
cd druid-services-0.6.143
```
You should see a bunch of files:
@ -289,7 +289,7 @@ and put the following in there:
"dataSource": "wikipedia",
"granularity": "all",
"dimensions": [ "page" ],
"orderBy": {
"limitSpec": {
"type": "default",
"columns": [ { "dimension": "edit_count", "direction": "DESCENDING" } ],
"limit": 10
@ -302,7 +302,7 @@ and put the following in there:
}
```
Woah! Our query just got a way more complicated. Now we have these [Filters](Filters.html) things and this [OrderBy](OrderBy.html) thing. Fear not, it turns out the new objects we've introduced to our query can help define the format of our results and provide an answer to our question.
Woah! Our query just got a way more complicated. Now we have these [Filters](Filters.html) things and this [LimitSpec](LimitSpec.html) thing. Fear not, it turns out the new objects we've introduced to our query can help define the format of our results and provide an answer to our question.
If you issue the query:

View File

@ -91,7 +91,7 @@ druid.service=overlord
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.143"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid

View File

@ -109,22 +109,27 @@ You should be comfortable starting Druid nodes at this point. If not, it may be
{
"schema": {
"dataSource": "wikipedia",
"aggregators" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"aggregators" : [
{
"type" : "count",
"name" : "count"
},
{
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
},
{
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
},
{
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}
],
"indexGranularity": "none"
},
"config": {
@ -196,13 +201,15 @@ Note: This config uses a "test" [rejection policy](Plumber.html) which will acce
Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) to the real-time node should yield valid results:
```json
[ {
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-08-31T12:41:27.000Z"
[
{
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-08-31T12:41:27.000Z"
}
}
} ]
]
```
Batch Ingestion
@ -287,22 +294,27 @@ Examining the contents of the file, you should find:
},
"targetPartitionSize" : 5000000,
"rollupSpec" : {
"aggs": [{
"aggs": [
{
"type" : "count",
"name" : "count"
}, {
},
{
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
},
{
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
},
{
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
}
],
"rollupGranularity" : "none"
}
}

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz)
and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.143"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.143","io.druid.extensions:druid-kafka-seven:0.6.143"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.121
cd druid-services-0.6.143
```
You should see a bunch of files:
@ -251,7 +251,7 @@ and put the following in there:
"dataSource": "webstream",
"granularity": "all",
"dimensions": [ "geo_region" ],
"orderBy": {
"limitSpec": {
"type": "default",
"columns": [
{ "dimension": "known_users", "direction": "DESCENDING" }
@ -267,7 +267,7 @@ and put the following in there:
}
```
Woah! Our query just got a way more complicated. Now we have these [Filters](Filters.html) things and this [OrderBy](OrderBy.html) thing. Fear not, it turns out the new objects we've introduced to our query can help define the format of our results and provide an answer to our question.
Woah! Our query just got a way more complicated. Now we have these [Filters](Filters.html) things and this [LimitSpec](LimitSpec.html) thing. Fear not, it turns out the new objects we've introduced to our query can help define the format of our results and provide an answer to our question.
If you issue the query:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
# Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz).
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.143-bin.tar.gz).
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -306,7 +306,7 @@ and put the following in there:
"dataSource": "twitterstream",
"granularity": "all",
"dimensions": ["htags"],
"orderBy": {"type":"default", "columns":[{"dimension": "tweets", "direction":"DESCENDING"}], "limit":5},
"limitSpec": {"type":"default", "columns":[{"dimension": "tweets", "direction":"DESCENDING"}], "limit":5},
"aggregations":[
{ "type": "longSum", "fieldName": "tweets", "name": "tweets"}
],
@ -315,7 +315,7 @@ and put the following in there:
}
```
Woah! Our query just got a way more complicated. Now we have these "Filters":Filters.html things and this "OrderBy":OrderBy.html thing. Fear not, it turns out the new objects we've introduced to our query can help define the format of our results and provide an answer to our question.
Woah! Our query just got a way more complicated. Now we have these [Filters](Filters.html) things and this [LimitSpec](LimitSpec.html) thing. Fear not, it turns out the new objects we've introduced to our query can help define the format of our results and provide an answer to our question.
If you issue the query:

View File

@ -1,60 +0,0 @@
.doc-content pre, .doc-content pre code {
overflow: auto;
white-space: pre;
word-wrap: normal;
}
.doc-content p {
margin: 18px 0 18px 0;
}
/*** HACK: This is a horrible hack, but I have no clue why images don't want to stay in the container **/
.doc-content img {
max-width: 847.5px;
}
.doc-content code {
background-color: #e0e0e0;
}
.doc-content pre code {
background-color: transparent;
}
.doc-content table,
.doc-content pre {
margin: 35px 0 35px 0;
}
.doc-content table,
.doc-content table > thead > tr > th,
.doc-content table > tbody > tr > th,
.doc-content table > tfoot > tr > th,
.doc-content table > thead > tr > td,
.doc-content table > tbody > tr > td,
.doc-content table > tfoot > tr > td {
border: 1px solid #dddddd;
}
.doc-content table > thead > tr > th,
.doc-content table > thead > tr > td {
border-bottom-width: 2px;
}
.doc-content table > tbody > tr:nth-child(odd) > td,
.doc-content table > tbody > tr:nth-child(odd) > th {
background-color: #f9f9f9;
}
.doc-content table > tbody > tr:hover > td,
.doc-content table > tbody > tr:hover > th {
background-color: #d5d5d5;
}
.doc-content table code {
background-color: transparent;
}
td, th {
padding: 5px;
}

View File

@ -1,6 +0,0 @@
.toc ul {
list-style: none;
list-style-position: inside;
padding-left: 15px;
}

View File

@ -17,6 +17,7 @@ h2. Getting Started
h2. Booting a Druid Cluster
* "Simple Cluster Configuration":Simple-Cluster-Configuration.html
* "Production Cluster Configuration":Production-Cluster-Configuration.html
* "Rolling Cluster Updates":Rolling-Updates.html
h2. Configuration
* "Common Configuration":Configuration.html
@ -25,19 +26,22 @@ h2. Configuration
* "Historical":Historical-Config.html
* "Broker":Broker-Config.html
* "Realtime":Realtime-Config.html
* "Configuring Logging":./Logging.html
h2. Data Ingestion
* "Ingestion FAQ":./Ingestion-FAQ.html
* "Realtime":./Realtime-ingestion.html
** "Kafka-0.8.x Ingestion":./Kafka-Eight.html
* "Batch":./Batch-ingestion.html
** "Different Hadoop Versions":./Other-Hadoop.html
* "Indexing Service":./Indexing-Service.html
** "Tasks":./Tasks.html
* "Data Formats":./Data_formats.html
* "Ingestion FAQ":./Ingestion-FAQ.html
h2. Operations
* "Performance FAQ":./Performance-FAQ.html
* "Extending Druid":./Modules.html
* "Booting a Production Cluster":./Booting-a-production-cluster.html
* "Performance FAQ":./Performance-FAQ.html
h2. Querying
* "Querying":./Querying.html
@ -48,7 +52,7 @@ h2. Querying
** "DimensionSpecs":./DimensionSpecs.html
* Query Types
** "GroupBy":./GroupByQuery.html
*** "OrderBy":./OrderBy.html
*** "LimitSpec":./LimitSpec.html
*** "Having":./Having.html
** "Search":./SearchQuery.html
*** "SearchQuerySpec":./SearchQuerySpec.html
@ -72,7 +76,7 @@ h2. Architecture
*** "Peon":./Peons.html
* External Dependencies
** "Deep Storage":./Deep-Storage.html
** "MySQL":./MySQL.html
** "Metadata Storage":./MySQL.html
** "ZooKeeper":./ZooKeeper.html
h2. Experimental

View File

@ -1,14 +0,0 @@
.blog-listing {
margin-bottom: 70px;
}
.blog-entry {
margin-bottom: 70px;
}
.recents ul li {
font-weight: 400;
margin-bottom: 15px;
}

View File

@ -1,21 +0,0 @@
.sub-text {
margin-top: 20px;
margin-bottom: 50px;
}
.main-marketing {
margin-bottom: 50px;
}
.main-marketing a {
color: #000000;
}
h2 {
font-weight: 400;
font-size: 30px;
}
.main-marketing img {
margin-bottom: 40px;
}

View File

@ -58,6 +58,7 @@ DRUID_CP=${EXAMPLE_LOC}
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime
#For the kit
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/*
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/_global
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/realtime
echo "Running command:"

View File

@ -0,0 +1,23 @@
# Extensions
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138","io.druid.extensions:druid-rabbitmq:0.6.138", "io.druid.extensions:druid-s3-extensions:0.6.138"]
# Zookeeper
druid.zk.service.host=localhost
# Metadata Storage
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
# Deep storage
druid.storage.type=local
druid.storage.storage.storageDirectory=/tmp/druid/localStorage
# Indexing service discovery
druid.selectors.indexing.serviceName=overlord
# Monitoring (disabled for examples)
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]
# Metrics logging (disabled for examples)
druid.emitter=noop

View File

@ -2,8 +2,6 @@ druid.host=localhost
druid.service=broker
druid.port=8080
druid.zk.service.host=localhost
# Change these to make Druid faster
# Bump these up only for faster nested groupBy
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1

View File

@ -2,10 +2,4 @@ druid.host=localhost
druid.service=coordinator
druid.port=8082
druid.zk.service.host=localhost
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.coordinator.startDelay=PT70s

View File

@ -2,9 +2,7 @@ druid.host=localhost
druid.service=historical
druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@ -1,18 +1,8 @@
druid.host=localhost
druid.port=8087
druid.port=8080
druid.service=overlord
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.selectors.indexing.serviceName=overlord
druid.indexer.queue.startDelay=PT0M
druid.indexer.runner.javaOpts="-server -Xmx256m"
druid.indexer.runner.startPort=8088
druid.indexer.fork.property.druid.processing.numThreads=1
druid.indexer.fork.property.druid.computation.buffer.size=100000000

View File

@ -2,19 +2,11 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121","io.druid.extensions:druid-rabbitmq:0.6.121"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
# These configs are only required for real hand off
# druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
# druid.db.connector.user=druid
# druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
# Enable Real monitoring
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor","io.druid.segment.realtime.RealtimeMetricsMonitor"]

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -17,8 +17,7 @@
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-histogram</artifactId>
@ -28,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -48,6 +48,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -241,7 +242,8 @@ public class ApproximateHistogramQueryTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -179,7 +179,16 @@ public class DetermineHashedPartitionsJob implements Jobby
actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
} else {
for (int i = 0; i < numberOfShards; ++i) {
actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
actualSpecs.add(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(
i,
numberOfShards,
HadoopDruidIndexerConfig.jsonMapper
),
shardCount++
)
);
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
}
}

View File

@ -67,7 +67,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
for (int i = 0; i < shardsPerInterval; i++) {
specs.add(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(i, shardsPerInterval),
new HashBasedNumberedShardSpec(i, shardsPerInterval, HadoopDruidIndexerConfig.jsonMapper),
shardCount++
)
);
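
Both hunks above now pass HadoopDruidIndexerConfig.jsonMapper into HashBasedNumberedShardSpec. The sketch below only illustrates how such hash-based specs assign a row to a partition; it is built from the constructors and methods that appear in this diff, and the class name and values are hypothetical, not part of the change:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;

import java.util.List;

public class HashShardSpecSketch
{
  public static void main(String[] args)
  {
    final ObjectMapper jsonMapper = new DefaultObjectMapper();
    final int numShards = 3;

    // One spec per partition, all sharing the ObjectMapper the spec now requires.
    final List<ShardSpec> specs = Lists.newArrayList();
    for (int i = 0; i < numShards; i++) {
      specs.add(new HashBasedNumberedShardSpec(i, numShards, jsonMapper));
    }

    final InputRow row = new MapBasedInputRow(
        new DateTime("2014-08-12").getMillis(),
        ImmutableList.of("page"),
        ImmutableMap.<String, Object>of("page", "druid", "count", 1L)
    );

    // Exactly one of the specs should claim the row.
    for (ShardSpec spec : specs) {
      if (spec.isInChunk(row)) {
        System.out.println("row hashes to partition " + spec.getPartitionNum());
      }
    }
  }
}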

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Maps;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@ -30,27 +31,28 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.guice.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -169,6 +171,8 @@ public class HadoopDruidIndexerConfig
private volatile HadoopIngestionSpec schema;
private volatile PathSpec pathSpec;
private volatile ColumnConfig columnConfig;
private volatile Map<DateTime,ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
private volatile Map<ShardSpec, HadoopyShardSpec> hadoopShardSpecLookup = Maps.newHashMap();
@JsonCreator
public HadoopDruidIndexerConfig(
@ -178,6 +182,29 @@ public class HadoopDruidIndexerConfig
this.columnConfig = columnConfig;
this.schema = schema;
this.pathSpec = jsonMapper.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : schema.getTuningConfig().getShardSpecs().entrySet()) {
if (entry.getValue() == null || entry.getValue().isEmpty()) {
continue;
}
final ShardSpec actualSpec = entry.getValue().get(0).getActualSpec();
shardSpecLookups.put(
entry.getKey(), actualSpec.getLookup(
Lists.transform(
entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
{
@Override
public ShardSpec apply(HadoopyShardSpec input)
{
return input.getActualSpec();
}
}
)
)
);
for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) {
hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
}
}
}
@JsonProperty
@ -306,25 +333,17 @@ public class HadoopDruidIndexerConfig
return Optional.absent();
}
final List<HadoopyShardSpec> shards = schema.getTuningConfig().getShardSpecs().get(timeBucket.get().getStart());
if (shards == null || shards.isEmpty()) {
return Optional.absent();
}
final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart()).getShardSpec(inputRow);
final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec);
for (final HadoopyShardSpec hadoopyShardSpec : shards) {
final ShardSpec actualSpec = hadoopyShardSpec.getActualSpec();
if (actualSpec.isInChunk(inputRow)) {
return Optional.of(
new Bucket(
hadoopyShardSpec.getShardNum(),
timeBucket.get().getStart(),
actualSpec.getPartitionNum()
)
);
}
}
return Optional.of(
new Bucket(
hadoopyShardSpec.getShardNum(),
timeBucket.get().getStart(),
actualSpec.getPartitionNum()
)
);
throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards);
}
public Optional<Set<Interval>> getSegmentGranularIntervals()
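
The rewritten getBucket above swaps the per-row scan over HadoopyShardSpecs for a ShardSpecLookup that the constructor builds once per time bucket. A minimal sketch of that pattern, using only the two methods visible in this file (getLookup and getShardSpec); the helper and its arguments are hypothetical:

import io.druid.data.input.InputRow;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;

import java.util.List;

class ShardSpecLookupSketch
{
  // In the real code the lookup is built once per bucket and reused for every row,
  // replacing an isInChunk() test against each spec in the list.
  static ShardSpec resolve(List<ShardSpec> specsForBucket, InputRow row)
  {
    final ShardSpecLookup lookup = specsForBucket.get(0).getLookup(specsForBucket);
    return lookup.getShardSpec(row);
  }
}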

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -24,8 +24,9 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import java.util.List;
@ -37,7 +38,8 @@ public class IndexingServiceFirehoseModule implements DruidModule
return ImmutableList.<Module>of(
new SimpleModule("IndexingServiceFirehoseModule")
.registerSubtypes(
new NamedType(EventReceiverFirehoseFactory.class, "receiver")
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(IngestSegmentFirehoseFactory.class, "ingestSegment")
)
);
}

View File

@ -19,10 +19,13 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -53,12 +56,14 @@ import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
@ -107,6 +112,8 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonIgnore
private final IndexIngestionSpec ingestionSchema;
private final ObjectMapper jsonMapper;
@JsonCreator
public IndexTask(
@JsonProperty("id") String id,
@ -118,7 +125,8 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonProperty("indexGranularity") final QueryGranularity indexGranularity,
@JsonProperty("targetPartitionSize") final int targetPartitionSize,
@JsonProperty("firehose") final FirehoseFactory firehoseFactory,
@JsonProperty("rowFlushBoundary") final int rowFlushBoundary
@JsonProperty("rowFlushBoundary") final int rowFlushBoundary,
@JacksonInject ObjectMapper jsonMapper
)
{
super(
@ -139,9 +147,10 @@ public class IndexTask extends AbstractFixedIntervalTask
granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity)
),
new IndexIOConfig(firehoseFactory),
new IndexTuningConfig(targetPartitionSize, rowFlushBoundary)
new IndexTuningConfig(targetPartitionSize, rowFlushBoundary, null)
);
}
this.jsonMapper = jsonMapper;
}
@Override
@ -175,7 +184,15 @@ public class IndexTask extends AbstractFixedIntervalTask
if (targetPartitionSize > 0) {
shardSpecs = determinePartitions(bucket, targetPartitionSize);
} else {
shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
int numShards = ingestionSchema.getTuningConfig().getNumShards();
if (numShards > 0) {
shardSpecs = Lists.newArrayList();
for (int i = 0; i < numShards; i++) {
shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, jsonMapper));
}
} else {
shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
}
}
for (final ShardSpec shardSpec : shardSpecs) {
final DataSegment segment = generateSegment(
@ -206,6 +223,7 @@ public class IndexTask extends AbstractFixedIntervalTask
retVal.add(interval);
}
}
return retVal;
}
@ -481,7 +499,7 @@ public class IndexTask extends AbstractFixedIntervalTask
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0) : tuningConfig;
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null) : tuningConfig;
}
@Override
@ -534,15 +552,22 @@ public class IndexTask extends AbstractFixedIntervalTask
private final int targetPartitionSize;
private final int rowFlushBoundary;
private final int numShards;
@JsonCreator
public IndexTuningConfig(
@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
@JsonProperty("rowFlushBoundary") int rowFlushBoundary,
@JsonProperty("numShards") @Nullable Integer numShards
)
{
this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
this.numShards = numShards == null ? -1 : numShards;
Preconditions.checkArgument(
this.targetPartitionSize == -1 || this.numShards == -1,
"targetPartitionsSize and shardCount both cannot be set"
);
}
@JsonProperty
@ -556,5 +581,11 @@ public class IndexTask extends AbstractFixedIntervalTask
{
return rowFlushBoundary;
}
@JsonProperty
public int getNumShards()
{
return numShards;
}
}
}
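
The new numShards option lets the tuning config fix the shard count up front instead of having determinePartitions derive it from a target partition size. A hedged sketch of the constructor calls, based only on the three-argument constructor added above; note that, as the precondition is written, targetPartitionSize should be passed as -1 when numShards is set:

// Hash-partition into exactly three shards; -1 disables targetPartitionSize.
IndexTuningConfig byShardCount = new IndexTuningConfig(-1, 500000, 3);

// Size-driven partitioning as before; a null numShards keeps the old behaviour.
IndexTuningConfig bySize = new IndexTuningConfig(5000000, 500000, null);

// new IndexTuningConfig(5000000, 500000, 3) would trip the Preconditions check above.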

View File

@ -115,7 +115,8 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory
)
{
super(
@ -142,7 +143,7 @@ public class RealtimeIndexTask extends AbstractTask
windowPeriod,
null,
null,
rejectionPolicyFactory,
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
maxPendingPersists,
spec.getShardSpec()
),
@ -315,6 +316,7 @@ public class RealtimeIndexTask extends AbstractTask
null,
null,
null,
null,
0
);

View File

@ -0,0 +1,371 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.firehose;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Injector;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.task.NoopTask;
import io.druid.query.filter.DimFilter;
import io.druid.query.select.EventHolder;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IndexIO;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.utils.Runnables;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class);
private final String dataSource;
private final Interval interval;
private final DimFilter dimFilter;
private final List<String> dimensions;
private final List<String> metrics;
private final Injector injector;
@JsonCreator
public IngestSegmentFirehoseFactory(
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics,
@JacksonInject Injector injector
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(interval, "interval");
this.dataSource = dataSource;
this.interval = interval;
this.dimFilter = dimFilter;
this.dimensions = dimensions;
this.metrics = metrics;
this.injector = injector;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
return dimFilter;
}
@JsonProperty
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty
public List<String> getMetrics()
{
return metrics;
}
@Override
public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException
{
log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval);
// A better way to achieve this would be to pass the toolbox to the Firehose; the instance is initialized lazily in the connect method.
// A NoopTask is used here only to build the toolbox and list the used segments.
final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build(
new NoopTask("reingest", 0, 0, null, null)
);
try {
final List<DataSegment> usedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval));
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>(
Ordering.<String>natural().nullsFirst()
);
for (DataSegment segment : usedSegments) {
timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
}
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(
interval
);
List<String> dims;
if (dimensions != null) {
dims = dimensions;
} else {
Set<String> dimSet = new HashSet<>();
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timeLineSegments) {
dimSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions());
}
dims = Lists.newArrayList(dimSet);
}
List<String> metricsList;
if (metrics != null) {
metricsList = metrics;
} else {
Set<String> metricsSet = new HashSet<>();
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timeLineSegments) {
metricsSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getMetrics());
}
metricsList = Lists.newArrayList(metricsSet);
}
final List<StorageAdapter> adapters = Lists.transform(
timeLineSegments,
new Function<TimelineObjectHolder<String, DataSegment>, StorageAdapter>()
{
@Override
public StorageAdapter apply(TimelineObjectHolder<String, DataSegment> input)
{
final DataSegment segment = input.getObject().getChunk(0).getObject();
final File file = Preconditions.checkNotNull(
segmentFileMap.get(segment),
"File for segment %s", segment.getIdentifier()
);
try {
return new QueryableIndexStorageAdapter((IndexIO.loadIndex(file)));
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
);
return new IngestSegmentFirehose(adapters, dims, metricsList);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
catch (SegmentLoadingException e) {
throw Throwables.propagate(e);
}
}
@Override
public InputRowParser getParser()
{
return null;
}
public class IngestSegmentFirehose implements Firehose
{
private volatile Yielder<InputRow> rowYielder;
public IngestSegmentFirehose(List<StorageAdapter> adapters, final List<String> dims, final List<String> metrics)
{
Sequence<InputRow> rows = Sequences.concat(
Iterables.transform(
adapters, new Function<StorageAdapter, Sequence<InputRow>>()
{
@Nullable
@Override
public Sequence<InputRow> apply(@Nullable StorageAdapter adapter)
{
return Sequences.concat(
Sequences.map(
adapter.makeCursors(
Filters.convertDimensionFilters(dimFilter),
interval,
QueryGranularity.ALL
), new Function<Cursor, Sequence<InputRow>>()
{
@Nullable
@Override
public Sequence<InputRow> apply(@Nullable final Cursor cursor)
{
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
dimSelectors.put(dim, dimSelector);
}
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
metSelectors.put(metric, metricSelector);
}
return Sequences.simple(
new Iterable<InputRow>()
{
@Override
public Iterator<InputRow> iterator()
{
return new Iterator<InputRow>()
{
@Override
public boolean hasNext()
{
return !cursor.isDone();
}
@Override
public InputRow next()
{
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.getTimestamp();
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
final DimensionSelector selector = dimSelector.getValue();
final IndexedInts vals = selector.getRow();
if (vals.size() == 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else {
List<String> dimVals = Lists.newArrayList();
for (int i = 0; i < vals.size(); ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
}
}
for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final ObjectColumnSelector selector = metSelector.getValue();
theEvent.put(metric, selector.get());
}
cursor.advance();
return new MapBasedInputRow(timestamp, dims, theEvent);
}
@Override
public void remove()
{
throw new UnsupportedOperationException("Remove Not Supported");
}
};
}
}
);
}
}
)
);
}
}
)
);
rowYielder = rows.toYielder(
null,
new YieldingAccumulator()
{
@Override
public Object accumulate(Object accumulated, Object in)
{
yield();
return in;
}
}
);
}
@Override
public boolean hasMore()
{
return !rowYielder.isDone();
}
@Override
public InputRow nextRow()
{
final InputRow inputRow = rowYielder.get();
rowYielder = rowYielder.next(null);
return inputRow;
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
rowYielder.close();
}
}
}
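
Because the factory is registered under the type name "ingestSegment" in IndexingServiceFirehoseModule earlier in this diff, a task's firehose spec can refer to it by that type; once constructed it is drained like any other Firehose. A small sketch (the helper is hypothetical; connect() above never reads its parser argument, so null is passed):

import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;

import java.io.IOException;

class IngestSegmentSketch
{
  static void drain(IngestSegmentFirehoseFactory factory) throws IOException, ParseException
  {
    final Firehose firehose = factory.connect(null);
    try {
      while (firehose.hasMore()) {
        final InputRow row = firehose.nextRow();
        // hand the row to whatever is re-indexing or inspecting it
      }
    }
    finally {
      firehose.close();
    }
  }
}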

View File

@ -42,6 +42,7 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.indexing.worker.config.WorkerConfig;
@ -74,6 +75,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
private static final Splitter whiteSpaceSplitter = Splitter.on(CharMatcher.WHITESPACE).omitEmptyStrings();
private final ForkingTaskRunnerConfig config;
private final TaskConfig taskConfig;
private final Properties props;
private final TaskLogPusher taskLogPusher;
private final DruidNode node;
@ -86,6 +88,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
@Inject
public ForkingTaskRunner(
ForkingTaskRunnerConfig config,
TaskConfig taskConfig,
WorkerConfig workerConfig,
Properties props,
TaskLogPusher taskLogPusher,
@ -94,6 +97,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
)
{
this.config = config;
this.taskConfig = taskConfig;
this.props = props;
this.taskLogPusher = taskLogPusher;
this.jsonMapper = jsonMapper;
@ -119,7 +123,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
public TaskStatus call()
{
final String attemptUUID = UUID.randomUUID().toString();
final File taskDir = new File(config.getTaskDir(), task.getId());
final File taskDir = new File(taskConfig.getBaseTaskDir(), task.getId());
final File attemptDir = new File(taskDir, attemptUUID);
final ProcessHolder processHolder;

View File

@ -22,6 +22,7 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.server.DruidNode;
@ -34,6 +35,7 @@ import java.util.Properties;
public class ForkingTaskRunnerFactory implements TaskRunnerFactory
{
private final ForkingTaskRunnerConfig config;
private final TaskConfig taskConfig;
private final WorkerConfig workerConfig;
private final Properties props;
private final ObjectMapper jsonMapper;
@ -43,6 +45,7 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
@Inject
public ForkingTaskRunnerFactory(
final ForkingTaskRunnerConfig config,
final TaskConfig taskConfig,
final WorkerConfig workerConfig,
final Properties props,
final ObjectMapper jsonMapper,
@ -50,6 +53,7 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
@Self DruidNode node
) {
this.config = config;
this.taskConfig = taskConfig;
this.workerConfig = workerConfig;
this.props = props;
this.jsonMapper = jsonMapper;
@ -60,6 +64,6 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
@Override
public TaskRunner build()
{
return new ForkingTaskRunner(config, workerConfig, props, persistentTaskLogs, jsonMapper, node);
return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node);
}
}

View File

@ -422,11 +422,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private RemoteTaskRunnerWorkItem addPendingTask(final Task task)
{
log.info("Added pending task %s", task.getId());
final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
task.getId(),
SettableFuture.<TaskStatus>create(),
null
);
final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(task.getId(), null);
pendingTaskPayloads.put(task.getId(), task);
pendingTasks.put(task.getId(), taskRunnerWorkItem);
runPendingTasks();
@ -663,17 +659,24 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
if ((tmp = runningTasks.get(taskId)) != null) {
taskRunnerWorkItem = tmp;
} else {
log.warn(
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
zkWorker.getWorker().getHost(),
taskId
);
taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
taskId,
SettableFuture.<TaskStatus>create(),
zkWorker.getWorker()
);
runningTasks.put(taskId, taskRunnerWorkItem.withWorker(zkWorker.getWorker()));
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,
newTaskRunnerWorkItem
);
if (existingItem == null) {
log.warn(
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
zkWorker.getWorker().getHost(),
taskId
);
taskRunnerWorkItem = newTaskRunnerWorkItem;
} else {
taskRunnerWorkItem = existingItem;
}
}
if (taskStatus.isComplete()) {
@ -806,8 +809,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
);
sortedWorkers.addAll(zkWorkers.values());
final String workerSetupDataMinVer = workerSetupData.get() == null ? null : workerSetupData.get().getMinVersion();
final String minWorkerVer = workerSetupDataMinVer == null ? config.getMinWorkerVersion() : workerSetupDataMinVer;
final String minWorkerVer = config.getMinWorkerVersion();
for (ZkWorker zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
@ -836,7 +838,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
// Worker is done with this task
zkWorker.setLastCompletedTaskTime(new DateTime());
} else {
log.info("No worker run task[%s] with status[%s]", taskStatus.getId(), taskStatus.getStatusCode());
log.info("Workerless task[%s] completed with status[%s]", taskStatus.getId(), taskStatus.getStatusCode());
}
// Move from running -> complete
@ -844,9 +846,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
runningTasks.remove(taskStatus.getId());
// Notify interested parties
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
if (result != null) {
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
taskRunnerWorkItem.setResult(taskStatus);
}
}

View File

@ -32,6 +32,24 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
private final Worker worker;
public RemoteTaskRunnerWorkItem(
String taskId,
Worker worker
)
{
this(taskId, SettableFuture.<TaskStatus>create(), worker);
}
public RemoteTaskRunnerWorkItem(
String taskId,
DateTime createdTime,
DateTime queueInsertionTime,
Worker worker
)
{
this(taskId, SettableFuture.<TaskStatus>create(), createdTime, queueInsertionTime, worker);
}
private RemoteTaskRunnerWorkItem(
String taskId,
SettableFuture<TaskStatus> result,
Worker worker
@ -42,7 +60,7 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
this.worker = worker;
}
public RemoteTaskRunnerWorkItem(
private RemoteTaskRunnerWorkItem(
String taskId,
SettableFuture<TaskStatus> result,
DateTime createdTime,

View File

@ -29,10 +29,6 @@ import java.util.List;
public class ForkingTaskRunnerConfig
{
@JsonProperty
@NotNull
private String taskDir = "/tmp/persistent";
@JsonProperty
@NotNull
private String javaCommand = "java";
@ -66,11 +62,6 @@ public class ForkingTaskRunnerConfig
"java.io.tmpdir"
);
public String getTaskDir()
{
return taskDir;
}
public String getJavaCommand()
{
return javaCommand;

View File

@ -274,9 +274,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
@Override
public boolean apply(ZkWorker zkWorker)
{
final String minVersion = workerSetupData.getMinVersion() != null
? workerSetupData.getMinVersion()
: config.getWorkerVersion();
final String minVersion = config.getWorkerVersion();
if (minVersion == null) {
throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
}

View File

@ -28,7 +28,6 @@ public class WorkerSetupData
{
public static final String CONFIG_KEY = "worker.setup";
private final String minVersion;
private final int minNumWorkers;
private final int maxNumWorkers;
private final String availabilityZone;
@ -37,7 +36,6 @@ public class WorkerSetupData
@JsonCreator
public WorkerSetupData(
@JsonProperty("minVersion") String minVersion,
@JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("maxNumWorkers") int maxNumWorkers,
@JsonProperty("availabilityZone") String availabilityZone,
@ -45,7 +43,6 @@ public class WorkerSetupData
@JsonProperty("userData") EC2UserData userData
)
{
this.minVersion = minVersion;
this.minNumWorkers = minNumWorkers;
this.maxNumWorkers = maxNumWorkers;
this.availabilityZone = availabilityZone;
@ -53,12 +50,6 @@ public class WorkerSetupData
this.userData = userData;
}
@JsonProperty
public String getMinVersion()
{
return minVersion;
}
@JsonProperty
public int getMinNumWorkers()
{
@ -93,7 +84,6 @@ public class WorkerSetupData
public String toString()
{
return "WorkerSetupData{" +
"minVersion='" + minVersion + '\'' +
", minNumWorkers=" + minNumWorkers +
", maxNumWorkers=" + maxNumWorkers +
", availabilityZone=" + availabilityZone +

View File

@ -51,13 +51,13 @@ public class WorkerCuratorCoordinator
private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final CuratorFramework curatorFramework;
private final Worker worker;
private final Announcer announcer;
private final String baseAnnouncementsPath;
private final String baseTaskPath;
private final String baseStatusPath;
private volatile Worker worker;
private volatile boolean started;
@Inject
@ -129,7 +129,11 @@ public class WorkerCuratorCoordinator
try {
byte[] rawBytes = jsonMapper.writeValueAsBytes(data);
if (rawBytes.length > config.getMaxZnodeBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes());
throw new ISE(
"Length of raw bytes for task too large[%,d > %,d]",
rawBytes.length,
config.getMaxZnodeBytes()
);
}
curatorFramework.create()
@ -173,6 +177,11 @@ public class WorkerCuratorCoordinator
return getPath(Arrays.asList(baseStatusPath, statusId));
}
public Worker getWorker()
{
return worker;
}
public void unannounceTask(String taskId)
{
try {
@ -239,4 +248,16 @@ public class WorkerCuratorCoordinator
}
}
}
public void updateWorkerAnnouncement(Worker newWorker) throws Exception
{
synchronized (lock) {
if (!started) {
throw new ISE("Cannot update worker! Not Started!");
}
this.worker = newWorker;
announcer.update(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(newWorker));
}
}
}
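
updateWorkerAnnouncement simply re-publishes the worker's announcement znode, which is what the new disable/enable endpoints below rely on: announcing a copy of the worker whose version is the empty string marks it disabled, and re-announcing the original enables it again. A brief sketch, assuming a started coordinator (the helper itself is hypothetical):

import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;

class WorkerToggleSketch
{
  static void disableThenEnable(WorkerCuratorCoordinator coordinator, Worker worker) throws Exception
  {
    // Empty version string == disabled, mirroring DISABLED_VERSION in WorkerResource below.
    coordinator.updateWorkerAnnouncement(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""));
    // Restoring the original announcement re-enables the worker.
    coordinator.updateWorkerAnnouncement(worker);
  }
}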

View File

@ -19,12 +19,18 @@
package io.druid.indexing.worker.http;
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.indexing.overlord.ForkingTaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@ -42,18 +48,98 @@ import java.io.InputStream;
public class WorkerResource
{
private static final Logger log = new Logger(WorkerResource.class);
private static final String DISABLED_VERSION = "";
private final Worker enabledWorker;
private final Worker disabledWorker;
private final WorkerCuratorCoordinator curatorCoordinator;
private final ForkingTaskRunner taskRunner;
@Inject
public WorkerResource(
Worker worker,
WorkerCuratorCoordinator curatorCoordinator,
ForkingTaskRunner taskRunner
) throws Exception
{
this.enabledWorker = worker;
this.disabledWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), DISABLED_VERSION);
this.curatorCoordinator = curatorCoordinator;
this.taskRunner = taskRunner;
}
@POST
@Path("/disable")
@Produces("application/json")
public Response doDisable()
{
try {
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build();
}
catch (Exception e) {
return Response.serverError().build();
}
}
@POST
@Path("/enable")
@Produces("application/json")
public Response doEnable()
{
try {
curatorCoordinator.updateWorkerAnnouncement(enabledWorker);
return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "enabled")).build();
}
catch (Exception e) {
return Response.serverError().build();
}
}
@GET
@Path("/enabled")
@Produces("application/json")
public Response isEnabled()
{
try {
final Worker theWorker = curatorCoordinator.getWorker();
final boolean enabled = !theWorker.getVersion().equalsIgnoreCase(DISABLED_VERSION);
return Response.ok(ImmutableMap.of(theWorker.getHost(), enabled)).build();
}
catch (Exception e) {
return Response.serverError().build();
}
}
@GET
@Path("/tasks")
@Produces("application/json")
public Response getTasks()
{
try {
return Response.ok(
Lists.newArrayList(
Collections2.transform(
taskRunner.getKnownTasks(),
new Function<TaskRunnerWorkItem, String>()
{
@Override
public String apply(TaskRunnerWorkItem input)
{
return input.getTaskId();
}
}
)
)
).build();
}
catch (Exception e) {
return Response.serverError().build();
}
}
@POST
@Path("/task/{taskid}/shutdown")
@Produces("application/json")
@ -82,7 +168,8 @@ public class WorkerResource
if (stream.isPresent()) {
try {
return Response.ok(stream.get().getInput()).build();
} catch (Exception e) {
}
catch (Exception e) {
log.warn(e, "Failed to read log for task: %s", taskid);
return Response.serverError().build();
}

View File

@ -54,6 +54,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
null,
1,
null,
null,
null
);
this.status = status;

View File

@ -19,6 +19,8 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
@ -67,16 +69,19 @@ public class TaskSerdeTest
QueryGranularity.NONE,
10000,
new LocalFirehoseFactory(new File("lol"), "rofl", null),
-1
-1,
jsonMapper
);
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
InjectableValues inject = new InjectableValues.Std()
.addValue(ObjectMapper.class, jsonMapper);
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class);
final IndexTask task2 = jsonMapper.reader(Task.class).with(inject).readValue(json);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P2D"), task.getInterval());
@ -202,6 +207,7 @@ public class TaskSerdeTest
new Period("PT10M"),
1,
Granularity.HOUR,
null,
null
);

View File

@ -380,7 +380,7 @@ public class RemoteTaskRunnerTest
},
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
DSuppliers.of(new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, 1, null, null, null))),
DSuppliers.of(new AtomicReference<WorkerSetupData>(new WorkerSetupData(0, 1, null, null, null))),
null
);

View File

@ -43,6 +43,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TestUtils;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexing.common.SegmentLoaderFactory;
@ -249,7 +250,8 @@ public class TaskLifecycleTest
IR("2010-01-02T01", "a", "c", 1)
)
),
-1
-1,
TestUtils.MAPPER
);
final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId());
@ -297,7 +299,8 @@ public class TaskLifecycleTest
QueryGranularity.NONE,
10000,
newMockExceptionalFirehoseFactory(),
-1
-1,
TestUtils.MAPPER
);
final TaskStatus status = runTask(indexTask);

View File

@ -95,7 +95,6 @@ public class EC2AutoScalingStrategyTest
{
workerSetupData.set(
new WorkerSetupData(
"0",
0,
1,
"",

View File

@ -27,8 +27,8 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
@ -67,7 +67,7 @@ public class SimpleResourceManagementStrategyTest
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
workerSetupData = new AtomicReference<>(
new WorkerSetupData(
"0", 0, 2, null, null, null
0, 2, null, null, null
)
);
@ -113,7 +113,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -141,7 +141,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -157,7 +157,7 @@ public class SimpleResourceManagementStrategyTest
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -188,7 +188,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.terminateWithIds(EasyMock.<List<String>>anyObject()))
.andReturn(null);
.andReturn(null);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"))
);
@ -196,7 +196,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -214,7 +214,7 @@ public class SimpleResourceManagementStrategyTest
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
@ -237,7 +237,7 @@ public class SimpleResourceManagementStrategyTest
@Test
public void testDoSuccessfulTerminate() throws Exception
{
workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null, null));
workerSetupData.set(new WorkerSetupData(0, 1, null, null, null));
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
@ -248,7 +248,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -267,7 +267,7 @@ public class SimpleResourceManagementStrategyTest
@Test
public void testSomethingTerminating() throws Exception
{
workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null, null));
workerSetupData.set(new WorkerSetupData(0, 1, null, null, null));
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2);
@ -278,7 +278,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -293,7 +293,7 @@ public class SimpleResourceManagementStrategyTest
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -319,7 +319,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
@ -337,7 +337,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
@ -381,7 +381,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy);
// Increase minNumWorkers
workerSetupData.set(new WorkerSetupData("0", 3, 5, null, null, null));
workerSetupData.set(new WorkerSetupData(3, 5, null, null, null));
// Should provision two new workers
EasyMock.reset(autoScalingStrategy);
@ -404,85 +404,6 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testMinVersionIncrease() throws Exception
{
// Don't terminate anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h1", "i2", "0")
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
// Don't provision anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Increase minVersion
workerSetupData.set(new WorkerSetupData("1", 0, 2, null, null, null));
// Provision two new workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3"))
);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4"))
);
EasyMock.replay(autoScalingStrategy);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h2", "i2", "0")
)
);
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Terminate old workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(ImmutableList.of("i1", "i2", "i3", "i4"))).andReturn(
ImmutableList.of("h1", "h2", "h3", "h4")
);
EasyMock.expect(autoScalingStrategy.terminate(ImmutableList.of("i1", "i2"))).andReturn(
new AutoScalingData(ImmutableList.of("h1", "h2"))
);
EasyMock.replay(autoScalingStrategy);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(null, "h1", "i1", "0"),
new TestZkWorker(null, "h2", "i2", "0"),
new TestZkWorker(NoopTask.create(), "h3", "i3", "1"),
new TestZkWorker(NoopTask.create(), "h4", "i4", "1")
)
);
Assert.assertTrue(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testNullWorkerSetupData() throws Exception
{
@ -491,7 +412,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
@ -500,7 +421,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
new RemoteTaskRunnerWorkItem(testTask.getId(), null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)

View File

@ -49,6 +49,7 @@ public class TaskAnnouncementTest
new Period("PT10M"),
1,
Granularity.HOUR,
null,
null
);
final TaskStatus status = TaskStatus.running(task.getId());

View File

@ -0,0 +1,136 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.worker.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.initialization.ZkPathsConfig;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.Response;
/**
*/
public class WorkerResourceTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final String basePath = "/test/druid";
private static final String announcementsPath = String.format("%s/indexer/announcements/host", basePath);
private TestingCluster testingCluster;
private CuratorFramework cf;
private Worker worker;
private WorkerCuratorCoordinator curatorCoordinator;
private WorkerResource workerResource;
@Before
public void setUp() throws Exception
{
testingCluster = new TestingCluster(1);
testingCluster.start();
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
cf.start();
cf.create().creatingParentsIfNeeded().forPath(basePath);
worker = new Worker(
"host",
"ip",
3,
"v1"
);
curatorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
{
return basePath;
}
},
new RemoteTaskRunnerConfig(),
cf,
worker
);
curatorCoordinator.start();
workerResource = new WorkerResource(
worker,
curatorCoordinator,
null
);
}
@After
public void tearDown() throws Exception
{
curatorCoordinator.stop();
cf.close();
testingCluster.close();
}
@Test
public void testDoDisable() throws Exception
{
Worker theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class);
Assert.assertEquals("v1", theWorker.getVersion());
Response res = workerResource.doDisable();
Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class);
Assert.assertTrue(theWorker.getVersion().isEmpty());
}
@Test
public void testDoEnable() throws Exception
{
// Disable the worker
Response res = workerResource.doDisable();
Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
Worker theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class);
Assert.assertTrue(theWorker.getVersion().isEmpty());
// Enable the worker
res = workerResource.doEnable();
Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class);
Assert.assertEquals("v1", theWorker.getVersion());
}
}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

33
pom.xml
View File

@ -23,14 +23,14 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
<tag>druid-0.6.117-SNAPSHOT</tag>
<tag>druid-0.7.0-SNAPSHOT</tag>
</scm>
<prerequisites>
@ -39,9 +39,9 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.26.5</metamx.java-util.version>
<apache.curator.version>2.5.0</apache.curator.version>
<druid.api.version>0.2.4</druid.api.version>
<metamx.java-util.version>0.26.6</metamx.java-util.version>
<apache.curator.version>2.6.0</apache.curator.version>
<druid.api.version>0.2.7</druid.api.version>
</properties>
<modules>
@ -74,7 +74,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.2.11</version>
<version>0.2.12</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
@ -199,7 +199,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>17.0</version>
<version>16.0.1</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
@ -282,9 +282,9 @@
<version>1</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
<version>3.0.0</version>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.jamesmurty.utils</groupId>
@ -324,17 +324,22 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.1.5.v20140505</version>
<version>9.2.2.v20140723</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.1.5.v20140505</version>
<version>9.2.2.v20140723</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<version>9.1.5.v20140505</version>
<version>9.2.2.v20140723</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-proxy</artifactId>
<version>9.2.2.v20140723</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
@ -379,7 +384,7 @@
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.1.2</version>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.129-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>
@ -82,6 +82,10 @@
<groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<!-- Tests -->

View File

@ -33,6 +33,7 @@ import io.druid.guice.JsonConfigProvider;
import io.druid.guice.PropertiesModule;
import io.druid.jackson.JacksonModule;
import java.util.Arrays;
import java.util.List;
/**
@ -44,7 +45,7 @@ public class GuiceInjectors
return Guice.createInjector(
new DruidGuiceExtensions(),
new JacksonModule(),
new PropertiesModule("runtime.properties"),
new PropertiesModule(Arrays.asList("global.runtime.properties", "runtime.properties")),
new ConfigModule(),
new Module()
{
@ -63,7 +64,7 @@ public class GuiceInjectors
List<Module> theModules = Lists.newArrayList();
theModules.add(new DruidGuiceExtensions());
theModules.add(new JacksonModule());
theModules.add(new PropertiesModule("runtime.properties"));
theModules.add(new PropertiesModule(Arrays.asList("global.runtime.properties", "runtime.properties")));
theModules.add(new ConfigModule());
theModules.add(
new Module()

View File

@ -17,10 +17,11 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.guava.CloseQuietly;
@ -33,6 +34,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Properties;
/**
@ -41,11 +43,11 @@ public class PropertiesModule implements Module
{
private static final Logger log = new Logger(PropertiesModule.class);
private final String propertiesFile;
private final List<String> propertiesFiles;
public PropertiesModule(String propertiesFile)
public PropertiesModule(List<String> propertiesFiles)
{
this.propertiesFile = propertiesFile;
this.propertiesFiles = propertiesFiles;
}
@Override
@ -57,30 +59,32 @@ public class PropertiesModule implements Module
Properties props = new Properties(fileProps);
props.putAll(systemProps);
InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
try {
if (stream == null) {
File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile));
if (workingDirectoryFile.exists()) {
stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile));
for (String propertiesFile : propertiesFiles) {
InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
try {
if (stream == null) {
File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile));
if (workingDirectoryFile.exists()) {
stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile));
}
}
}
if (stream != null) {
log.info("Loading properties from %s", propertiesFile);
try {
fileProps.load(new InputStreamReader(stream, Charsets.UTF_8));
}
catch (IOException e) {
throw Throwables.propagate(e);
if (stream != null) {
log.info("Loading properties from %s", propertiesFile);
try {
fileProps.load(new InputStreamReader(stream, Charsets.UTF_8));
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
}
catch (FileNotFoundException e) {
log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up.");
}
finally {
CloseQuietly.close(stream);
catch (FileNotFoundException e) {
log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up.");
}
finally {
CloseQuietly.close(stream);
}
}
binder.bind(Properties.class).toInstance(props);
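For context, a minimal self-contained sketch (JDK only, file names illustrative) of the layering behaviour the rewritten module implements: every file in the list is loaded into the same backing Properties object, so keys from later files override earlier ones, and system properties are applied last so they override both.

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class LayeredPropertiesSketch
{
  public static void main(String[] args) throws IOException
  {
    // Same ordering as the module above: a shared file first, then the node-local file.
    List<String> propertiesFiles = Arrays.asList("global.runtime.properties", "runtime.properties");

    Properties fileProps = new Properties();
    for (String propertiesFile : propertiesFiles) {
      File file = new File(propertiesFile);
      if (file.exists()) {
        try (InputStream stream = new BufferedInputStream(new FileInputStream(file))) {
          // Later files overwrite keys already loaded from earlier files.
          fileProps.load(new InputStreamReader(stream, StandardCharsets.UTF_8));
        }
      }
    }

    // System properties are layered on top, mirroring props.putAll(systemProps) above.
    Properties props = new Properties(fileProps);
    props.putAll(System.getProperties());

    System.out.println(props.getProperty("druid.service", "<not set>"));
  }
}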


@ -70,14 +70,14 @@ public abstract class BaseQuery<T> implements Query<T>
}
@Override
public Sequence<T> run(QuerySegmentWalker walker)
public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context)
{
return run(querySegmentSpec.lookup(this, walker));
return run(querySegmentSpec.lookup(this, walker), context);
}
public Sequence<T> run(QueryRunner<T> runner)
public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context)
{
return runner.run(this);
return runner.run(this, context);
}
@Override
@ -102,6 +102,7 @@ public abstract class BaseQuery<T> implements Query<T>
return duration;
}
@Override
@JsonProperty
public Map<String, Object> getContext()
{


@ -30,6 +30,7 @@ import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
*/
@ -52,11 +53,11 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@Override
@SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query)
public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{
if (query.getContextBySegment(false)) {
final Sequence<T> baseSequence = base.run(query);
final Sequence<T> baseSequence = base.run(query, context);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple(
Arrays.asList(
@ -71,7 +72,6 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
)
);
}
return base.run(query);
return base.run(query, context);
}
}


@ -21,6 +21,8 @@ package io.druid.query;
import com.metamx.common.guava.Sequence;
import java.util.Map;
/**
*/
public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
@ -35,14 +37,14 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(Query<T> query)
public Sequence<T> run(Query<T> query, Map<String, Object> context)
{
if (query.getContextBySegment(false)) {
return baseRunner.run(query);
return baseRunner.run(query, context);
}
return doRun(baseRunner, query);
return doRun(baseRunner, query, context);
}
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query);
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context);
}


@ -39,6 +39,7 @@ import com.metamx.common.logger.Logger;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -93,7 +94,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query)
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
final int priority = query.getContextPriority(0);
@ -124,7 +125,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
throw new ISE("Input is null?! How is this possible?!");
}
Sequence<T> result = input.run(query);
Sequence<T> result = input.run(query, context);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}


@ -23,6 +23,9 @@ import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import java.util.List;
import java.util.Map;
/**
*/
public class ConcatQueryRunner<T> implements QueryRunner<T>
@ -36,7 +39,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query)
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
return Sequences.concat(
Sequences.map(
@ -46,7 +49,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> apply(final QueryRunner<T> input)
{
return input.run(query);
return input.run(query, context);
}
}
)


@ -19,10 +19,14 @@
package io.druid.query;
import com.google.common.base.Joiner;
import java.util.List;
public class DataSourceUtil
{
public static final Joiner COMMA_JOIN = Joiner.on(",");
public static String getMetricName(DataSource dataSource)
{
final List<String> names = dataSource.getNames();


@ -35,6 +35,8 @@ import io.druid.query.search.SearchResultValue;
import io.druid.query.search.search.InsensitiveContainsSearchQuerySpec;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.search.search.SearchQuerySpec;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQuery;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
@ -943,4 +945,158 @@ public class Druids
{
return new SegmentMetadataQueryBuilder();
}
/**
* A Builder for SelectQuery.
* <p/>
* Required: dataSource(), intervals() must be called before build()
* <p/>
* Usage example:
* <pre><code>
* SelectQuery query = new SelectQueryBuilder()
* .dataSource("Example")
* .intervals("2010/2013")
* .build();
* </code></pre>
*
* @see io.druid.query.select.SelectQuery
*/
public static class SelectQueryBuilder
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private Map<String, Object> context;
private DimFilter dimFilter;
private QueryGranularity granularity;
private List<String> dimensions;
private List<String> metrics;
private PagingSpec pagingSpec;
public SelectQueryBuilder()
{
dataSource = null;
querySegmentSpec = null;
context = null;
dimFilter = null;
granularity = QueryGranularity.ALL;
dimensions = Lists.newArrayList();
metrics = Lists.newArrayList();
pagingSpec = null;
}
public SelectQuery build()
{
return new SelectQuery(
dataSource,
querySegmentSpec,
dimFilter,
granularity,
dimensions,
metrics,
pagingSpec,
context
);
}
public SelectQueryBuilder copy(SelectQueryBuilder builder)
{
return new SelectQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.context(builder.context);
}
public SelectQueryBuilder dataSource(String ds)
{
dataSource = new TableDataSource(ds);
return this;
}
public SelectQueryBuilder dataSource(DataSource ds)
{
dataSource = ds;
return this;
}
public SelectQueryBuilder intervals(QuerySegmentSpec q)
{
querySegmentSpec = q;
return this;
}
public SelectQueryBuilder intervals(String s)
{
querySegmentSpec = new LegacySegmentSpec(s);
return this;
}
public SelectQueryBuilder intervals(List<Interval> l)
{
querySegmentSpec = new LegacySegmentSpec(l);
return this;
}
public SelectQueryBuilder context(Map<String, Object> c)
{
context = c;
return this;
}
public SelectQueryBuilder filters(String dimensionName, String value)
{
dimFilter = new SelectorDimFilter(dimensionName, value);
return this;
}
public SelectQueryBuilder filters(String dimensionName, String value, String... values)
{
List<DimFilter> fields = Lists.<DimFilter>newArrayList(new SelectorDimFilter(dimensionName, value));
for (String val : values) {
fields.add(new SelectorDimFilter(dimensionName, val));
}
dimFilter = new OrDimFilter(fields);
return this;
}
public SelectQueryBuilder filters(DimFilter f)
{
dimFilter = f;
return this;
}
public SelectQueryBuilder granularity(String g)
{
granularity = QueryGranularity.fromString(g);
return this;
}
public SelectQueryBuilder granularity(QueryGranularity g)
{
granularity = g;
return this;
}
public SelectQueryBuilder dimensions(List<String> d)
{
dimensions = d;
return this;
}
public SelectQueryBuilder metrics(List<String> m)
{
metrics = m;
return this;
}
public SelectQueryBuilder pagingSpec(PagingSpec p)
{
pagingSpec = p;
return this;
}
}
public static SelectQueryBuilder newSelectQueryBuilder()
{
return new SelectQueryBuilder();
}
}
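A hedged usage sketch for the new builder, using only the methods added above. The data source, interval, and column names are placeholders, and the PagingSpec constructor arguments (paging identifiers plus a row threshold) are an assumption about that class rather than something shown in this diff.

import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQuery;

import java.util.Arrays;

public class SelectQueryBuilderExample
{
  public static SelectQuery exampleQuery()
  {
    // Placeholder data source, interval and columns -- substitute the real schema.
    return Druids.newSelectQueryBuilder()
                 .dataSource("example_datasource")
                 .intervals("2013-01-01/2013-01-02")
                 .granularity(QueryGranularity.ALL)
                 .dimensions(Arrays.asList("dim1", "dim2"))
                 .metrics(Arrays.asList("metric1"))
                 .pagingSpec(new PagingSpec(null, 5)) // assumed: no paging identifiers, 5-row threshold
                 .build();
  }
}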


@ -28,6 +28,8 @@ import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.MetricManipulatorFns;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
/**
*/
@ -46,7 +48,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query)
public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{
final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = query.getContextFinalize(true);
@ -94,7 +96,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
return Sequences.map(
baseRunner.run(queryToRun),
baseRunner.run(queryToRun, context),
finalizerFn
);


@ -25,7 +25,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -44,8 +43,8 @@ import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.segment.incremental.IncrementalIndex;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -53,38 +52,24 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class GroupByParallelQueryRunner implements QueryRunner<Row>
public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
{
private static final Logger log = new Logger(GroupByParallelQueryRunner.class);
private final Iterable<QueryRunner<Row>> queryables;
private final Iterable<QueryRunner<T>> queryables;
private final ListeningExecutorService exec;
private final Ordering<Row> ordering;
private final Supplier<GroupByQueryConfig> configSupplier;
private final QueryWatcher queryWatcher;
private final StupidPool<ByteBuffer> bufferPool;
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering,
Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
StupidPool<ByteBuffer> bufferPool,
QueryRunner<Row>... queryables
)
{
this(exec, ordering, configSupplier, queryWatcher, bufferPool, Arrays.asList(queryables));
}
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
StupidPool<ByteBuffer> bufferPool,
Iterable<QueryRunner<Row>> queryables
Iterable<QueryRunner<T>> queryables
)
{
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
@ -92,37 +77,46 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
}
@Override
public Sequence<Row> run(final Query<Row> queryParam)
public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> context)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
configSupplier.get(),
bufferPool
);
final Pair<List, Accumulator<List, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = query.getContextBySegment(false);
final int priority = query.getContextPriority(0);
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
}
ListenableFuture<List<Boolean>> futures = Futures.allAsList(
ListenableFuture<List<Void>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
new Function<QueryRunner<T>, ListenableFuture<Void>>()
{
@Override
public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
public ListenableFuture<Void> apply(final QueryRunner<T> input)
{
return exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
new AbstractPrioritizedCallable<Void>(priority)
{
@Override
public Boolean call() throws Exception
public Void call() throws Exception
{
try {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
if (bySegment) {
input.run(queryParam, context)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
} else {
input.run(queryParam, context)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
return null;
}
catch (QueryInterruptedException e) {
throw Throwables.propagate(e);
@ -171,13 +165,24 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
throw Throwables.propagate(e.getCause());
}
return new ResourceClosingSequence<Row>(
if (bySegment) {
return Sequences.simple(bySegmentAccumulatorPair.lhs);
}
return new ResourceClosingSequence<T>(
Sequences.simple(
indexAccumulatorPair.lhs
.iterableWithPostAggregations(null)
),
indexAccumulatorPair.lhs
Iterables.transform(
indexAccumulatorPair.lhs.iterableWithPostAggregations(null),
new Function<Row, T>()
{
@Override
public T apply(Row input)
{
return (T) input;
}
}
)
), indexAccumulatorPair.lhs
);
}
}


@ -33,6 +33,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
@ -48,10 +49,10 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query)
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
if (period.getMillis() == 0) {
return baseRunner.run(query);
return baseRunner.run(query, context);
}
return Sequences.concat(
@ -74,7 +75,8 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
public Sequence<T> apply(Interval singleInterval)
{
return baseRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval)))
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
context
);
}
}


@ -28,15 +28,20 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
*/
public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
{
private static final String DEFAULT_METRIC_NAME = "query/time";
private final ServiceEmitter emitter;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final QueryRunner<T> queryRunner;
private final long creationTime;
private final String metricName;
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
@ -44,29 +49,42 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
QueryRunner<T> queryRunner
)
{
this(emitter, builderFn, queryRunner, -1);
this(emitter, builderFn, queryRunner, DEFAULT_METRIC_NAME);
}
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner,
long creationTime
long creationTime,
String metricName
)
{
this.emitter = emitter;
this.builderFn = builderFn;
this.queryRunner = queryRunner;
this.creationTime = creationTime;
this.metricName = metricName;
}
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner,
String metricName
)
{
this(emitter, builderFn, queryRunner, -1, metricName);
}
public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
{
return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis());
return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis(), metricName);
}
@Override
public Sequence<T> run(final Query<T> query)
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
{
final ServiceMetricEvent.Builder builder = builderFn.apply(query);
String queryId = query.getId();
@ -84,7 +102,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long startTime = System.currentTimeMillis();
try {
retVal = queryRunner.run(query).accumulate(outType, accumulator);
retVal = queryRunner.run(query, context).accumulate(outType, accumulator);
}
catch (RuntimeException e) {
builder.setUser10("failed");
@ -97,9 +115,9 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
finally {
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken));
emitter.emit(builder.build(metricName, timeTaken));
if(creationTime > 0) {
if (creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
}
@ -114,7 +132,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long startTime = System.currentTimeMillis();
try {
retVal = queryRunner.run(query).toYielder(initValue, accumulator);
retVal = queryRunner.run(query, context).toYielder(initValue, accumulator);
}
catch (RuntimeException e) {
builder.setUser10("failed");
@ -173,12 +191,13 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
}
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken));
emitter.emit(builder.build(metricName, timeTaken));
if (creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
} finally {
}
finally {
yielder.close();
}
}
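A short hedged sketch of how the new metricName parameter might be wired up; the helper method and the metric name used in the comment are illustrative, not part of this patch.

import com.google.common.base.Function;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;

public class MetricNameExample
{
  // Hypothetical helper: wraps a runner so its timing is emitted under a caller-chosen
  // metric name instead of the default "query/time".
  public static <T> QueryRunner<T> wrapWithTiming(
      ServiceEmitter emitter,
      Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
      QueryRunner<T> baseRunner,
      String metricName
  )
  {
    return new MetricsEmittingQueryRunner<T>(emitter, builderFn, baseRunner, metricName)
        .withWaitMeasuredFromNow();
  }
}

For example, a caller could pass a name such as "query/segment/time" (an assumed, illustrative name) to time individual segment runners separately from the overall query.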


@ -22,12 +22,15 @@ package io.druid.query;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import java.util.List;
import java.util.Map;
/**
*/
public class NoopQueryRunner<T> implements QueryRunner<T>
{
@Override
public Sequence<T> run(Query query)
public Sequence<T> run(Query<T> query, Map<String, Object> context)
{
return Sequences.empty();
}


@ -62,14 +62,16 @@ public interface Query<T>
public String getType();
public Sequence<T> run(QuerySegmentWalker walker);
public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context);
public Sequence<T> run(QueryRunner<T> runner);
public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context);
public List<Interval> getIntervals();
public Duration getDuration();
public Map<String, Object> getContext();
public <ContextType> ContextType getContextValue(String key);
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);
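Illustrative only: a hedged sketch of an implementor's view of the new two-argument contract. The wrapper below simply forwards the shared context map to a delegate runner; the class name and the context key are assumptions, not part of this changeset.

import com.metamx.common.guava.Sequence;
import io.druid.query.Query;
import io.druid.query.QueryRunner;

import java.util.Map;

public class ForwardingQueryRunner<T> implements QueryRunner<T>
{
  private final QueryRunner<T> delegate;

  public ForwardingQueryRunner(QueryRunner<T> delegate)
  {
    this.delegate = delegate;
  }

  @Override
  public Sequence<T> run(Query<T> query, Map<String, Object> context)
  {
    // The same context map is passed down the whole runner chain, so anything
    // recorded here is visible to the runners that wrap this one.
    context.put("exampleKey", "exampleValue"); // illustrative key only
    return delegate.run(query, context);
  }
}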

Some files were not shown because too many files have changed in this diff.