Merge branch 'master' into select

Conflicts:
	server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
fjy 2014-01-20 13:26:51 -08:00
commit c93f896092
106 changed files with 1934 additions and 685 deletions

View File

@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
echo "See also http://druid.io/docs/0.6.46"
echo "See also http://druid.io/docs/0.6.52"

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -21,10 +21,15 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*/
@ -49,4 +54,31 @@ public class Execs
  {
    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
  }

  /**
   * @param nameFormat name format for the thread factory
   * @param capacity   maximum capacity after which the executor service blocks when accepting new tasks
   *
   * @return an ExecutorService that blocks accepting new tasks once the capacity is reached
   */
  public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity)
  {
    return new ThreadPoolExecutor(
        1, 1,
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(capacity),
        makeThreadFactory(nameFormat),
        new RejectedExecutionHandler()
        {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
          {
            try {
              // Block the submitting thread until space frees up in the queue.
              ((ArrayBlockingQueue) executor.getQueue()).put(r);
            }
            catch (InterruptedException e) {
              throw new RejectedExecutionException("Got interrupted while adding to the queue", e);
            }
          }
        }
    );
  }
}

View File

@ -0,0 +1,92 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.concurrent;
import com.google.common.base.Throwables;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class ExecsTest
{
  @Test
  public void testBlockingExecutorService() throws Exception
  {
    final int capacity = 3;
    final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity);
    final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1);
    final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity);
    final CountDownLatch taskStartSignal = new CountDownLatch(1);
    final AtomicInteger producedCount = new AtomicInteger();
    final AtomicInteger consumedCount = new AtomicInteger();
    ExecutorService producer = Executors.newSingleThreadExecutor();
    producer.submit(
        new Runnable()
        {
          public void run()
          {
            for (int i = 0; i < 2 * capacity; i++) {
              final int taskID = i;
              System.out.println("Produced task" + taskID);
              blockingExecutor.submit(
                  new Runnable()
                  {
                    @Override
                    public void run()
                    {
                      System.out.println("Starting task" + taskID);
                      try {
                        taskStartSignal.await();
                        consumedCount.incrementAndGet();
                        taskCompletedSignal.countDown();
                      }
                      catch (Exception e) {
                        throw Throwables.propagate(e);
                      }
                      System.out.println("Completed task" + taskID);
                    }
                  }
              );
              producedCount.incrementAndGet();
              queueFullSignal.countDown();
            }
          }
        }
    );
    queueFullSignal.await();
    // verify that the producer blocks
    Assert.assertEquals(capacity + 1, producedCount.get());
    // let the tasks run
    taskStartSignal.countDown();
    // wait until all tasks complete
    taskCompletedSignal.await();
    // verify all tasks consumed
    Assert.assertEquals(2 * capacity, consumedCount.get());
    // cleanup
    blockingExecutor.shutdown();
    producer.shutdown();
  }
}

View File

@ -0,0 +1,4 @@
---
layout: doc_page
---
Experimental features are features we have developed but have not fully tested in a production environment. If you choose to try them out, there will likely be edge cases that we have not covered. We would love feedback on any of these features, whether that is a bug report, a suggestion for improvement, or simply letting us know that they work as intended.

View File

@ -1,14 +1,16 @@
---
layout: doc_page
---
There are two choices for batch data ingestion to your Druid cluster, you can use the [Indexing service](Indexing-service.html) or you can use the `HadoopDruidIndexer`.
# Batch Data Ingestion
There are two choices for batch data ingestion into your Druid cluster: you can use the [Indexing service](Indexing-Service.html) or you can use the `HadoopDruidIndexer`.
Which should I use?
-------------------
The [Indexing service](Indexing-service.html) is a node that can run as part of your Druid cluster and can accomplish a number of different types of indexing tasks. Even if all you care about is batch indexing, it provides for the encapsulation of things like the [database](MySQL.html) that is used for segment metadata and other things, so that your indexing tasks do not need to include such information. The indexing service was created such that external systems could programmatically interact with it and run periodic indexing tasks. Long-term, the indexing service is going to be the preferred method of ingesting data.
The [Indexing service](Indexing-Service.html) is a node that can run as part of your Druid cluster and can accomplish a number of different types of indexing tasks. Even if all you care about is batch indexing, it provides for the encapsulation of things like the [database](MySQL.html) that is used for segment metadata and other things, so that your indexing tasks do not need to include such information. The indexing service was created such that external systems could programmatically interact with it and run periodic indexing tasks. Long-term, the indexing service is going to be the preferred method of ingesting data.
The `HadoopDruidIndexer` runs hadoop jobs in order to separate and index data segments. It takes advantage of Hadoop as a job scheduling and distributed job execution platform. It is a simple method if you already have Hadoop running and dont want to spend the time configuring and deploying the [Indexing service](Indexing service.html) just yet.
The `HadoopDruidIndexer` runs Hadoop jobs in order to separate and index data segments. It takes advantage of Hadoop as a job scheduling and distributed job execution platform. It is a simple method if you already have Hadoop running and don't want to spend the time configuring and deploying the [Indexing service](Indexing-Service.html) just yet.
Batch Ingestion using the HadoopDruidIndexer
--------------------------------------------
@ -25,8 +27,8 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
{
"dataSource": "the_data_source",
"timestampSpec" : {
"timestampColumn": "ts",
"timestampFormat": "<iso, millis, posix, auto or any Joda time format>"
"column": "ts",
"format": "<iso, millis, posix, auto or any Joda time format>"
},
"dataSpec": {
"format": "<csv, tsv, or json>",
@ -186,8 +188,8 @@ The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Co
"config": {
"dataSource" : "example",
"timestampSpec" : {
"timestampColumn" : "timestamp",
"timestampFormat" : "auto"
"column" : "timestamp",
"format" : "auto"
},
"dataSpec" : {
"format" : "json",
@ -229,7 +231,7 @@ The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Co
|--------|-----------|---------|
|type|This should be "index_hadoop".|yes|
|config|A Hadoop Index Config (see above).|yes|
|hadoopCoordinates|The Maven <groupId>:<artifactId>:<version> of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no|
|hadoopCoordinates|The Maven `<groupId>:<artifactId>:<version>` of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no|
The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.
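For reference, here is a consolidated sketch of the new `timestampSpec` grammar shown in the hunks above; the values are illustrative and all other fields are omitted:

```json
{
  "dataSource": "the_data_source",
  "timestampSpec": {
    "column": "ts",
    "format": "auto"
  }
}
```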

View File

@ -1,30 +1,80 @@
---
layout: doc_page
---
# Booting a Single Node Cluster #
[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.46-bin.tar.gz).
# Booting a Druid Cluster
[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small Druid cluster on localhost. However, when it's time to run a more realistic setup&mdash;for production, or just to test what production would look like&mdash;you'll want to find a way to start the cluster on multiple hosts. This document describes two different ways to do this: manually, or as a cloud service via Apache Whirr.
The [ec2 run script](https://github.com/metamx/druid/blob/master/examples/bin/run_ec2.sh), run_ec2.sh, is located at 'examples/bin' if you have checked out the code, or at the root of the project if you've downloaded a tarball. The scripts rely on the [Amazon EC2 API Tools](http://aws.amazon.com/developertools/351), and you will need to set three environment variables:
## Manually Booting a Druid Cluster
You can provision individual servers, loading Druid onto each machine (or building it) and setting the required configuration for each type of node. You'll also have to set up required external dependencies. Then you'll have to start each node. This process is outlined in [Tutorial: The Druid Cluster](Tutorial:-The-Druid-Cluster.html).
## Apache Whirr
[Apache Whirr](http://whirr.apache.org/) is a set of libraries for launching cloud services. For Druid, Whirr serves as an easy way to launch a cluster in Amazon AWS by using simple commands and configuration files (called *recipes*).
**NOTE:** At this time Whirr can launch only Druid 0.5.x, but it will support Druid 0.6.x in the near future. You can download and install your own copy of Druid 0.5.x [here](http://static.druid.io/artifacts/releases/druid-services-0.5.7-bin.tar.gz).
You'll need an AWS account, and an EC2 key pair from that account so that Whirr can connect to the cloud via the EC2 API. If you haven't generated a key pair, see the [AWS documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) or see this [Whirr FAQ](http://whirr.apache.org/faq.html#how-do-i-find-my-cloud-credentials).
### Installing Whirr
You must use a version of Whirr that includes and supports a Druid recipe. You can do so in one of two ways:
#### Build the Following Version of Whirr
Clone the code from [https://github.com/rjurney/whirr/tree/trunk](https://github.com/rjurney/whirr/tree/trunk) and build Whirr:
git clone git@github.com:rjurney/whirr.git
cd whirr
git checkout trunk
mvn clean install -Dmaven.test.failure.ignore=true
#### Build the Latest Version of Whirr
Clone the code from the Whirr repository:
git clone git://git.apache.org/whirr.git
Then run `mvn install` from the root directory.
### Configure Whirr
The Whirr recipe for Druid is the configuration file `$WHIRR_HOME/recipes/druid.properties`. You can edit this file to suit your needs; it is annotated and self-explanatory. Here are some hints about that file:
* Set `whirr.location-id` to a specific AWS region (e.g., us-east-1) if desired, else one will be chosen for you.
* You can choose the hardware with `whirr.hardware-id` (e.g., m1.large). If you don't choose an image via `whirr.image-id` (the image must be compatible with the hardware), you'll get plain vanilla Linux.
* SSH keys (not password protected) must exist for the local user. If they are in the default locations, `${sys:user.home}/.ssh/id_rsa` and `${sys:user.home}/.ssh/id_rsa.pub`, Whirr will find them. Otherwise, you'll have to specify them with `whirr.private-key-file` and `whirr.public-key-file`.
* Be sure to specify the absolute path of the Druid realtime spec file `realtime.spec` in `whirr.druid.realtime.spec.path`.
* Two Druid cluster templates (see `whirr.instance-templates`) are provided: a small cluster running on a single EC2 instance, and a larger cluster running on multiple instances. The first is a good test case to start with.
The following AWS information must be set in `druid.properties`, as environment variables, or in the file `$WHIRR_HOME/conf/credentials`:
PROVIDER=aws-ec2
IDENTITY=<aws-id-key>
CREDENTIAL=<aws-private-key>
How to get the IDENTITY and CREDENTIAL keys is discussed above.
### Start a Test Cluster With Whirr
Run the following command:
```bash
# Setup environment for ec2-api-tools
export EC2_HOME=/path/to/ec2-api-tools-1.6.7.4/
export PATH=$PATH:$EC2_HOME/bin
export AWS_ACCESS_KEY=
export AWS_SECRET_KEY=
% $WHIRR_HOME/bin/whirr launch-cluster --config $WHIRR_HOME/recipes/druid.properties
```
If Whirr starts without any errors, you should see the following message:
Then, booting an ec2 instance running one node of each type is as simple as running the script, run_ec2.sh :)
Running on provider aws-ec2 using identity <your-aws-id-here>
You can then use the EC2 dashboard to locate the instance and confirm that it has started up.
# Apache Whirr #
If both the instance and the Druid cluster launch successfully, a few minutes later other messages to STDOUT should follow with information returned from EC2, including the instance ID:
Apache Whirr is a set of libraries for launching cloud services. You can clone a version of Whirr that includes Druid as a service from git@github.com:rjurney/whirr.git:
Started cluster of 1 instances
Cluster{instances=[Instance{roles=[zookeeper, druid-mysql, druid-master, druid-broker, druid-compute, druid-realtime], publicIp= ...
The final message will contain login information for the instance.
Note that Whirr will return an exception if any of the nodes fail to launch, and the cluster will be destroyed. To destroy the cluster manually, run the following command:
```bash
git clone git@github.com:rjurney/whirr.git
cd whirr
git checkout trunk
mvn clean install -Dmaven.test.failure.ignore=true -Dcheckstyle.skip
bin/whirr launch-cluster --config recipes/druid.properties
% $WHIRR_HOME/bin/whirr destroy-cluster --config $WHIRR_HOME/recipes/druid.properties
```

View File

@ -35,9 +35,10 @@ JVM Configuration
The broker module uses several of the default modules in [Configuration](Configuration.html) and has the following set of configurations as well:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.broker.cache.type`|Choices: local, memcache. The type of cache to use for queries.|local|
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.broker.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to compute nodes. `random` chooses a node randomly; `connectionCount` picks the node with the fewest active connections.|`random`|
#### Local Cache
@ -76,4 +77,4 @@ Caching
-------
Broker nodes employ a cache with a LRU cache invalidation strategy. The broker cache stores per segment results. The cache can be local to each broker node or shared across multiple nodes using an external distributed cache such as [memcached](http://memcached.org/). Each time a broker node receives a query, it first maps the query to a set of segments. A subset of these segment results may already exist in the cache and the results can be directly pulled from the cache. For any segment results that do not exist in the cache, the broker node will forward the query to the
historical nodes. Once the historical nodes return their results, the broker will store those results in the cache. Real-time segments are never cached and hence requests for real-time data will always be forwarded to real-time nodes. Real-time data is perpetually changing and caching the results would be unreliable.
historical nodes. Once the historical nodes return their results, the broker will store those results in the cache. Real-time segments are never cached and hence requests for real-time data will always be forwarded to real-time nodes. Real-time data is perpetually changing and caching the results would be unreliable.

View File

@ -8,34 +8,38 @@ The following definitions are given with respect to the Druid data store. They a
More definitions are available on the [design page](Design.html).
* **Aggregation** The summarizing of data meeting certain specifications. Druid aggregates [timeseries data](#timeseries), which in effect compacts the data. Time intervals (set in configuration) are used to create buckets, while [timestamps](#timestamp) determine which buckets data aggregated in.
* **Aggregation**&nbsp;&nbsp;The summarizing of data meeting certain specifications. Druid aggregates [timeseries data](#timeseries), which in effect compacts the data. Time intervals (set in configuration) are used to create buckets, while [timestamps](#timestamp) determine which buckets data is aggregated in.
* **Aggregators** A mechanism for combining records during realtime incremental indexing, Hadoop batch indexing, and in queries.
* **Aggregators**&nbsp;&nbsp;A mechanism for combining records during realtime incremental indexing, Hadoop batch indexing, and in queries.
* **DataSource** A table-like view of data; specified in [specFiles](#specfile) and in queries. A dataSource specifies the source of data being ingested and ultimately stored in [segments](#segment).
* **Compute node**&nbsp;&nbsp;Obsolete name for a [Historical node](Historical.html).
* **Dimensions** Aspects or categories of data, such as languages or locations. For example, with *language* and *country* as the type of dimension, values could be "English" or "Mandarin" for language, or "USA" or "China" for country. In Druid, dimensions can serve as filters for narrowing down hits (for example, language = "English" or country = "China").
* **DataSource**&nbsp;&nbsp;A table-like view of data; specified in [specFiles](#specfile) and in queries. A dataSource specifies the source of data being ingested and ultimately stored in [segments](#segment).
* **Ephemeral Node** A Zookeeper node (or "znode") that exists for as long as the session that created the znode is active. More info [here](http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes). In a Druid cluster, ephemeral nodes are typically used for commands (such as assigning [segments](#segment) to certain nodes).
* **Dimensions**&nbsp;&nbsp;Aspects or categories of data, such as languages or locations. For example, with *language* and *country* as the type of dimension, values could be "English" or "Mandarin" for language, or "USA" or "China" for country. In Druid, dimensions can serve as filters for narrowing down hits (for example, language = "English" or country = "China").
* **Granularity** The time interval corresponding to aggregation by time. Druid configuration settings specify the granularity of [timestamp](#timestamp) buckets in a [segment](#segment) (for example, by minute or by hour), as well as the granularity of the segment itself. The latter is essentially the overall range of absolute time covered by the segment. In queries, granularity settings control the summarization of findings.
* **Ephemeral Node**&nbsp;&nbsp;A Zookeeper node (or "znode") that exists for as long as the session that created the znode is active. More info [here](http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes). In a Druid cluster, ephemeral nodes are typically used for commands (such as assigning [segments](#segment) to certain nodes).
* **Ingestion** The pulling and initial storing and processing of data. Druid supports realtime and batch ingestion of data, and applies indexing in both cases.
* **Granularity**&nbsp;&nbsp;The time interval corresponding to aggregation by time. Druid configuration settings specify the granularity of [timestamp](#timestamp) buckets in a [segment](#segment) (for example, by minute or by hour), as well as the granularity of the segment itself. The latter is essentially the overall range of absolute time covered by the segment. In queries, granularity settings control the summarization of findings.
* **Metrics** Countable data that can be aggregated. Metrics, for example, can be the number of visitors to a website, number of tweets per day, or average revenue.
* **Ingestion**&nbsp;&nbsp;The pulling and initial storing and processing of data. Druid supports realtime and batch ingestion of data, and applies indexing in both cases.
* **Rollup** The aggregation of data that occurs at one or more stages, based on settings in a [configuration file](#specFile).
* **Master node**&nbsp;&nbsp;Obsolete name for a [Coordinator node](Coordinator.html).
* **Metrics**&nbsp;&nbsp;Countable data that can be aggregated. Metrics, for example, can be the number of visitors to a website, number of tweets per day, or average revenue.
* **Rollup**&nbsp;&nbsp;The aggregation of data that occurs at one or more stages, based on settings in a [configuration file](#specFile).
<a name="segment"></a>
* **Segment** A collection of (internal) records that are stored and processed together. Druid chunks data into segments representing a time interval, and these are stored and manipulated in the cluster.
* **Segment**&nbsp;&nbsp;A collection of (internal) records that are stored and processed together. Druid chunks data into segments representing a time interval, and these are stored and manipulated in the cluster.
* **Shard** A sub-partition of the data, allowing multiple [segments](#segment) to represent the data in a certain time interval. Sharding occurs along time partitions to better handle amounts of data that exceed certain limits on segment size, although sharding along dimensions may also occur to optimize efficiency.
* **Shard**&nbsp;&nbsp;A sub-partition of the data, allowing multiple [segments](#segment) to represent the data in a certain time interval. Sharding occurs along time partitions to better handle amounts of data that exceed certain limits on segment size, although sharding along dimensions may also occur to optimize efficiency.
<a name="specfile"></a>
* **specFile** The specification for services in JSON format; see [Realtime](Realtime.html) and [Batch-ingestion](Batch-ingestion.html)
* **specFile**&nbsp;&nbsp;The specification for services in JSON format; see [Realtime](Realtime.html) and [Batch-ingestion](Batch-ingestion.html)
<a name="timeseries"></a>
* **Timeseries Data** Data points which are ordered in time. The closing value of a financial index or the number of tweets per hour with a certain hashtag are examples of timeseries data.
* **Timeseries Data**&nbsp;&nbsp;Data points which are ordered in time. The closing value of a financial index or the number of tweets per hour with a certain hashtag are examples of timeseries data.
<a name="timestamp"></a>
* **Timestamp** An absolute position on a timeline, given in a standard alpha-numerical format such as with UTC time. [Timeseries data](#timeseries) points can be ordered by timestamp, and in Druid, they are.
* **Timestamp**&nbsp;&nbsp;An absolute position on a timeline, given in a standard alpha-numerical format such as with UTC time. [Timeseries data](#timeseries) points can be ordered by timestamp, and in Druid, they are.

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.46
git checkout druid-0.6.52
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -1,17 +1,19 @@
---
layout: doc_page
---
# Druid Firehoses
Firehoses describe the data stream source. They are pluggable and thus the configuration schema can and will vary based on the `type` of the firehose.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|Specifies the type of firehose. Each value will have its own configuration schema, firehoses packaged with Druid are described [here](https://github.com/metamx/druid/wiki/Firehose#available-firehoses)|yes|
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | Specifies the type of firehose. Each value will have its own configuration schema; firehoses packaged with Druid are described below. | yes |
We describe the configuration of the Kafka firehose from the example below, but check [here](https://github.com/metamx/druid/wiki/Firehose#available-firehoses) for more information about the various firehoses that are available in Druid.
We describe the configuration of the [Kafka firehose example](Realtime-ingestion.html#realtime-specfile), but there are other types available in Druid (see below).
- `consumerProps` is a map of properties for the Kafka consumer. The JSON object is converted into a Properties object and passed along to the Kafka consumer.
- `feed` is the feed that the Kafka consumer should read from.
- `parser` represents a parser that knows how to convert from String representations into the required `InputRow` representation that Druid uses. This is a potentially reusable piece that can be found in many of the firehoses that are based on text streams. The spec in the example describes a JSON feed (new-line delimited objects), with a timestamp column called "timestamp" in ISO8601 format and that it should not include the dimension "value" when processing. More information about the options available for the parser are available [here](https://github.com/metamx/druid/wiki/Firehose#parsing-data).
- `parser` represents a parser that knows how to convert from String representations into the required `InputRow` representation that Druid uses. This is a potentially reusable piece that can be found in many of the firehoses that are based on text streams. The spec in the example describes a JSON feed (new-line delimited objects), with a timestamp column called "timestamp" in ISO8601 format and that it should not include the dimension "value" when processing. More information about the options available for the parser are available below.
Available Firehoses
-------------------

View File

@ -0,0 +1,59 @@
---
layout: doc_page
---
Druid supports filtering on spatially indexed columns based on an origin and a bound.
# Spatial Indexing
In any of the data specs, there is the option of providing spatial dimensions. For example, for a JSON data spec, spatial dimensions can be specified as follows:
```json
"dataSpec" : {
"format": "JSON",
"dimensions": <some_dims>,
"spatialDimensions": [
{
"dimName": "coordinates",
"dims": ["lat", "long"]
},
...
]
}
```
|property|description|required?|
|--------|-----------|---------|
|dimName|The name of the spatial dimension. A spatial dimension may be constructed from multiple other dimensions or it may already exist as part of an event. If a spatial dimension already exists, it must be an array of coordinate values.|yes|
|dims|A list of dimension names that comprise a spatial dimension.|no|
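As a hedged illustration of the note above: if the spatial dimension already exists in the event (rather than being built from `lat` and `long`), an incoming row might look like the following sketch, where every field name other than `coordinates` is hypothetical:

```json
{
  "timestamp": "2013-08-31T01:02:33Z",
  "coordinates": [37.7, -122.4],
  "someOtherDim": "someValue"
}
```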
# Spatial Filters
The grammar for a spatial filter is as follows:
```json
"filter" : {
"type": "spatial",
"dimension": "spatialDim",
"bound": {
"type": "rectangular",
"minCoords": [10.0, 20.0],
"maxCoords": [30.0, 40.0]
}
}
```
Bounds
------
### Rectangular
|property|description|required?|
|--------|-----------|---------|
|minCoords|List of minimum dimension coordinates for coordinates [x, y, z, …]|yes|
|maxCoords|List of maximum dimension coordinates for coordinates [x, y, z, …]|yes|
### Radius
|property|description|required?|
|--------|-----------|---------|
|coords|Origin coordinates in the form [x, y, z, …]|yes|
|radius|The float radius value|yes|
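For comparison with the rectangular example above, a radius bound would presumably be written as follows; the coordinate and radius values are illustrative:

```json
"filter" : {
  "type": "spatial",
  "dimension": "spatialDim",
  "bound": {
    "type": "radius",
    "coords": [10.0, 20.0],
    "radius": 5.0
  }
}
```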

View File

@ -1,8 +1,7 @@
---
layout: doc_page
---
These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query.
These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query. Note: If you only want to do straight aggregates for some time range, we highly recommend using [TimeseriesQueries](TimeseriesQuery.html) instead. The performance will be substantially better.
An example groupBy query object is shown below:
``` json

View File

@ -28,11 +28,13 @@ druid.port=8081
druid.zk.service.host=localhost
druid.server.maxSize=100000000
druid.server.maxSize=10000000000
druid.processing.buffer.sizeBytes=10000000
# Change these to make Druid faster
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]```
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
```
Note: This will spin up a Historical node with the local filesystem as deep storage.

View File

@ -0,0 +1,38 @@
---
layout: doc_page
---
## Where do my Druid segments end up after ingestion?
Depending on what `druid.storage.type` is set to, Druid will upload segments to some [Deep Storage](Deep-Storage.html). Local disk is used as the default deep storage.
## My realtime node is not handing segments off
Make sure that the `druid.publish.type` on your real-time nodes is set to "db". Also make sure that `druid.storage.type` is set to a deep storage that makes sense. Some example configs:
```
druid.publish.type=db
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.storage.type=s3
druid.storage.bucket=druid
druid.storage.baseKey=sample
```
## I don't see my Druid segments on my historical nodes
You can check the coordinator console located at <COORDINATOR_IP>:<PORT>/cluster.html. Make sure that your segments have actually loaded on [historical nodes](Historical.html). If your segments are not present, check the coordinator logs for messages about capacity or replication errors. One reason that segments are not downloaded is that historical nodes have maxSizes that are too small, making them incapable of downloading more data. You can change that with (for example):
```
-Ddruid.segmentCache.locations=[{"path":"/tmp/druid/storageLocation","maxSize":"500000000000"}]
-Ddruid.server.maxSize=500000000000
```
## My queries are returning empty results
You can check <BROKER_IP>:<PORT>/druid/v2/datasources/<YOUR_DATASOURCE> for the dimensions and metrics that have been created for your datasource. Make sure that the names of the aggregators you use in your query match one of these metrics. Also make sure that the query interval you specify matches a valid time range where data exists.
## More information
Getting data into Druid can definitely be difficult for first time users. Please don't hesitate to ask questions in our IRC channel or on our [google groups page](https://groups.google.com/forum/#!forum/druid-development).

View File

@ -1,7 +1,7 @@
---
layout: doc_page
---
The orderBy field provides the functionality to sort and limit the set of results from a groupBy query. Available options are:
The orderBy field provides the functionality to sort and limit the set of results from a groupBy query. If you group by a single dimension and are ordering by a single metric, we highly recommend using [TopN Queries](TopNQuery.html) instead. The performance will be substantially better. Available options are:
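As a rough sketch only (the limit and column values here are illustrative, and the authoritative grammar is described in the sections below), a default limit spec that keeps the top rows ordered by a single column might look like:

```json
{
  "type": "default",
  "limit": 100,
  "columns": ["some_column"]
}
```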
### DefaultLimitSpec

View File

@ -0,0 +1,18 @@
---
layout: doc_page
---
## What should I set my JVM heap to?
The size of the JVM heap really depends on the type of Druid node you are running. Below are a few considerations.
[Broker nodes](Broker.html) can use the JVM heap as a query cache and thus, the size of the heap will affect the number of results that can be cached. Broker nodes do not require off-heap memory and generally, heap sizes can be set to be close to the maximum memory on the machine (leaving some room for JVM overhead). The heap is used to merge results from different real-time and historical nodes, along with other computational processing.
[Historical nodes](Historical.html) use off-heap memory to store intermediate results, and by default, all segments are memory mapped before they can be queried. The more off-heap memory is available, the more segments can be served without the possibility of data being paged onto disk. On historicals, the JVM heap is used for [GroupBy queries](GroupByQuery.html), some data structures used for intermediate computation, and general processing.
[Coordinator nodes](Coordinator.html) do not require off-heap memory and the heap is used for loading information about all segments to determine what segments need to be loaded, dropped, moved, or replicated.
## What is the intermediate computation buffer?
The intermediate computation buffer specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed. The default size is 1073741824 bytes (1GB).
## What is server maxSize?
Server maxSize sets the maximum cumulative segment size (in bytes) that a node can hold. Changing this parameter will affect performance by controlling the memory/disk ratio on a node. Setting this parameter to a value greater than the total memory capacity on a node may cause disk paging to occur. This paging time introduces a query latency delay.

View File

@ -1,11 +1,13 @@
---
layout: doc_page
---
# Druid Plumbers
The Plumber handles generated segments, both while they are being generated and when they are "done". This is also technically a pluggable interface and there are multiple implementations. However, the Plumber handles enough details that we expect there to be only a few implementations, and that only more advanced third parties will implement their own.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|Specifies the type of plumber. Each value will have its own configuration schema, plumbers packaged with Druid are described [here](https://github.com/metamx/druid/wiki/Plumber#available-plumbers)|yes|
|type|String|Specifies the type of plumber. Each value will have its own configuration schema; plumbers packaged with Druid are described below.|yes|
We provide a brief description of the example to exemplify the types of things that are configured on the plumber.

View File

@ -0,0 +1,149 @@
---
layout: doc_page
---
Realtime Data Ingestion
========
Realtime data ingestion uses [Realtime nodes](Realtime.html) to index data and make it immediately available for querying. This data is periodically handed off (in the form of data segments) to [Historical](Historical.html) nodes, after which that data is forgotten by the Realtime nodes. This handoff, or "segment propagation," involves a series of interactions between various members of the Druid cluster. It is illustrated below.
Much of the configuration governing Realtime nodes and the ingestion of data is set in the Realtime spec file, discussed on this page.
Segment Propagation
-------------------
The segment propagation diagram for real-time data ingestion can be seen below:
![Segment Propagation](../img/segmentPropagation.png "Segment Propagation")
You can read about the various components shown in this diagram under the Architecture section (see the menu on the left).
<a id="realtime-specfile"></a>
## Realtime "specFile"
The property `druid.realtime.specFile` has the path of a file (absolute or relative path and file name) with realtime specifications in it. This "specFile" should be a JSON Array of JSON objects like the following:
```json
[
{
"schema": {
"dataSource": "dataSourceName",
"aggregators": [
{
"type": "count",
"name": "events"
},
{
"type": "doubleSum",
"name": "outColumn",
"fieldName": "inColumn"
}
],
"indexGranularity": "minute",
"shardSpec": {
"type": "none"
}
},
"config": {
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "zk_connect_string",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "consumer-group",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "your_kafka_topic",
"parser": {
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"data": {
"format": "json"
},
"dimensionExclusions": [
"value"
]
}
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT10m",
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist"
}
}
]
```
This is a JSON Array so you can give more than one realtime stream to a given node. The number you can put in the same process depends on the exact configuration. In general, it is best to think of each realtime stream handler as requiring two threads: one thread for data consumption and aggregation, and one thread for incremental persists and other background tasks.
There are four parts to a realtime stream specification, `schema`, `config`, `firehose`, and `plumber`, which we will go into here.
### Schema
This describes the data schema for the output Druid segment. More information about concepts in Druid and querying can be found at [Concepts-and-Terminology](Concepts-and-Terminology.html) and [Querying](Querying.html).
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|aggregators|Array of Objects|The list of aggregators to use to aggregate colliding rows together.|yes|
|dataSource|String|The name of the dataSource that the segment belongs to.|yes|
|indexGranularity|String|The granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows.|yes|
|segmentGranularity|String|The granularity of the segment as a whole. This is generally larger than the index granularity and describes the rate at which the realtime server will push segments out for historical servers to take over.|yes|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a sharded fashion.|no|
### Config
This provides configuration for the data processing portion of the realtime stream processor.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|yes|
|maxRowsInMemory|Number|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size.|yes|
### Firehose
See [Firehose](Firehose.html).
### Plumber
See [Plumber](Plumber.html)
Constraints
-----------
The following table summarizes constraints between settings in the spec file for the Realtime subsystem.
|Name|Effect|Minimum|Recommended|
|----|------|-------|-----------|
| windowPeriod| when reading an InputRow, events with timestamp older than now minus this window are discarded | time jitter tolerance | use this to reject outliers |
| segmentGranularity| time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than indexGranularity|
| indexGranularity| time granularity (minute, hour, day, week, month) of indexes | less than segmentGranularity| minute, hour, day, week, month |
| intermediatePersistPeriod| the max real time (ISO8601 Period) between flushes of InputRows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory |
| maxRowsInMemory| the max number of InputRows to hold in memory before a flush to disk | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
The normal, expected use cases have the following overall constraints: `indexGranularity < intermediatePersistPeriod <= windowPeriod < segmentGranularity`
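For example, the spec shown earlier on this page satisfies that ordering with the following values, gathered here from its `schema`, `config`, and `plumber` sections purely for illustration:

```json
{
  "indexGranularity": "minute",
  "intermediatePersistPeriod": "PT10m",
  "windowPeriod": "PT10m",
  "segmentGranularity": "hour"
}
```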
If the RealtimeNode process runs out of heap, try adjusting the `druid.computation.buffer.size` property, which specifies a size in bytes that must fit into the heap.
Extending the code
------------------
Realtime integration is intended to be extended in two ways:
1. Connect to data streams from varied systems ([Firehose](https://github.com/druid-io/druid-api/blob/master/src/main/java/io/druid/data/input/FirehoseFactory.java))
2. Adjust the publishing strategy to match your needs ([Plumber](https://github.com/metamx/druid/blob/master/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java))
The expectations are that the former will be very common and something that users of Druid will do on a fairly regular basis. Most users will probably never have to deal with the latter form of customization. Indeed, we hope that all potential use cases can be packaged up as part of Druid proper without requiring proprietary customization.
Given those expectations, adding a firehose is straightforward and completely encapsulated inside the interface. Adding a plumber is more involved and requires understanding of how the system works to get right; it's not impossible, but it's not intended that individuals new to Druid will be able to do it immediately.

View File

@ -1,7 +1,7 @@
---
layout: doc_page
---
Realtime
Realtime Nodes
========
Realtime nodes provide a realtime index. Data indexed via these nodes is immediately available for querying. Realtime nodes will periodically build segments representing the data they've collected over some span of time and transfer these segments off to [Historical](Historical.html) nodes. They use ZooKeeper to monitor the transfer and MySQL to store metadata about the transferred segment. Once transferred, segments are forgotten by the Realtime nodes.
@ -27,172 +27,28 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.46"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.52"]
druid.zk.service.host=localhost
# The realtime config file.
druid.realtime.specFile=/path/to/specFile
# Choices: db (hand off segments), noop (do not hand off segments).
druid.publish.type=db
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=10000000
druid.processing.buffer.sizeBytes=100000000
```
Note: This setup will not hand off segments to the rest of the cluster.
The realtime module also uses several of the default modules in [Configuration](Configuration.html). For more information on the realtime spec file (or configuration file), see [realtime ingestion](Realtime-ingestion.html) page.
JVM Configuration
-----------------
The realtime module uses several of the default modules in [Configuration](Configuration.html) and has the following set of configurations as well:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.realtime.specFile`|The file with realtime specifications in it.|none|
|`druid.publish.type`|Choices:noop, db. After a real-time node completes building a segment after the window period, what does it do with it? For true handoff to occur, this should be set to "db".|db|
### Realtime "specFile"
The property `druid.realtime.specFile` has the path of a file (absolute or relative path and file name) with realtime specifications in it. This "specFile" should be a JSON Array of JSON objects like the following:
```json
[
{
"schema": {
"dataSource": "dataSourceName",
"aggregators": [
{
"type": "count",
"name": "events"
},
{
"type": "doubleSum",
"name": "outColumn",
"fieldName": "inColumn"
}
],
"indexGranularity": "minute",
"shardSpec": {
"type": "none"
}
},
"config": {
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m"
},
"firehose": {
"type": "kafka-0.7.2",
"consumerProps": {
"zk.connect": "zk_connect_string",
"zk.connectiontimeout.ms": "15000",
"zk.sessiontimeout.ms": "15000",
"zk.synctime.ms": "5000",
"groupid": "consumer-group",
"fetch.size": "1048586",
"autooffset.reset": "largest",
"autocommit.enable": "false"
},
"feed": "your_kafka_topic",
"parser": {
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"data": {
"format": "json"
},
"dimensionExclusions": [
"value"
]
}
},
"plumber": {
"type": "realtime",
"windowPeriod": "PT10m",
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist"
}
}
]
```
This is a JSON Array so you can give more than one realtime stream to a given node. The number you can put in the same process depends on the exact configuration. In general, it is best to think of each realtime stream handler as requiring 2-threads: 1 thread for data consumption and aggregation, 1 thread for incremental persists and other background tasks.
There are four parts to a realtime stream specification, `schema`, `config`, `firehose` and `plumber` which we will go into here.
#### Schema
This describes the data schema for the output Druid segment. More information about concepts in Druid and querying can be found at [Concepts-and-Terminology](Concepts-and-Terminology.html) and [Querying](Querying.html).
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|aggregators|Array of Objects|The list of aggregators to use to aggregate colliding rows together.|yes|
|dataSource|String|The name of the dataSource that the segment belongs to.|yes|
|indexGranularity|String|The granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows.|yes|
|segmentGranularity|String|The granularity of the segment as a whole. This is generally larger than the index granularity and describes the rate at which the realtime server will push segments out for historical servers to take over.|yes|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a sharded fashion.|no|
### Config
This provides configuration for the data processing portion of the realtime stream processor.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|yes|
|maxRowsInMemory|Number|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size.|yes|
### Firehose
See [Firehose](Firehose.html).
### Plumber
See [Plumber](Plumber.html)
Constraints
-----------
The following tables summarizes constraints between settings in the spec file for the Realtime subsystem.
|Name|Effect|Minimum|Recommended|
|----|------|-------|-----------|
| windowPeriod| when reading an InputRow, events with timestamp older than now minus this window are discarded | time jitter tolerance | use this to reject outliers |
| segmentGranularity| time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than indexGranularity|
| indexGranularity| time granularity (minute, hour, day, week, month) of indexes | less than segmentGranularity| minute, hour, day, week, month |
| intermediatePersistPeriod| the max real time (ISO8601 Period) between flushes of InputRows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory |
| maxRowsInMemory| the max number of InputRows to hold in memory before a flush to disk | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
The normal, expected use cases have the following overall constraints: `indexGranularity < intermediatePersistPeriod =< windowPeriod < segmentGranularity`
If the RealtimeNode process runs out of heap, try adjusting druid.computation.buffer.size property which specifies a size in bytes that must fit into the heap.
Running
-------
```
io.druid.cli.Main server realtime
```
Segment Propagation
-------------------
The segment propagation diagram for real-time data ingestion can be seen below:
![Segment Propagation](../img/segmentPropagation.png "Segment Propagation")
Requirements
------------
Realtime nodes currently require a Kafka cluster to sit in front of them and collect results. Theres more configuration required for these as well.
Extending the code
------------------
Realtime integration is intended to be extended in two ways:
1. Connect to data streams from varied systems ([Firehose](https://github.com/druid-io/druid-api/blob/master/src/main/java/io/druid/data/input/FirehoseFactory.java))
2. Adjust the publishing strategy to match your needs ([Plumber](https://github.com/metamx/druid/blob/master/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java))
The expectations are that the former will be very common and something that users of Druid will do on a fairly regular basis. Most users will probably never have to deal with the latter form of customization. Indeed, we hope that all potential use cases can be packaged up as part of Druid proper without requiring proprietary customization.
Given those expectations, adding a firehose is straightforward and completely encapsulated inside of the interface. Adding a plumber is more involved and requires understanding of how the system works to get right, its not impossible, but its not intended that individuals new to Druid will be able to do it immediately.
Realtime nodes currently require a Kafka cluster to sit in front of them and collect results. There's [more configuration](Tutorial\:-Loading-Your-Data-Part-2.md#set-up-kafka) required for these as well.

View File

@ -1,35 +0,0 @@
---
layout: doc_page
---
Note: This feature is highly experimental and only works with spatially indexed dimensions.
The grammar for a spatial filter is as follows:
<code>
{
"dimension": "spatialDim",
"bound": {
"type": "rectangular",
"minCoords": [10.0, 20.0],
"maxCoords": [30.0, 40.0]
}
}
</code>
Bounds
------
### Rectangular
|property|description|required?|
|--------|-----------|---------|
|minCoords|List of minimum dimension coordinates for coordinates [x, y, z, …]|yes|
|maxCoords|List of maximum dimension coordinates for coordinates [x, y, z, …]|yes|
### Radius
|property|description|required?|
|--------|-----------|---------|
|coords|Origin coordinates in the form [x, y, z, …]|yes|
|radius|The float radius value|yes|

View File

@ -1,26 +0,0 @@
---
layout: doc_page
---
Note: This feature is highly experimental.
In any of the data specs, there is now the option of providing spatial dimensions. For example, for a JSON data spec, spatial dimensions can be specified as follows:
<code>
{
"type": "JSON",
"dimensions": <some_dims>,
"spatialDimensions": [
{
"dimName": "coordinates",
"dims": ["lat", "long"]
},
...
]
}
</code>
|property|description|required?|
|--------|-----------|---------|
|dimName|The name of the spatial dimension. A spatial dimension may be constructed from multiple other dimensions or it may already exist as part of an event. If a spatial dimension already exists, it must be an array of dimension values.|yes|
|dims|A list of dimension names that comprise a spatial dimension.|no|

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.46
cd druid-services-0.6.52
```
You should see a bunch of files:

View File

@ -194,6 +194,12 @@ Which gets us metrics about only those edits where the namespace is 'article':
Check out [Filters](Filters.html) for more information.
What Types of Queries to Use
----------------------------
The type of query you should use depends on your use case. [TimeBoundary queries](TimeBoundaryQuery.html) are useful to understand the range of your data. [Timeseries queries](TimeseriesQuery.html) are useful for aggregates and filters over a time range, and offer significant speed improvements over [GroupBy queries](GroupByQuery.html). To find the top values for a given dimension, [TopN queries](TopNQuery.html) should likewise be used instead of groupBy queries.
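For instance, a minimal timeseries query of the sort recommended above might look like the following sketch; the dataSource, granularity, aggregator, and interval are all illustrative:

```json
{
  "queryType": "timeseries",
  "dataSource": "wikipedia",
  "granularity": "hour",
  "aggregations": [
    { "type": "count", "name": "rows" }
  ],
  "intervals": [ "2013-01-01/2013-01-08" ]
}
```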
## Learn More ##
You can learn more about querying at [Querying](Querying.html)! If you are ready to evaluate Druid more in depth, check out [Booting a production cluster](Booting-a-production-cluster.html)!

View File

@ -269,6 +269,8 @@ Next Steps
This tutorial covered ingesting a small batch data set and loading it into Druid. In [Loading Your Data Part 2](Tutorial%3A-Loading-Your-Data-Part-2.html), we will cover how to ingest data using Hadoop for larger data sets.
Note: The index task and local firehose can be used to ingest your own data if the size of that data is relatively small (< 1G). The index task is fairly slow and we highly recommend using the Hadoop Index Task for ingesting larger quantities of data.
Additional Information
----------------------

View File

@ -42,9 +42,10 @@ Streaming Event Ingestion
With real-world data, we recommend having a message bus such as [Apache Kafka](http://kafka.apache.org/) sit between the data stream and the real-time node. The message bus provides higher availability for production environments. [Firehoses](Firehose.html) are the key abstraction for real-time ingestion.
<a id="set-up-kafka"></a>
#### Setting up Kafka
[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.46/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.52/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how Druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html).
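As a rough sketch, the firehose section of the realtime.spec used later in this tutorial points the real-time node at Kafka along the following lines. The type name and consumer properties here are quoted from memory of the shipped example spec, so verify them against examples/indexing/wikipedia.spec in the tarball:

```json
"firehose": {
    "type": "kafka-0.7.2",
    "consumerProps": {
        "zk.connect": "localhost:2181",
        "groupid": "druid-example",
        "fetch.size": "1048586",
        "autooffset.reset": "largest",
        "autocommit.enable": "false"
    },
    "feed": "wikipedia"
}
```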
@ -91,17 +92,17 @@ You should be comfortable starting Druid nodes at this point. If not, it may be
1. Real-time nodes can be started with:
```bash
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath lib/*:config/realtime io.druid.cli.Main server realtime
```
2. A realtime.spec should already exist for the data source in the Druid tarball. You should be able to find it at:
```bash
examples/indexing/wikipedia.spec
```
The contents of the file should match:
```json
[
@ -263,8 +264,10 @@ Examining the contents of the file, you should find:
"type" : "index_hadoop",
"config": {
"dataSource" : "wikipedia",
"timestampColumn" : "timestamp",
"timestampFormat" : "auto",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dataSpec" : {
"format" : "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
@ -302,7 +305,8 @@ Examining the contents of the file, you should find:
}
```
If you are curious about what all this configuration means, see [here](Task.html)
If you are curious about what all this configuration means, see [here](Task.html).
To submit the task:
```bash

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes as well as exter
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz)
and untar the contents within by issuing:
@ -149,17 +149,19 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.52"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
druid.server.maxSize=100000000
druid.server.maxSize=10000000000
druid.processing.buffer.sizeBytes=10000000
# Change these to make Druid faster
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
```
To start the historical node:
@ -238,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.46","io.druid.extensions:druid-kafka-seven:0.6.46"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.52","io.druid.extensions:druid-kafka-seven:0.6.52"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
@ -248,7 +250,7 @@ druid.publish.type=noop
# druid.db.connector.user=druid
# druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=10000000
druid.processing.buffer.sizeBytes=100000000
```
Next Steps

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.46
cd druid-services-0.6.52
```
You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz.
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:

View File

@ -19,12 +19,16 @@ h2. Operations
* "Extending Druid":./Modules.html
* "Cluster Setup":./Cluster-setup.html
* "Booting a Production Cluster":./Booting-a-production-cluster.html
* "Performance FAQ":./Performance-FAQ.html
h2. Data Ingestion
* "Realtime":./Realtime.html
* "Realtime":./Realtime-ingestion.html
** "Firehose":./Firehose.html
** "Plumber":./Plumber.html
* "Batch":./Batch-ingestion.html
* "Indexing Service":./Indexing-Service.html
** "Tasks":./Tasks.html
* "Ingestion FAQ":./Ingestion-FAQ.html
h2. Querying
* "Querying":./Querying.html
@ -54,8 +58,6 @@ h2. Architecture
** "Coordinator":./Coordinator.html
*** "Rule Configuration":./Rule-Configuration.html
** "Realtime":./Realtime.html
*** "Firehose":./Firehose.html
*** "Plumber":./Plumber.html
** "Indexing Service":./Indexing-Service.html
*** "Middle Manager":./Middlemanager.html
*** "Peon":./Peons.html
@ -64,6 +66,10 @@ h2. Architecture
** "MySQL":./MySQL.html
** "ZooKeeper":./ZooKeeper.html
h2. Experimental
* "About Experimental Features":./About-Experimental-Features.html
* "Geographic Queries":./GeographicQueries.html
h2. Development
* "Versioning":./Versioning.html
* "Build From Source":./Build-from-source.html

View File

@ -1,7 +1,9 @@
{
"dataSource": "wikipedia",
"timestampColumn": "timestamp",
"timestampFormat": "iso",
"timestampSpec" : {
"column": "timestamp",
"format": "iso",
},
"dataSpec": {
"format": "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]

View File

@ -2,8 +2,10 @@
"type" : "index_hadoop",
"config": {
"dataSource" : "wikipedia",
"timestampColumn" : "timestamp",
"timestampFormat" : "auto",
"timestampSpec" : {
"column": "timestamp",
"format": "auto"
},
"dataSpec" : {
"format" : "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]

View File

@ -1,76 +0,0 @@
# Before running, you will need to download the EC2 tools from http://aws.amazon.com/developertools/351
# and then setup your EC2_HOME and PATH variables (or similar):
#
# # Setup environment for ec2-api-tools
# export EC2_HOME=/path/to/ec2-api-tools-1.6.7.4/
# export PATH=$PATH:$EC2_HOME/bin
# export AWS_ACCESS_KEY=
# export AWS_SECRET_KEY=
# Check for ec2 commands we require and die if they're missing
type ec2-create-keypair >/dev/null 2>&1 || { echo >&2 "I require ec2-create-keypair but it's not installed. Aborting."; exit 1; }
type ec2-create-group >/dev/null 2>&1 || { echo >&2 "I require ec2-create-group but it's not installed. Aborting."; exit 1; }
type ec2-authorize >/dev/null 2>&1 || { echo >&2 "I require ec2-authorize but it's not installed. Aborting."; exit 1; }
type ec2-run-instances >/dev/null 2>&1 || { echo >&2 "I require ec2-run-instances but it's not installed. Aborting."; exit 1; }
type ec2-describe-instances >/dev/null 2>&1 || { echo >&2 "I require ec2-describe-instances but it's not installed. Aborting."; exit 1; }
# Create a keypair for our servers
echo "Removing old keypair for druid..."
ec2-delete-keypair druid-keypair
echo "Creating new keypair for druid..."
ec2-create-keypair druid-keypair > druid-keypair
chmod 0600 druid-keypair
mv druid-keypair ~/.ssh/
# Create a security group for our servers
echo "Creating a new security group for druid..."
ec2-create-group druid-group -d "Druid Cluster"
# Create rules that allow necessary services in our group
echo "Creating new firewall rules for druid..."
# SSH from outside
ec2-authorize druid-group -P tcp -p 22
# Enable all traffic within group
ec2-authorize druid-group -P tcp -p 1-65535 -o druid-group
ec2-authorize druid-group -P udp -p 1-65535 -o druid-group
echo "Booting a single small instance for druid..."
# Use ami ami-e7582d8e - Alestic Ubuntu 12.04 us-east
INSTANCE_ID=$(ec2-run-instances ami-e7582d8e -n 1 -g druid-group -k druid-keypair --instance-type m1.small| awk '/INSTANCE/{print $2}')
while true; do
sleep 1
INSTANCE_STATUS=$(ec2-describe-instances|grep INSTANCE|grep $INSTANCE_ID|cut -f6)
if [ $INSTANCE_STATUS == "running" ]
then
echo "Instance $INSTANCE_ID is status $INSTANCE_STATUS..."
break
fi
done
# Wait for the instance to come up
echo "Waiting 60 seconds for instance $INSTANCE_ID to boot..."
sleep 60
# Get hostname and ssh with the key we created, and ssh there
INSTANCE_ADDRESS=`ec2-describe-instances|grep 'INSTANCE'|grep $INSTANCE_ID|cut -f4`
echo "Connecting to $INSTANCE_ADDRESS to prepare environment for druid..."
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/env.sh ubuntu@${INSTANCE_ADDRESS}:
ssh -q -f -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./env.sh;./env.sh'
echo "Prepared $INSTANCE_ADDRESS for druid."
# Now to scp a tarball up that can run druid!
if [ -f ../../services/target/druid-services-*-bin.tar.gz ];
then
echo "Uploading druid tarball to server..."
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ../../services/target/druid-services-*-bin.tar.gz ubuntu@${INSTANCE_ADDRESS}:
else
echo "ERROR - package not built!"
fi
# Now boot druid parts
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/run.sh ubuntu@${INSTANCE_ADDRESS}:
ssh -q -f -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./run.sh;./run.sh'
echo "Druid booting complete!"
echo "ssh -i ~/.ssh/druid-keypair ubuntu@${INSTANCE_ADDRESS} #to connect"

View File

@ -4,16 +4,16 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.52"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
druid.server.maxSize=100000000
druid.server.maxSize=10000000000
# Change these to make Druid faster
druid.processing.buffer.sizeBytes=10000000
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.46","io.druid.extensions:druid-kafka-seven:0.6.46","io.druid.extensions:druid-rabbitmq:0.6.46"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.52","io.druid.extensions:druid-kafka-seven:0.6.52","io.druid.extensions:druid-rabbitmq:0.6.52"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
@ -14,4 +14,4 @@ druid.publish.type=noop
# druid.db.connector.user=druid
# druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=10000000
druid.processing.buffer.sizeBytes=100000000

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -19,11 +19,9 @@
package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
@ -39,8 +37,6 @@ public class DbUpdaterJob implements Jobby
{
private static final Logger log = new Logger(DbUpdaterJob.class);
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final HadoopDruidIndexerConfig config;
private final IDBI dbi;
@ -82,7 +78,7 @@ public class DbUpdaterJob implements Jobby
.put("partitioned", segment.getShardSpec().getPartitionNum())
.put("version", segment.getVersion())
.put("used", true)
.put("payload", jsonMapper.writeValueAsString(segment))
.put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))
.build()
);

View File

@ -443,7 +443,11 @@ public class DeterminePartitionsJob implements Jobby
final int index = bytes.getInt();
if (index >= numPartitions) {
throw new ISE("Not enough partitions, index[%,d] >= numPartitions[%,d]", index, numPartitions);
throw new ISE(
"Not enough partitions, index[%,d] >= numPartitions[%,d]. Please increase the number of reducers to the index size or check your config & settings!",
index,
numPartitions
);
}
return index;
@ -453,7 +457,6 @@ public class DeterminePartitionsJob implements Jobby
private static abstract class DeterminePartitionsDimSelectionBaseReducer
extends Reducer<BytesWritable, Text, BytesWritable, Text>
{
protected static volatile HadoopDruidIndexerConfig config = null;
@Override

View File

@ -25,6 +25,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -45,22 +46,25 @@ public class ArbitraryGranularitySpec implements GranularitySpec
intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
// Insert all intervals
for(final Interval inputInterval : inputIntervals) {
for (final Interval inputInterval : inputIntervals) {
intervals.add(inputInterval);
}
// Ensure intervals are non-overlapping (but they may abut each other)
final PeekingIterator<Interval> intervalIterator = Iterators.peekingIterator(intervals.iterator());
while(intervalIterator.hasNext()) {
while (intervalIterator.hasNext()) {
final Interval currentInterval = intervalIterator.next();
if(intervalIterator.hasNext()) {
if (intervalIterator.hasNext()) {
final Interval nextInterval = intervalIterator.peek();
if(currentInterval.overlaps(nextInterval)) {
throw new IllegalArgumentException(String.format(
"Overlapping intervals: %s, %s",
currentInterval,
nextInterval));
if (currentInterval.overlaps(nextInterval)) {
throw new IllegalArgumentException(
String.format(
"Overlapping intervals: %s, %s",
currentInterval,
nextInterval
)
);
}
}
}
@ -79,10 +83,16 @@ public class ArbitraryGranularitySpec implements GranularitySpec
// First interval with start time dt
final Interval interval = intervals.floor(new Interval(dt, new DateTime(Long.MAX_VALUE)));
if(interval != null && interval.contains(dt)) {
if (interval != null && interval.contains(dt)) {
return Optional.of(interval);
} else {
return Optional.absent();
}
}
@Override
public Granularity getGranularity()
{
throw new UnsupportedOperationException();
}
}

View File

@ -22,6 +22,7 @@ package io.druid.indexer.granularity;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import com.metamx.common.Granularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -43,4 +44,6 @@ public interface GranularitySpec
/** Time-grouping interval corresponding to some instant, if any. */
public Optional<Interval> bucketInterval(DateTime dt);
public Granularity getGranularity();
}

View File

@ -67,6 +67,7 @@ public class UniformGranularitySpec implements GranularitySpec
return wrappedSpec.bucketInterval(dt);
}
@Override
@JsonProperty("gran")
public Granularity getGranularity()
{

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -86,7 +86,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
if (chatHandlerProvider.isPresent()) {
log.info("Found chathandler of class[%s]", chatHandlerProvider.get().getClass().getName());
chatHandlerProvider.get().register(serviceName, firehose);
chatHandlerProvider.get().register(serviceName.replaceAll(".*:", ""), firehose); // rofl
if (serviceName.contains(":")) {
chatHandlerProvider.get().register(serviceName.replaceAll(".*:", ""), firehose); // rofl
}
} else {
log.info("No chathandler detected");
}

View File

@ -22,15 +22,17 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Sets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultiset;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@ -61,6 +63,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArrayList;
public class IndexTask extends AbstractFixedIntervalTask
@ -133,7 +136,13 @@ public class IndexTask extends AbstractFixedIntervalTask
{
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Set<DataSegment> segments = Sets.newHashSet();
for (final Interval bucket : granularitySpec.bucketIntervals()) {
final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals(), getDataIntervals());
if (validIntervals.isEmpty()) {
throw new ISE("No valid data intervals found. Check your configs!");
}
for (final Interval bucket : validIntervals) {
final List<ShardSpec> shardSpecs;
if (targetPartitionSize > 0) {
shardSpecs = determinePartitions(bucket, targetPartitionSize);
@ -160,6 +169,19 @@ public class IndexTask extends AbstractFixedIntervalTask
return TaskStatus.success(getId());
}
private SortedSet<Interval> getDataIntervals() throws IOException
{
SortedSet<Interval> retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
try (Firehose firehose = firehoseFactory.connect()) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
Interval interval = granularitySpec.getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
retVal.add(interval);
}
}
return retVal;
}
private List<ShardSpec> determinePartitions(
final Interval interval,
final int targetPartitionSize

View File

@ -194,7 +194,7 @@ public class RealtimeIndexTask extends AbstractTask
final RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
windowPeriod,
new File(toolbox.getTaskWorkDir(), "persist"),
segmentGranularity
segmentGranularity, fireDepartmentConfig.getMaxPendingPersists()
);
final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(this, toolbox);

View File

@ -43,7 +43,6 @@ import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
@ -98,7 +97,7 @@ public class DbTaskStorage implements TaskStorage
}
@Override
public void insert(final Task task, final TaskStatus status)
public void insert(final Task task, final TaskStatus status) throws TaskExistsException
{
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status");
@ -137,9 +136,11 @@ public class DbTaskStorage implements TaskStorage
}
);
}
catch (StatementException e) {
// Might be a duplicate task ID.
if (getTask(task.getId()).isPresent()) {
catch (Exception e) {
final boolean isStatementException = e instanceof StatementException ||
(e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
if (isStatementException && getTask(task.getId()).isPresent()) {
throw new TaskExistsException(task.getId(), e);
} else {
throw e;
@ -533,11 +534,8 @@ public class DbTaskStorage implements TaskStorage
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
}
catch (RuntimeException e) {
throw Throwables.propagate(e);
}
catch (Exception e) {
throw new CallbackFailedException(e);
throw Throwables.propagate(e);
}
}

View File

@ -63,7 +63,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
@Override
public void insert(Task task, TaskStatus status)
public void insert(Task task, TaskStatus status) throws TaskExistsException
{
giant.lock();

View File

@ -61,6 +61,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
@ -335,7 +336,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
pendingTaskPayloads.remove(taskId);
log.info("Removed task from pending queue: %s", taskId);
} else if (completeTasks.containsKey(taskId)) {
cleanup(completeTasks.get(taskId).getWorker().getHost(), taskId);
cleanup(taskId);
} else {
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
@ -469,28 +470,32 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
/**
* Removes a task from the complete queue and clears out the ZK status path of the task.
*
* @param workerId - the worker that was previously running the task
* @param taskId - the task to cleanup
*/
private void cleanup(final String workerId, final String taskId)
private void cleanup(final String taskId)
{
if (!started) {
return;
}
if (completeTasks.remove(taskId) == null) {
final RemoteTaskRunnerWorkItem removed = completeTasks.remove(taskId);
final Worker worker = (removed == null) ? null : removed.getWorker();
if (removed == null || worker == null) {
log.makeAlert("WTF?! Asked to cleanup nonexistent task")
.addData("workerId", workerId)
.addData("taskId", taskId)
.emit();
} else {
final String workerId = worker.getHost();
log.info("Cleaning up task[%s] on worker[%s]", taskId, workerId);
final String statusPath = JOINER.join(zkPaths.getIndexerStatusPath(), workerId, taskId);
try {
cf.delete().guaranteed().forPath(statusPath);
}
catch (Exception e) {
catch (KeeperException.NoNodeException e) {
log.info("Tried to delete status path[%s] that didn't exist! Must've gone away already?", statusPath);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
@ -593,7 +598,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
elapsed,
config.getTaskAssignmentTimeout()
);
taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
break;
}
@ -666,7 +670,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
SettableFuture.<TaskStatus>create(),
zkWorker.getWorker()
);
runningTasks.put(taskId, taskRunnerWorkItem);
runningTasks.put(taskId, taskRunnerWorkItem.withWorker(zkWorker.getWorker()));
}
if (taskStatus.isComplete()) {

View File

@ -19,7 +19,7 @@
package io.druid.indexing.overlord;
public class TaskExistsException extends RuntimeException
public class TaskExistsException extends Exception
{
private final String taskId;

View File

@ -296,7 +296,7 @@ public class TaskQueue
*
* @return true
*/
public boolean add(final Task task)
public boolean add(final Task task) throws TaskExistsException
{
giant.lock();
@ -306,15 +306,8 @@ public class TaskQueue
Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize());
// If this throws with any sort of exception, including TaskExistsException, we don't want to
// insert the task into our queue.
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));
}
catch (TaskExistsException e) {
log.warn("Attempt to add task twice: %s", task.getId());
throw Throwables.propagate(e);
}
// insert the task into our queue. So don't catch it.
taskStorage.insert(task, TaskStatus.running(task.getId()));
tasks.add(task);
managementMayBeNecessary.signalAll();
return true;

View File

@ -30,10 +30,11 @@ import java.util.List;
public interface TaskStorage
{
/**
* Adds a task to the storage facility with a particular status. If the task ID already exists, this method
* will throw a {@link TaskExistsException}.
* Adds a task to the storage facility with a particular status.
*
* @throws io.druid.indexing.overlord.TaskExistsException if the task ID already exists
*/
public void insert(Task task, TaskStatus status);
public void insert(Task task, TaskStatus status) throws TaskExistsException;
/**
* Persists task status in the storage facility. This method should throw an exception if the task status lifecycle

View File

@ -36,6 +36,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskExistsException;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
@ -45,6 +46,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
import org.apache.zookeeper.data.Stat;
import org.joda.time.DateTime;
import javax.ws.rs.Consumes;
@ -126,8 +128,15 @@ public class OverlordResource
@Override
public Response apply(TaskQueue taskQueue)
{
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
try {
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (TaskExistsException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", String.format("Task[%s] already exists!", task.getId())))
.build();
}
}
}
);
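With this change, submitting a task whose ID already exists returns HTTP 400 instead of an opaque server error, with a body shaped like the sketch below, where <task_id> stands for the submitted task's ID:

```json
{
    "error": "Task[<task_id>] already exists!"
}
```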

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

pom.xml
View File

@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
@ -39,7 +39,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.1</metamx.java-util.version>
<metamx.java-util.version>0.25.2</metamx.java-util.version>
<apache.curator.version>2.3.0</apache.curator.version>
<druid.api.version>0.1.7</druid.api.version>
</properties>
@ -73,12 +73,12 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.2.7</version>
<version>0.2.9</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.8.4</version>
<version>0.8.5</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
@ -143,6 +143,10 @@
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -519,8 +519,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn);
}
for (Object column : complexColumnCache.values()) {
if (column instanceof Closeable) {
for (Object column : objectColumnCache.values()) {
if (column instanceof Closeable) {
Closeables.closeQuietly((Closeable) column);
}
}

View File

@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -25,8 +25,9 @@ import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Client;
import io.druid.query.QueryRunner;
@ -57,19 +58,22 @@ public class BrokerServerView implements TimelineServerView
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
private final ServerView baseView;
private final ServerSelectorStrategy serverSelectorStrategy;
@Inject
public BrokerServerView(
QueryToolChestWarehouse warehose,
ObjectMapper smileMapper,
@Client HttpClient httpClient,
ServerView baseView
ServerView baseView,
ServerSelectorStrategy serverSelectorStrategy
)
{
this.warehose = warehose;
this.smileMapper = smileMapper;
this.httpClient = httpClient;
this.baseView = baseView;
this.serverSelectorStrategy = serverSelectorStrategy;
this.clients = Maps.newConcurrentMap();
this.selectors = Maps.newHashMap();
@ -164,7 +168,7 @@ public class BrokerServerView implements TimelineServerView
ServerSelector selector = selectors.get(segmentId);
if (selector == null) {
selector = new ServerSelector(segment);
selector = new ServerSelector(segment, serverSelectorStrategy);
VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
if (timeline == null) {

View File

@ -40,8 +40,8 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.cache.Cache;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.guice.annotations.Smile;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;

View File

@ -29,6 +29,9 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.guava.BaseSequence;
@ -123,12 +126,11 @@ public class DirectDruidClient<T> implements QueryRunner<T>
typeRef = types.lhs;
}
final Future<InputStream> future;
final ListenableFuture<InputStream> future;
final String url = String.format("http://%s/druid/v2/", host);
try {
log.debug("Querying url[%s]", url);
openConnections.getAndIncrement();
future = httpClient
.post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query))
@ -169,11 +171,27 @@ public class DirectDruidClient<T> implements QueryRunner<T>
stopTime - startTime,
byteCount / (0.0001 * (stopTime - startTime))
);
openConnections.getAndDecrement();
return super.done(clientResponse);
}
}
);
openConnections.getAndIncrement();
Futures.addCallback(
future, new FutureCallback<InputStream>()
{
@Override
public void onSuccess(InputStream result)
{
openConnections.getAndDecrement();
}
@Override
public void onFailure(Throwable t)
{
openConnections.getAndDecrement();
}
}
);
}
catch (IOException e) {
throw Throwables.propagate(e);

View File

@ -0,0 +1,44 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.google.common.primitives.Ints;
import java.util.Collections;
import java.util.Comparator;
import java.util.Set;
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
{
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
{
@Override
public int compare(QueryableDruidServer left, QueryableDruidServer right)
{
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
}
};
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers)
{
return Collections.min(servers, comparator);
}
}

View File

@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.google.common.collect.Iterators;
import java.util.Random;
import java.util.Set;
public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{
private static final Random random = new Random();
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers)
{
return Iterators.get(servers.iterator(), random.nextInt(servers.size()));
}
}

View File

@ -31,24 +31,18 @@ import java.util.Set;
*/
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
{
private static final Comparator<QueryableDruidServer> comparator = new Comparator<QueryableDruidServer>()
{
@Override
public int compare(QueryableDruidServer left, QueryableDruidServer right)
{
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
}
};
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final DataSegment segment;
private final ServerSelectorStrategy strategy;
public ServerSelector(
DataSegment segment
DataSegment segment,
ServerSelectorStrategy strategy
)
{
this.segment = segment;
this.strategy = strategy;
}
public DataSegment getSegment()
@ -86,7 +80,7 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
switch (size) {
case 0: return null;
case 1: return servers.iterator().next();
default: return Collections.min(servers, comparator);
default: return strategy.pick(servers);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "random", value = RandomServerSelectorStrategy.class),
@JsonSubTypes.Type(name = "connectionCount", value = ConnectionCountServerSelectorStrategy.class)
})
public interface ServerSelectorStrategy
{
public QueryableDruidServer pick(Set<QueryableDruidServer> servers);
}
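Given the JsonSubTypes registration above, a concrete strategy is chosen in JSON by its type name, and omitting it falls back to RandomServerSelectorStrategy via defaultImpl. Where exactly this snippet nests inside the broker configuration is not shown in this diff, so its placement is an assumption:

```json
{
    "type": "connectionCount"
}
```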

View File

@ -103,7 +103,7 @@ public class HttpClientModule implements Module
private int numConnections = 5;
@JsonProperty
private Period readTimeout = new Period("PT5M");
private Period readTimeout = new Period("PT15M");
public int getNumConnections()
{

View File

@ -28,17 +28,23 @@ import org.joda.time.Period;
*/
public class FireDepartmentConfig
{
private static int MAX_PENDING_PERSIST_BATCHES_DEFAULT = 2;
private final int maxRowsInMemory;
private final Period intermediatePersistPeriod;
private final int maxPendingPersists;
@JsonCreator
public FireDepartmentConfig(
@JsonProperty("maxRowsInMemory") int maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
this.maxRowsInMemory = maxRowsInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod;
this.maxPendingPersists = maxPendingPersists > 0
? maxPendingPersists
: MAX_PENDING_PERSIST_BATCHES_DEFAULT;
Preconditions.checkArgument(maxRowsInMemory > 0, "maxRowsInMemory[%s] should be greater than 0", maxRowsInMemory);
Preconditions.checkNotNull(intermediatePersistPeriod, "intermediatePersistPeriod");
@ -55,4 +61,10 @@ public class FireDepartmentConfig
{
return intermediatePersistPeriod;
}
@JsonProperty
public int getMaxPendingPersists()
{
return maxPendingPersists;
}
}
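Mapped back onto the realtime spec, the three @JsonProperty fields above correspond to a fire department config block along these lines; the sample values are illustrative only, and a missing or non-positive maxPendingPersists falls back to the default of 2:

```json
"config": {
    "maxRowsInMemory": 500000,
    "intermediatePersistPeriod": "PT10m",
    "maxPendingPersists": 2
}
```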

View File

@ -46,7 +46,8 @@ public class FlushingPlumber extends RealtimePlumber
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
VersioningPolicy versioningPolicy
VersioningPolicy versioningPolicy,
int maxPendingPersists
)
{
super(
@ -63,7 +64,8 @@ public class FlushingPlumber extends RealtimePlumber
versioningPolicy,
null,
null,
null
null,
maxPendingPersists
);
this.flushDuration = flushDuration;

View File

@ -50,6 +50,7 @@ public class FlushingPlumberSchool implements PlumberSchool
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final int maxPendingPersists;
@JacksonInject
@NotNull
@ -76,7 +77,8 @@ public class FlushingPlumberSchool implements PlumberSchool
@JsonProperty("flushDuration") Duration flushDuration,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
this.flushDuration = flushDuration;
@ -85,7 +87,9 @@ public class FlushingPlumberSchool implements PlumberSchool
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
this.maxPendingPersists = maxPendingPersists;
Preconditions.checkArgument(maxPendingPersists > 0, "FlushingPlumberSchool requires maxPendingPersists > 0");
Preconditions.checkNotNull(flushDuration, "FlushingPlumberSchool requires a flushDuration.");
Preconditions.checkNotNull(windowPeriod, "FlushingPlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "FlushingPlumberSchool requires a basePersistDirectory.");
@ -113,7 +117,8 @@ public class FlushingPlumberSchool implements PlumberSchool
conglomerate,
segmentAnnouncer,
queryExecutorService,
versioningPolicy
versioningPolicy,
maxPendingPersists
);
}

View File

@ -19,6 +19,7 @@ import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -43,7 +44,6 @@ import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
@ -83,7 +83,7 @@ public class RealtimePlumber implements Plumber
private final DataSegmentPusher dataSegmentPusher;
private final SegmentPublisher segmentPublisher;
private final ServerView serverView;
private final int maxPendingPersists;
private final Object handoffCondition = new Object();
private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
@ -110,7 +110,8 @@ public class RealtimePlumber implements Plumber
VersioningPolicy versioningPolicy,
DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher,
ServerView serverView
ServerView serverView,
int maxPendingPersists
)
{
this.windowPeriod = windowPeriod;
@ -127,6 +128,7 @@ public class RealtimePlumber implements Plumber
this.dataSegmentPusher = dataSegmentPusher;
this.segmentPublisher = segmentPublisher;
this.serverView = serverView;
this.maxPendingPersists = maxPendingPersists;
}
public Schema getSchema()
@ -424,12 +426,9 @@ public class RealtimePlumber implements Plumber
protected void initializeExecutors()
{
if (persistExecutor == null) {
persistExecutor = Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("plumber_persist_%d")
.build()
// use a blocking single-threaded executor to throttle the firehose when writes to disk are slow
persistExecutor = Execs.newBlockingSingleThreaded(
"plumber_persist_%d", maxPendingPersists
);
}
if (scheduledExecutor == null) {

View File

@ -45,40 +45,35 @@ import java.util.concurrent.ExecutorService;
public class RealtimePlumberSchool implements PlumberSchool
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);
private static final int defaultPending = 2;
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final int maxPendingPersists;
@JacksonInject
@NotNull
private volatile ServiceEmitter emitter;
@JacksonInject
@NotNull
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
@JacksonInject
@NotNull
private volatile DataSegmentPusher dataSegmentPusher = null;
@JacksonInject
@NotNull
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
@JacksonInject
@NotNull
private volatile SegmentPublisher segmentPublisher = null;
@JacksonInject
@NotNull
private volatile ServerView serverView = null;
@JacksonInject
@NotNull
@Processing
private volatile ExecutorService queryExecutorService = null;
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@ -86,7 +81,8 @@ public class RealtimePlumberSchool implements PlumberSchool
public RealtimePlumberSchool(
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
this.windowPeriod = windowPeriod;
@ -94,7 +90,9 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
this.maxPendingPersists = (maxPendingPersists > 0) ? maxPendingPersists : defaultPending;
Preconditions.checkArgument(maxPendingPersists > 0, "RealtimePlumberSchool requires maxPendingPersists > 0");
Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
@ -169,7 +167,8 @@ public class RealtimePlumberSchool implements PlumberSchool
versioningPolicy,
dataSegmentPusher,
segmentPublisher,
serverView
serverView,
maxPendingPersists
);
}
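In spec form, the new constructor argument surfaces as one more plumber property next to the existing ones. The "realtime" type name and the sample values below are assumptions based on the 0.6-era example specs rather than something shown in this diff:

```json
"plumber": {
    "type": "realtime",
    "windowPeriod": "PT10m",
    "segmentGranularity": "hour",
    "basePersistDirectory": "/tmp/realtime/basePersist",
    "maxPendingPersists": 2
}
```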

View File

@ -40,7 +40,12 @@ public class ServerTimeRejectionPolicyFactory implements RejectionPolicyFactory
@Override
public boolean accept(long timestamp)
{
return timestamp >= (System.currentTimeMillis() - windowMillis);
long now = System.currentTimeMillis();
boolean notTooOld = timestamp >= (now - windowMillis);
boolean notTooYoung = timestamp <= (now + windowMillis);
return notTooOld && notTooYoung;
}
@Override

View File

@ -128,6 +128,7 @@ public class QueryResource
.setUser4(query.getType())
.setUser5(query.getIntervals().get(0).toString())
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);

View File

@ -44,6 +44,7 @@ import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ServerInventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.collections.CountingMap;
import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs;
import io.druid.curator.discovery.ServiceAnnouncer;
@ -53,6 +54,8 @@ import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.segment.IndexIO;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
@ -63,7 +66,6 @@ import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@ -72,6 +74,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -183,6 +186,53 @@ public class DruidCoordinator
return leader;
}
public Map<String, LoadQueuePeon> getLoadManagementPeons()
{
return loadManagementPeons;
}
public Map<String, Double> getReplicationStatus()
{
// find expected load per datasource
final CountingMap<String> expectedSegmentsInCluster = new CountingMap<>();
final DateTime now = new DateTime();
for (DataSegment segment : getAvailableDataSegments()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
for (Rule rule : rules) {
if (rule instanceof LoadRule && rule.appliesTo(segment, now)) {
expectedSegmentsInCluster.add(segment.getDataSource(), ((LoadRule) rule).getReplicants());
break;
}
}
}
// find segments currently loaded per datasource
Map<String, Integer> segmentsInCluster = Maps.newHashMap();
for (DruidServer druidServer : serverInventoryView.getInventory()) {
for (DataSegment segment : druidServer.getSegments().values()) {
Integer count = segmentsInCluster.get(segment.getDataSource());
if (count == null) {
count = 0;
}
segmentsInCluster.put(segment.getDataSource(), count + 1);
}
}
// compare available segments with currently loaded
Map<String, Double> loadStatus = Maps.newHashMap();
for (Map.Entry<String, AtomicLong> entry : expectedSegmentsInCluster.entrySet()) {
Integer actual = segmentsInCluster.get(entry.getKey());
loadStatus.put(entry.getKey(), 100 * (actual == null ? 0.0D : (double) actual) / entry.getValue().get());
}
return loadStatus;
}
public Map<String, Double> getLoadStatus()
{
// find available segments
@ -687,11 +737,11 @@ public class DruidCoordinator
// Do coordinator stuff.
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(databaseSegmentManager.getInventory())
.withDynamicConfigs(dynamicConfigs.get())
.withEmitter(emitter)
.build();
.withStartTime(startTime)
.withDatasources(databaseSegmentManager.getInventory())
.withDynamicConfigs(dynamicConfigs.get())
.withEmitter(emitter)
.build();
for (DruidCoordinatorHelper helper : helpers) {
@ -724,7 +774,7 @@ public class DruidCoordinator
{
@Override
public boolean apply(
@Nullable DruidServer input
DruidServer input
)
{
return input.getType().equalsIgnoreCase("historical");
@ -760,11 +810,11 @@ public class DruidCoordinator
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
// Stop peons for servers that aren't there anymore.
final Set<String> disdappearedServers = Sets.newHashSet(loadManagementPeons.keySet());
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
for (DruidServer server : servers) {
disdappearedServers.remove(server.getName());
disappeared.remove(server.getName());
}
for (String name : disdappearedServers) {
for (String name : disappeared) {
log.info("Removing listener for server[%s] which is no longer there.", name);
LoadQueuePeon peon = loadManagementPeons.remove(name);
peon.stop();

View File

@ -19,6 +19,7 @@
package io.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
@ -103,6 +104,7 @@ public class LoadQueuePeon
this.config = config;
}
@JsonProperty
public Set<DataSegment> getSegmentsToLoad()
{
return new ConcurrentSkipListSet<DataSegment>(
@ -120,6 +122,7 @@ public class LoadQueuePeon
);
}
@JsonProperty
public Set<DataSegment> getSegmentsToDrop()
{
return new ConcurrentSkipListSet<DataSegment>(

View File

@ -32,7 +32,7 @@ import javax.ws.rs.core.Response;
/**
*/
@Path("/coordinator/config")
@Path("/druid/coordinator/v1/config")
public class CoordinatorDynamicConfigsResource
{
private final JacksonConfigManager manager;

View File

@ -19,21 +19,23 @@
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.timeline.DataSegment;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.List;
/**
*/
@Path("/coordinator")
@Path("/druid/coordinator/v1")
public class CoordinatorResource
{
private final DruidCoordinator coordinator;
@ -46,74 +48,64 @@ public class CoordinatorResource
this.coordinator = coordinator;
}
@POST
@Path("/move")
@Consumes("application/json")
public Response moveSegment(List<SegmentToMove> segmentsToMove)
@GET
@Path("/leader")
@Produces("application/json")
public Response getLeader()
{
Response resp = Response.status(Response.Status.OK).build();
for (SegmentToMove segmentToMove : segmentsToMove) {
try {
coordinator.moveSegment(
segmentToMove.getFromServer(),
segmentToMove.getToServer(),
segmentToMove.getSegmentName(),
new LoadPeonCallback()
{
@Override
protected void execute()
{
return;
}
}
);
}
catch (Exception e) {
resp = Response
.status(Response.Status.BAD_REQUEST)
.entity(e.getMessage())
.build();
break;
}
}
return resp;
}
@POST
@Path("/drop")
@Consumes("application/json")
public Response dropSegment(List<SegmentToDrop> segmentsToDrop)
{
Response resp = Response.status(Response.Status.OK).build();
for (SegmentToDrop segmentToDrop : segmentsToDrop) {
try {
coordinator.dropSegment(
segmentToDrop.getFromServer(), segmentToDrop.getSegmentName(), new LoadPeonCallback()
{
@Override
protected void execute()
{
return;
}
}
);
}
catch (Exception e) {
resp = Response
.status(Response.Status.BAD_REQUEST)
.entity(e.getMessage())
.build();
break;
}
}
return resp;
return Response.ok(coordinator.getCurrentLeader()).build();
}
@GET
@Path("/loadstatus")
@Produces("application/json")
public Response getLoadStatus()
public Response getLoadStatus(
@QueryParam("full") String full
)
{
if (full != null) {
return Response.ok(coordinator.getReplicationStatus()).build();
}
return Response.ok(coordinator.getLoadStatus()).build();
}
@GET
@Path("loadqueue")
@Produces("application/json")
public Response getLoadQueue(
@QueryParam("simple") String simple
)
{
if (simple != null) {
return Response.ok(
Maps.transformValues(
coordinator.getLoadManagementPeons(),
new Function<LoadQueuePeon, Object>()
{
@Override
public Object apply(LoadQueuePeon input)
{
long loadSize = 0;
for (DataSegment dataSegment : input.getSegmentsToLoad()) {
loadSize += dataSegment.getSize();
}
long dropSize = 0;
for (DataSegment dataSegment : input.getSegmentsToDrop()) {
dropSize += dataSegment.getSize();
}
return new ImmutableMap.Builder<>()
.put("segmentsToLoad", input.getSegmentsToLoad().size())
.put("segmentsToDrop", input.getSegmentsToDrop().size())
.put("segmentsToLoadSize", loadSize)
.put("segmentsToDropSize", dropSize)
.build();
}
}
)
).build();
}
return Response.ok(coordinator.getLoadManagementPeons()).build();
}
}
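The hunk above removes the old /coordinator/move and /coordinator/drop POST endpoints from CoordinatorResource and replaces them with read-only leader, loadstatus, and loadqueue views under /druid/coordinator/v1. A minimal sketch of reading the new loadqueue?simple view from Java; the coordinator address below is a placeholder, not something taken from this commit:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class LoadQueueCheck
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder coordinator host/port; substitute your own deployment.
    URL url = new URL("http://localhost:8082/druid/coordinator/v1/loadqueue?simple");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    conn.setRequestProperty("Accept", "application/json");

    // The simple view returns, per load queue peon, the segmentsToLoad/segmentsToDrop
    // counts and their total sizes, as built in the resource above.
    BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
    String line;
    while ((line = reader.readLine()) != null) {
      System.out.println(line);
    }
    reader.close();
    conn.disconnect();
  }
}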

View File

@ -0,0 +1,159 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import io.druid.client.DruidDataSource;
import io.druid.db.DatabaseSegmentManager;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.List;
/**
*/
@Path("/druid/coordinator/v1/db")
public class DBResource
{
private final DatabaseSegmentManager databaseSegmentManager;
@Inject
public DBResource(
DatabaseSegmentManager databaseSegmentManager
)
{
this.databaseSegmentManager = databaseSegmentManager;
}
@GET
@Path("/datasources")
@Produces("application/json")
public Response getDatabaseDataSources(
@QueryParam("full") String full,
@QueryParam("includeDisabled") String includeDisabled
)
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (includeDisabled != null) {
return builder.entity(databaseSegmentManager.getAllDatasourceNames()).build();
}
if (full != null) {
return builder.entity(databaseSegmentManager.getInventory()).build();
}
List<String> dataSourceNames = Lists.newArrayList(
Iterables.transform(
databaseSegmentManager.getInventory(),
new Function<DruidDataSource, String>()
{
@Override
public String apply(@Nullable DruidDataSource dataSource)
{
return dataSource.getName();
}
}
)
);
Collections.sort(dataSourceNames);
return builder.entity(dataSourceNames).build();
}
@GET
@Path("/datasources/{dataSourceName}")
@Produces("application/json")
public Response getDatabaseSegmentDataSource(
@PathParam("dataSourceName") final String dataSourceName
)
{
DruidDataSource dataSource = databaseSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.status(Response.Status.OK).entity(dataSource).build();
}
@GET
@Path("/datasources/{dataSourceName}/segments")
@Produces("application/json")
public Response getDatabaseSegmentDataSourceSegments(
@PathParam("dataSourceName") String dataSourceName,
@QueryParam("full") String full
)
{
DruidDataSource dataSource = databaseSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (full != null) {
return builder.entity(dataSource.getSegments()).build();
}
return builder.entity(
Iterables.transform(
dataSource.getSegments(),
new Function<DataSegment, Object>()
{
@Override
public Object apply(@Nullable DataSegment segment)
{
return segment.getIdentifier();
}
}
)
).build();
}
@GET
@Path("/datasources/{dataSourceName}/segments/{segmentId}")
@Produces("application/json")
public Response getDatabaseSegmentDataSourceSegment(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
)
{
DruidDataSource dataSource = databaseSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
for (DataSegment segment : dataSource.getSegments()) {
if (segment.getIdentifier().equalsIgnoreCase(segmentId)) {
return Response.status(Response.Status.OK).entity(segment).build();
}
}
return Response.status(Response.Status.NOT_FOUND).build();
}
}

View File

@ -0,0 +1,325 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.db.DatabaseSegmentManager;
import io.druid.segment.IndexGranularity;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
*/
@Path("/druid/coordinator/v1/datasources")
public class DatasourcesResource
{
private static Map<String, Object> makeSimpleDatasource(DruidDataSource input)
{
return new ImmutableMap.Builder<String, Object>()
.put("name", input.getName())
.put("properties", input.getProperties())
.build();
}
private final InventoryView serverInventoryView;
private final DatabaseSegmentManager databaseSegmentManager;
private final IndexingServiceClient indexingServiceClient;
@Inject
public DatasourcesResource(
InventoryView serverInventoryView,
DatabaseSegmentManager databaseSegmentManager,
@Nullable IndexingServiceClient indexingServiceClient
)
{
this.serverInventoryView = serverInventoryView;
this.databaseSegmentManager = databaseSegmentManager;
this.indexingServiceClient = indexingServiceClient;
}
@GET
@Produces("application/json")
public Response getQueryableDataSources(
@QueryParam("full") String full,
@QueryParam("simple") String simple,
@QueryParam("gran") String gran
)
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (full != null) {
return builder.entity(getDataSources()).build();
} else if (simple != null) {
return builder.entity(
Lists.newArrayList(
Iterables.transform(
getDataSources(),
new Function<DruidDataSource, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(DruidDataSource dataSource)
{
return makeSimpleDatasource(dataSource);
}
}
)
)
).build();
} else if (gran != null) {
IndexGranularity granularity = IndexGranularity.fromString(gran);
// TODO
}
return builder.entity(
Lists.newArrayList(
Iterables.transform(
getDataSources(),
new Function<DruidDataSource, String>()
{
@Override
public String apply(DruidDataSource dataSource)
{
return dataSource.getName();
}
}
)
)
).build();
}
@DELETE
@Path("/{dataSourceName}")
public Response deleteDataSource(
@PathParam("dataSourceName") final String dataSourceName,
@QueryParam("kill") final String kill,
@QueryParam("interval") final String interval
)
{
if (indexingServiceClient == null) {
return Response.status(Response.Status.OK).entity(ImmutableMap.of("error", "no indexing service found")).build();
}
if (kill != null && Boolean.valueOf(kill)) {
indexingServiceClient.killSegments(dataSourceName, new Interval(interval));
} else {
if (!databaseSegmentManager.removeDatasource(dataSourceName)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
}
return Response.status(Response.Status.OK).build();
}
@POST
@Path("/{dataSourceName}")
@Consumes("application/json")
public Response enableDataSource(
@PathParam("dataSourceName") final String dataSourceName
)
{
if (!databaseSegmentManager.enableDatasource(dataSourceName)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.status(Response.Status.OK).build();
}
@GET
@Path("/{dataSourceName}/segments")
@Produces("application/json")
public Response getSegmentDataSourceSegments(
@PathParam("dataSourceName") String dataSourceName,
@QueryParam("full") String full
)
{
DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (full != null) {
return builder.entity(dataSource.getSegments()).build();
}
return builder.entity(
Iterables.transform(
dataSource.getSegments(),
new Function<DataSegment, Object>()
{
@Override
public Object apply(@Nullable DataSegment segment)
{
return segment.getIdentifier();
}
}
)
).build();
}
@GET
@Path("/{dataSourceName}/segments/{segmentId}")
@Produces("application/json")
public Response getSegmentDataSourceSegment(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
)
{
DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
for (DataSegment segment : dataSource.getSegments()) {
if (segment.getIdentifier().equalsIgnoreCase(segmentId)) {
return Response.status(Response.Status.OK).entity(segment).build();
}
}
return Response.status(Response.Status.NOT_FOUND).build();
}
@DELETE
@Path("/{dataSourceName}/segments/{segmentId}")
public Response deleteDatasourceSegment(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
)
{
if (!databaseSegmentManager.removeSegment(dataSourceName, segmentId)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.status(Response.Status.OK).build();
}
@POST
@Path("/{dataSourceName}/segments/{segmentId}")
@Consumes("application/json")
public Response enableDatasourceSegment(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
)
{
if (!databaseSegmentManager.enableSegment(segmentId)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.status(Response.Status.OK).build();
}
private DruidDataSource getDataSource(final String dataSourceName)
{
Iterable<DruidDataSource> dataSources =
Iterables.concat(
Iterables.transform(
serverInventoryView.getInventory(),
new Function<DruidServer, DruidDataSource>()
{
@Override
public DruidDataSource apply(DruidServer input)
{
return input.getDataSource(dataSourceName);
}
}
)
);
List<DruidDataSource> validDataSources = Lists.newArrayList();
for (DruidDataSource dataSource : dataSources) {
if (dataSource != null) {
validDataSources.add(dataSource);
}
}
if (validDataSources.isEmpty()) {
return null;
}
Map<String, DataSegment> segmentMap = Maps.newHashMap();
for (DruidDataSource dataSource : validDataSources) {
if (dataSource != null) {
Iterable<DataSegment> segments = dataSource.getSegments();
for (DataSegment segment : segments) {
segmentMap.put(segment.getIdentifier(), segment);
}
}
}
return new DruidDataSource(
dataSourceName,
ImmutableMap.<String, String>of()
).addSegments(segmentMap);
}
private Set<DruidDataSource> getDataSources()
{
TreeSet<DruidDataSource> dataSources = Sets.newTreeSet(
new Comparator<DruidDataSource>()
{
@Override
public int compare(DruidDataSource druidDataSource, DruidDataSource druidDataSource1)
{
return druidDataSource.getName().compareTo(druidDataSource1.getName());
}
}
);
dataSources.addAll(
Lists.newArrayList(
Iterables.concat(
Iterables.transform(
serverInventoryView.getInventory(),
new Function<DruidServer, Iterable<DruidDataSource>>()
{
@Override
public Iterable<DruidDataSource> apply(DruidServer input)
{
return input.getDataSources();
}
}
)
)
)
);
return dataSources;
}
}
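DatasourcesResource disables a datasource (or, with kill=true, asks the indexing service to kill segments in an interval) via DELETE, and re-enables it via POST. A hedged sketch of driving both operations from Java; the coordinator address, datasource name, and interval are placeholders rather than values from this commit:

import java.net.HttpURLConnection;
import java.net.URL;

public class DatasourceAdminSketch
{
  public static void main(String[] args) throws Exception
  {
    String base = "http://localhost:8082/druid/coordinator/v1/datasources/wikipedia";

    // DELETE with kill=true deletes segments in the interval permanently
    // (this path only works when an indexing service is configured).
    HttpURLConnection kill = (HttpURLConnection)
        new URL(base + "?kill=true&interval=2013-01-01/2013-02-01").openConnection();
    kill.setRequestMethod("DELETE");
    System.out.println("kill   -> HTTP " + kill.getResponseCode());
    kill.disconnect();

    // POST with no body re-enables the datasource in the segment database.
    HttpURLConnection enable = (HttpURLConnection) new URL(base).openConnection();
    enable.setRequestMethod("POST");
    enable.setRequestProperty("Content-Type", "application/json");
    System.out.println("enable -> HTTP " + enable.getResponseCode());
    enable.disconnect();
  }
}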

View File

@ -57,6 +57,7 @@ import java.util.TreeSet;
/**
*/
@Deprecated
@Path("/info")
public class InfoResource
{
@ -102,7 +103,6 @@ public class InfoResource
private final DatabaseRuleManager databaseRuleManager;
private final IndexingServiceClient indexingServiceClient;
@Inject
public InfoResource(
DruidCoordinator coordinator,

View File

@ -0,0 +1,70 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.http;
import com.google.inject.Inject;
import io.druid.db.DatabaseRuleManager;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
/**
*/
@Path("/druid/coordinator/v1/rules")
public class RulesResource
{
private final DatabaseRuleManager databaseRuleManager;
@Inject
public RulesResource(
DatabaseRuleManager databaseRuleManager
)
{
this.databaseRuleManager = databaseRuleManager;
}
@GET
@Produces("application/json")
public Response getRules()
{
return Response.ok(databaseRuleManager.getAllRules()).build();
}
@GET
@Path("/{dataSourceName}")
@Produces("application/json")
public Response getDatasourceRules(
@PathParam("dataSourceName") final String dataSourceName,
@QueryParam("full") final String full
)
{
if (full != null) {
return Response.ok(databaseRuleManager.getRulesWithDefault(dataSourceName))
.build();
}
return Response.ok(databaseRuleManager.getRules(dataSourceName))
.build();
}
}

View File

@ -0,0 +1,188 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.Map;
/**
*/
@Path("/druid/coordinator/v1/servers")
public class ServersResource
{
private static Map<String, Object> makeSimpleServer(DruidServer input)
{
return new ImmutableMap.Builder<String, Object>()
.put("host", input.getHost())
.put("tier", input.getTier())
.put("currSize", input.getCurrSize())
.put("maxSize", input.getMaxSize())
.build();
}
private final InventoryView serverInventoryView;
@Inject
public ServersResource(
InventoryView serverInventoryView
)
{
this.serverInventoryView = serverInventoryView;
}
@GET
@Produces("application/json")
public Response getClusterServers(
@QueryParam("full") String full,
@QueryParam("simple") String simple
)
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (full != null) {
return builder.entity(Lists.newArrayList(serverInventoryView.getInventory())).build();
} else if (simple != null) {
return builder.entity(
Lists.newArrayList(
Iterables.transform(
serverInventoryView.getInventory(),
new Function<DruidServer, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(DruidServer input)
{
return makeSimpleServer(input);
}
}
)
)
).build();
}
return builder.entity(
Lists.newArrayList(
Iterables.transform(
serverInventoryView.getInventory(),
new Function<DruidServer, String>()
{
@Override
public String apply(DruidServer druidServer)
{
return druidServer.getHost();
}
}
)
)
).build();
}
@GET
@Path("/{serverName}")
@Produces("application/json")
public Response getServer(
@PathParam("serverName") String serverName,
@QueryParam("simple") String simple
)
{
DruidServer server = serverInventoryView.getInventoryValue(serverName);
if (server == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (simple != null) {
return builder.entity(makeSimpleServer(server)).build();
}
return builder.entity(server)
.build();
}
@GET
@Path("/{serverName}/segments")
@Produces("application/json")
public Response getServerSegments(
@PathParam("serverName") String serverName,
@QueryParam("full") String full
)
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
DruidServer server = serverInventoryView.getInventoryValue(serverName);
if (server == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
if (full != null) {
return builder.entity(server.getSegments().values()).build();
}
return builder.entity(
Collections2.transform(
server.getSegments().values(),
new Function<DataSegment, String>()
{
@Override
public String apply(@Nullable DataSegment segment)
{
return segment.getIdentifier();
}
}
)
).build();
}
@GET
@Path("/{serverName}/segments/{segmentId}")
@Produces("application/json")
public Response getServerSegment(
@PathParam("serverName") String serverName,
@PathParam("segmentId") String segmentId
)
{
DruidServer server = serverInventoryView.getInventoryValue(serverName);
if (server == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
DataSegment segment = server.getSegment(segmentId);
if (segment == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.status(Response.Status.OK).entity(segment).build();
}
}

View File

@ -0,0 +1,88 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.http;
import com.google.api.client.util.Maps;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.inject.Inject;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.Map;
import java.util.Set;
/**
*/
@Path("/druid/coordinator/v1/tiers")
public class TiersResource
{
private final InventoryView serverInventoryView;
@Inject
public TiersResource(
InventoryView serverInventoryView
)
{
this.serverInventoryView = serverInventoryView;
}
@GET
@Produces("application/json")
public Response getTiers(
@QueryParam("simple") String simple
)
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (simple != null) {
Map<String, Map<String, Long>> metadata = Maps.newHashMap();
for (DruidServer druidServer : serverInventoryView.getInventory()) {
Map<String, Long> tierMetadata = metadata.get(druidServer.getTier());
if (tierMetadata == null) {
tierMetadata = Maps.newHashMap();
metadata.put(druidServer.getTier(), tierMetadata);
}
Long currSize = tierMetadata.get("currSize");
tierMetadata.put("currSize", (currSize == null) ? 0 : currSize + druidServer.getCurrSize());
Long maxSize = tierMetadata.get("maxSize");
tierMetadata.put("maxSize", (maxSize == null) ? 0 : maxSize + druidServer.getMaxSize());
}
return builder.entity(metadata).build();
}
Set<String> tiers = Sets.newHashSet();
for (DruidServer server : serverInventoryView.getInventory()) {
tiers.add(server.getTier());
}
return builder.entity(tiers).build();
}
}
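TiersResource's simple view rolls per-server currSize/maxSize up into a nested map keyed by tier. A standalone sketch of that map shape, fed with invented tier names and sizes rather than anything from this diff:

import java.util.HashMap;
import java.util.Map;

public class TierRollupSketch
{
  public static void main(String[] args)
  {
    // tier -> { "currSize": ..., "maxSize": ... }, mirroring the ?simple response shape.
    Map<String, Map<String, Long>> metadata = new HashMap<String, Map<String, Long>>();
    addServer(metadata, "_default_tier", 100L, 1000L);
    addServer(metadata, "_default_tier", 250L, 1000L);
    addServer(metadata, "hot", 50L, 500L);
    System.out.println(metadata);
  }

  // Accumulates one server's sizes into its tier's entry, creating the entry if needed.
  private static void addServer(Map<String, Map<String, Long>> metadata, String tier, long currSize, long maxSize)
  {
    Map<String, Long> tierMetadata = metadata.get(tier);
    if (tierMetadata == null) {
      tierMetadata = new HashMap<String, Long>();
      metadata.put(tier, tierMetadata);
    }
    Long curr = tierMetadata.get("currSize");
    tierMetadata.put("currSize", (curr == null) ? currSize : curr + currSize);
    Long max = tierMetadata.get("maxSize");
    tierMetadata.put("maxSize", (max == null) ? maxSize : max + maxSize);
  }
}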

View File

@ -21,7 +21,7 @@ $(document).ready(function() {
var selected = $('#datasources option:selected').text();
$.ajax({
type: 'POST',
url:'/info/datasources/' + selected,
url:'/druid/coordinator/v1/datasources/' + selected,
data: JSON.stringify(selected),
contentType:"application/json; charset=utf-8",
dataType:"json",
@ -50,7 +50,7 @@ $(document).ready(function() {
var selected = $('#datasources option:selected').text();
$.ajax({
type: 'DELETE',
url:'/info/datasources/' + selected,
url:'/druid/coordinator/v1/datasources/' + selected,
data: JSON.stringify(selected),
contentType:"application/json; charset=utf-8",
dataType:"json",
@ -70,12 +70,12 @@ $(document).ready(function() {
}
});
$.getJSON("/info/db/datasources", function(enabled_datasources) {
$.getJSON("/druid/coordinator/v1/db/datasources", function(enabled_datasources) {
$.each(enabled_datasources, function(index, datasource) {
$('#enabled_datasources').append($('<li>' + datasource + '</li>'));
});
$.getJSON("/info/db/datasources?includeDisabled", function(db_datasources) {
$.getJSON("/druid/coordinator/v1/db/datasources?includeDisabled", function(db_datasources) {
var disabled_datasources = _.difference(db_datasources, enabled_datasources);
$.each(disabled_datasources, function(index, datasource) {
$('#disabled_datasources').append($('<li>' + datasource + '</li>'));

View File

@ -2,7 +2,7 @@
$(document).ready(function() {
var basePath = "/info/";
var basePath = "/druid/coordinator/v1/";
var type = $('#select_type').attr('value') + '';
var view = $('#select_view').attr('value') + '';

View File

@ -100,8 +100,8 @@ $(document).ready(function() {
}
// Execution stuff
$.get('/info/coordinator', function(data) {
$("#coordinator").html('Current Cluster Coordinator: ' + data.host);
$.get('/druid/coordinator/v1/leader', function(data) {
$("#coordinator").html('Current Cluster Coordinator Leader: ' + data.host);
});
$('#move_segment').submit(function() {
@ -118,57 +118,10 @@ $(document).ready(function() {
});
}
/*
$.ajax({
url:"/coordinator/move",
type: "POST",
data: JSON.stringify(data),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
alert(error + ": " + xhr.responseText);
},
success: function(data, status, xhr) {
for (seg in CONSOLE.selected_segments) {
CONSOLE.selected_segments[seg].children('.server_host').text($('#move_segment > .to').val());
}
}
});
*/
return false;
});
/*$
('#drop_segment').submit(function() {
var data = [];
if ($.isEmptyObject(CONSOLE.selected_segments)) {
alert("Please select at least one segment");
}
for (seg in CONSOLE.selected_segments) {
data.push({
'segmentName' : seg,
'from' : CONSOLE.selected_segments[seg]
});
}
$.ajax({
url:"/coordinator/drop",
type: "POST",
data: JSON.stringify(data),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
alert(error + ": " + xhr.responseText);
}
});
return false;
});
*/
$.get('/info/cluster', function(data) {
$.get('/druid/coordinator/v1/servers?full', function(data) {
$('.loading').hide();
initTables(data);
@ -176,26 +129,5 @@ $(document).ready(function() {
var oTable = [];
initDataTable($('#servers'), oTable);
initDataTable($('#segments'), oTable);
// init select segments
/*$("#segments tbody").click(function(event) {
var el = $(event.target.parentNode);
var key = el.children('.segment_name').text();
if (el.is("tr")) {
if (el.hasClass('row_selected')) {
el.removeClass('row_selected');
delete CONSOLE.selected_segments[key];
} else {
el.addClass('row_selected');
CONSOLE.selected_segments[key] = el;
}
var html ="";
for (segment in CONSOLE.selected_segments) {
html += segment + ' on ' + CONSOLE.selected_segments[segment].children('.server_host').text() + '<br/>';
}
$('#selected_segments').html(html);
}
});*/
});
});

View File

@ -22,7 +22,7 @@ $(document).ready(function() {
var interval = $('#interval').val();
$.ajax({
type: 'DELETE',
url:'/info/datasources/' + selected +'?kill=true&interval=' + interval,
url:'/druid/coordinator/v1/datasources/' + selected +'?kill=true&interval=' + interval,
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
@ -41,7 +41,7 @@ $(document).ready(function() {
}
});
$.getJSON("/info/db/datasources?includeDisabled", function(data) {
$.getJSON("/druid/coordinator/v1/db/datasources?includeDisabled", function(data) {
$.each(data, function(index, datasource) {
$('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
});

View File

@ -115,7 +115,7 @@ function makeTiersDropdown(rule) {
function getRules() {
var selected = $('#datasources option:selected').text();
if (selected !== "") {
$.getJSON("/info/rules/" + selected, function(data) {
$.getJSON("/druid/coordinator/v1/rules/" + selected, function(data) {
$('#rules_list').empty();
if (!$.isEmptyObject(data)) {
$.each(data, function(index, rule) {
@ -189,7 +189,7 @@ $(document).ready(function() {
var selected = $('#datasources option:selected').text();
$.ajax({
type: 'POST',
url:'/info/rules/' + selected,
url:'/druid/coordinator/v1/rules/' + selected,
data: JSON.stringify(rules),
contentType:"application/json; charset=utf-8",
dataType:"json",
@ -209,11 +209,11 @@ $(document).ready(function() {
}
});
$.getJSON("/info/tiers", function(theTiers) {
$.getJSON("/druid/coordinator/v1/tiers", function(theTiers) {
tiers = theTiers;
});
$.getJSON("/info/db/datasources", function(data) {
$.getJSON("/druid/coordinator/v1/db/datasources", function(data) {
$.each(data, function(index, datasource) {
$('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
});

View File

@ -17,35 +17,41 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
package io.druid.client;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder;
import io.druid.client.DirectDruidClient;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.ReflectionQueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URL;
import java.util.List;
/**
*/
public class ServerSelectorTest
public class DirectDruidClientTest
{
private HttpClient httpClient;
@ -56,10 +62,16 @@ public class ServerSelectorTest
}
@Test
public void testPick() throws Exception
public void testRun() throws Exception
{
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
SettableFuture futureException = SettableFuture.create();
SettableFuture<InputStream> futureResult = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureResult).times(1);
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureException).times(1);
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(SettableFuture.create()).atLeastOnce();
EasyMock.replay(httpClient);
@ -74,18 +86,19 @@ public class ServerSelectorTest
new NoneShardSpec(),
0,
0L
)
),
new ConnectionCountServerSelectorStrategy()
);
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
new DefaultObjectMapper(new SmileFactory()),
new DefaultObjectMapper(),
httpClient,
"foo"
);
DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
new DefaultObjectMapper(new SmileFactory()),
new DefaultObjectMapper(),
httpClient,
"foo2"
);
@ -103,11 +116,28 @@ public class ServerSelectorTest
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
client1.run(query);
client1.run(query);
client1.run(query);
Sequence s1 = client1.run(query);
Assert.assertEquals(1, client1.getNumOpenConnections());
Assert.assertTrue(client1.getNumOpenConnections() == 3);
// simulate read timeout
Sequence s2 = client1.run(query);
Assert.assertEquals(2, client1.getNumOpenConnections());
futureException.setException(new ReadTimeoutException());
Assert.assertEquals(1, client1.getNumOpenConnections());
// subsequent connections should work
Sequence s3 = client1.run(query);
Sequence s4 = client1.run(query);
Sequence s5 = client1.run(query);
Assert.assertTrue(client1.getNumOpenConnections() == 4);
// produce result for first connection
futureResult.set(new ByteArrayInputStream("[{\"timestamp\":\"2014-01-01T01:02:03Z\", \"result\": 42.0}]".getBytes()));
List<Result> results = Sequences.toList(s1, Lists.<Result>newArrayList());
Assert.assertEquals(1, results.size());
Assert.assertEquals(new DateTime("2014-01-01T01:02:03Z"), results.get(0).getTimestamp());
Assert.assertEquals(3, client1.getNumOpenConnections());
client2.run(query);
client2.run(query);
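The reworked client test hands out Guava SettableFutures in place of real HTTP responses, so it can complete or fail each in-flight call exactly when it wants to assert on open-connection counts. A self-contained sketch of that pattern; the class name and payload strings are illustrative only:

import com.google.common.util.concurrent.SettableFuture;

import java.util.concurrent.ExecutionException;

public class SettableFutureSketch
{
  public static void main(String[] args) throws Exception
  {
    // Hand the future out as if it were an in-flight HTTP response...
    SettableFuture<String> pending = SettableFuture.create();

    // ...then complete it whenever the test decides, e.g. after asserting on connection counts.
    pending.set("[{\"timestamp\":\"2014-01-01T01:02:03Z\", \"result\": 42.0}]");
    System.out.println(pending.get());

    // A failure path is simulated the same way with setException(...),
    // which is how the test above mimics a read timeout.
    SettableFuture<String> failing = SettableFuture.create();
    failing.setException(new RuntimeException("simulated read timeout"));
    try {
      failing.get();
    }
    catch (ExecutionException e) {
      System.out.println("failed as expected: " + e.getCause().getMessage());
    }
  }
}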

View File

@ -78,7 +78,7 @@ public class RealtimeManagerTest
Arrays.<FireDepartment>asList(
new FireDepartment(
schema,
new FireDepartmentConfig(1, new Period("P1Y")),
new FireDepartmentConfig(1, new Period("P1Y"), 1),
new FirehoseFactory()
{
@Override

View File

@ -86,7 +86,7 @@ public class RealtimePlumberSchoolTest
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
new Period("PT10m"),
tmpDir,
IndexGranularity.HOUR
IndexGranularity.HOUR, 1
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);

View File

@ -37,8 +37,10 @@ public class ServerTimeRejectionPolicyFactoryTest
DateTime now = new DateTime();
DateTime past = now.minus(period).minus(1);
DateTime future = now.plus(period).plus(1);
Assert.assertTrue(rejectionPolicy.accept(now.getMillis()));
Assert.assertFalse(rejectionPolicy.accept(past.getMillis()));
Assert.assertFalse(rejectionPolicy.accept(future.getMillis()));
}
}

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -30,6 +30,7 @@ import io.druid.client.TimelineServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheMonitor;
import io.druid.client.cache.CacheProvider;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
@ -53,7 +54,7 @@ import java.util.List;
*/
@Command(
name = "broker",
description = "Runs a broker node, see http://druid.io/docs/0.6.46/Broker.html for a description"
description = "Runs a broker node, see http://druid.io/docs/0.6.52/Broker.html for a description"
)
public class CliBroker extends ServerRunnable
{
@ -81,6 +82,8 @@ public class CliBroker extends ServerRunnable
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheProvider.class);
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class);
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);

View File

@ -47,9 +47,14 @@ import io.druid.server.http.BackwardsCompatibleInfoResource;
import io.druid.server.http.CoordinatorDynamicConfigsResource;
import io.druid.server.http.CoordinatorRedirectInfo;
import io.druid.server.http.CoordinatorResource;
import io.druid.server.http.DBResource;
import io.druid.server.http.DatasourcesResource;
import io.druid.server.http.InfoResource;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import io.druid.server.http.RulesResource;
import io.druid.server.http.ServersResource;
import io.druid.server.http.TiersResource;
import io.druid.server.initialization.JettyServerInitializer;
import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;
@ -60,7 +65,7 @@ import java.util.List;
*/
@Command(
name = "coordinator",
description = "Runs the Coordinator, see http://druid.io/docs/0.6.46/Coordinator.html for a description."
description = "Runs the Coordinator, see http://druid.io/docs/0.6.52/Coordinator.html for a description."
)
public class CliCoordinator extends ServerRunnable
{
@ -107,6 +112,11 @@ public class CliCoordinator extends ServerRunnable
Jerseys.addResource(binder, InfoResource.class);
Jerseys.addResource(binder, CoordinatorResource.class);
Jerseys.addResource(binder, CoordinatorDynamicConfigsResource.class);
Jerseys.addResource(binder, TiersResource.class);
Jerseys.addResource(binder, RulesResource.class);
Jerseys.addResource(binder, ServersResource.class);
Jerseys.addResource(binder, DatasourcesResource.class);
Jerseys.addResource(binder, DBResource.class);
LifecycleModule.register(binder, Server.class);
}

Some files were not shown because too many files have changed in this diff.