Merge branch 'master' into az

Conflicts:
	server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
	server/src/main/java/io/druid/server/http/CoordinatorResource.java
	server/src/test/java/io/druid/client/DirectDruidClientTest.java
This commit is contained in:
fjy 2014-01-20 15:10:13 -08:00
commit e2ea2ec7fd
91 changed files with 1550 additions and 477 deletions

View File

@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
echo "See also http://druid.io/docs/0.6.46"
echo "See also http://druid.io/docs/0.6.52"

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -21,10 +21,15 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*/
@ -49,4 +54,31 @@ public class Execs
{
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
}
/**
 * Creates a single-threaded executor backed by a bounded queue. When the queue is full the
 * rejection handler re-offers the task with a blocking {@code put}, so the submitting thread
 * waits for free space instead of having the task rejected.
 *
 * NOTE(review): the rejection handler is also invoked for submissions made after the executor
 * has been shut down; such tasks are still placed on the queue and may never run - confirm
 * that callers never submit after shutdown.
 *
 * @param nameFormat nameformat for threadFactory
 * @param capacity maximum capacity after which the executorService will block on accepting new tasks
 * @return ExecutorService which blocks accepting new tasks when the capacity reached
 * @throws IllegalArgumentException if capacity is less than 1 (thrown by ArrayBlockingQueue)
 */
public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity)
{
  return new ThreadPoolExecutor(
      1, 1,
      0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<Runnable>(capacity), makeThreadFactory(nameFormat),
      new RejectedExecutionHandler()
      {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
        {
          try {
            // getQueue() already returns a BlockingQueue<Runnable>; no raw cast is needed to block on put().
            executor.getQueue().put(r);
          }
          catch (InterruptedException e) {
            // Restore the interrupt flag so the submitting thread can still observe the interruption,
            // and keep the original exception as the cause for diagnostics.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
          }
        }
      }
  );
}
}

View File

@ -0,0 +1,92 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.concurrent;
import com.google.common.base.Throwables;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Tests for {@link Execs}.
 */
public class ExecsTest
{
  /**
   * Verifies that the executor returned by {@code Execs.newBlockingSingleThreaded} blocks the
   * submitting thread once it is saturated, and that every submitted task eventually runs.
   *
   * A producer thread submits {@code 2 * capacity} tasks that all wait on a start latch. The test
   * expects exactly {@code capacity + 1} submissions to succeed before the producer blocks, then
   * releases the tasks and checks that all of them were consumed.
   *
   * The timeout keeps a regression (for example a producer that never unblocks) from hanging the
   * build indefinitely.
   */
  @Test(timeout = 60000L)
  public void testBlockingExecutorService() throws Exception
  {
    final int capacity = 3;
    final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity);
    final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1);
    final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity);
    final CountDownLatch taskStartSignal = new CountDownLatch(1);
    final AtomicInteger producedCount = new AtomicInteger();
    final AtomicInteger consumedCount = new AtomicInteger();
    final ExecutorService producer = Executors.newSingleThreadExecutor();
    try {
      producer.submit(
          new Runnable()
          {
            @Override
            public void run()
            {
              for (int i = 0; i < 2 * capacity; i++) {
                final int taskID = i;
                System.out.println("Produced task" + taskID);
                blockingExecutor.submit(
                    new Runnable()
                    {
                      @Override
                      public void run()
                      {
                        System.out.println("Starting task" + taskID);
                        try {
                          // hold every task until the main thread has checked the producer count
                          taskStartSignal.await();
                          consumedCount.incrementAndGet();
                          taskCompletedSignal.countDown();
                        }
                        catch (Exception e) {
                          throw Throwables.propagate(e);
                        }
                        System.out.println("Completed task" + taskID);
                      }
                    }
                );
                producedCount.incrementAndGet();
                queueFullSignal.countDown();
              }
            }
          }
      );

      queueFullSignal.await();
      // verify that the producer blocks
      Assert.assertEquals(capacity + 1, producedCount.get());
      // let the tasks run
      taskStartSignal.countDown();
      // wait until all tasks complete
      taskCompletedSignal.await();
      // verify all tasks consumed
      Assert.assertEquals(2 * capacity, consumedCount.get());
    }
    finally {
      // cleanup: always stop both executors, interrupting any thread still blocked after a failure
      blockingExecutor.shutdownNow();
      producer.shutdownNow();
    }
  }
}

View File

@ -0,0 +1,4 @@
---
layout: doc_page
---
Experimental features are features we have developed but have not fully tested in a production environment. If you choose to try them out, there will likely be edge cases that we have not covered. We would love feedback on any of these features, whether they are bug reports, suggestions for improvement, or letting us know they work as intended.

View File

@ -3,14 +3,14 @@ layout: doc_page
---
# Batch Data Ingestion
There are two choices for batch data ingestion to your Druid cluster, you can use the [Indexing service](Indexing-service.html) or you can use the `HadoopDruidIndexer`.
There are two choices for batch data ingestion to your Druid cluster, you can use the [Indexing service](Indexing-Service.html) or you can use the `HadoopDruidIndexer`.
Which should I use?
-------------------
The [Indexing service](Indexing-service.html) is a node that can run as part of your Druid cluster and can accomplish a number of different types of indexing tasks. Even if all you care about is batch indexing, it provides for the encapsulation of things like the [database](MySQL.html) that is used for segment metadata and other things, so that your indexing tasks do not need to include such information. The indexing service was created such that external systems could programmatically interact with it and run periodic indexing tasks. Long-term, the indexing service is going to be the preferred method of ingesting data.
The [Indexing service](Indexing-Service.html) is a node that can run as part of your Druid cluster and can accomplish a number of different types of indexing tasks. Even if all you care about is batch indexing, it provides for the encapsulation of things like the [database](MySQL.html) that is used for segment metadata and other things, so that your indexing tasks do not need to include such information. The indexing service was created such that external systems could programmatically interact with it and run periodic indexing tasks. Long-term, the indexing service is going to be the preferred method of ingesting data.
The `HadoopDruidIndexer` runs hadoop jobs in order to separate and index data segments. It takes advantage of Hadoop as a job scheduling and distributed job execution platform. It is a simple method if you already have Hadoop running and dont want to spend the time configuring and deploying the [Indexing service](Indexing service.html) just yet.
The `HadoopDruidIndexer` runs Hadoop jobs in order to separate and index data segments. It takes advantage of Hadoop as a job scheduling and distributed job execution platform. It is a simple method if you already have Hadoop running and don't want to spend the time configuring and deploying the [Indexing service](Indexing-Service.html) just yet.
Batch Ingestion using the HadoopDruidIndexer
--------------------------------------------
@ -27,8 +27,8 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
{
"dataSource": "the_data_source",
"timestampSpec" : {
"timestampColumn": "ts",
"timestampFormat": "<iso, millis, posix, auto or any Joda time format>"
"column": "ts",
"format": "<iso, millis, posix, auto or any Joda time format>"
},
"dataSpec": {
"format": "<csv, tsv, or json>",
@ -188,8 +188,8 @@ The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Co
"config": {
"dataSource" : "example",
"timestampSpec" : {
"timestampColumn" : "timestamp",
"timestampFormat" : "auto"
"column" : "timestamp",
"format" : "auto"
},
"dataSpec" : {
"format" : "json",
@ -231,7 +231,7 @@ The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Co
|--------|-----------|---------|
|type|This should be "index_hadoop".|yes|
|config|A Hadoop Index Config (see above).|yes|
|hadoopCoordinates|The Maven <groupId>:<artifactId>:<version> of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no|
|hadoopCoordinates|The Maven `<groupId>:<artifactId>:<version>` of Hadoop to use. The default is "org.apache.hadoop:hadoop-core:1.0.3".|no|
The Hadoop Index Config submitted as part of a Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.

View File

@ -1,30 +1,80 @@
---
layout: doc_page
---
# Booting a Single Node Cluster #
[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.46-bin.tar.gz).
# Booting a Druid Cluster
[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. However, when it's time to run a more realistic setup&mdash;for production or just for testing production&mdash;you'll want to find a way to start the cluster on multiple hosts. This document describes two different ways to do this: manually, or as a cloud service via Apache Whirr.
The [ec2 run script](https://github.com/metamx/druid/blob/master/examples/bin/run_ec2.sh), run_ec2.sh, is located at 'examples/bin' if you have checked out the code, or at the root of the project if you've downloaded a tarball. The scripts rely on the [Amazon EC2 API Tools](http://aws.amazon.com/developertools/351), and you will need to set three environment variables:
## Manually Booting a Druid Cluster
You can provision individual servers, loading Druid onto each machine (or building it) and setting the required configuration for each type of node. You'll also have to set up required external dependencies. Then you'll have to start each node. This process is outlined in [Tutorial: The Druid Cluster](Tutorial%3A-The-Druid-Cluster.html).
## Apache Whirr
[Apache Whirr](http://whirr.apache.org/) is a set of libraries for launching cloud services. For Druid, Whirr serves as an easy way to launch a cluster in Amazon AWS by using simple commands and configuration files (called *recipes*).
**NOTE:** Whirr will install Druid 0.5.x. At this time Whirr can launch Druid 0.5.x only, but in the near future will support Druid 0.6.x. You can download and install your own copy of Druid 0.5.x [here](http://static.druid.io/artifacts/releases/druid-services-0.5.7-bin.tar.gz).
You'll need an AWS account, and an EC2 key pair from that account so that Whirr can connect to the cloud via the EC2 API. If you haven't generated a key pair, see the [AWS documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) or see this [Whirr FAQ](http://whirr.apache.org/faq.html#how-do-i-find-my-cloud-credentials).
### Installing Whirr
You must use a version of Whirr that includes and supports a Druid recipe. You can do so in one of two ways:
#### Build the Following Version of Whirr
Clone the code from [https://github.com/rjurney/whirr/tree/trunk](https://github.com/rjurney/whirr/tree/trunk) and build Whirr:
git clone git@github.com:rjurney/whirr.git
cd whirr
git checkout trunk
mvn clean install -Dmaven.test.failure.ignore=true
#### Build the Latest Version of Whirr
Clone the code from the Whirr repository:
git clone git://git.apache.org/whirr.git
Then run `mvn install` from the root directory.
### Configure Whirr
The Whirr recipe for Druid is the configuration file `$WHIRR_HOME/recipes/druid.properties`. You can edit this file to suit your needs -- it is annotated and self-explanatory. Here are some hints about that file:
* Set `whirr.location-id` to a specific AWS region (e.g., us-east-1) if desired, else one will be chosen for you.
* You can choose the hardware by setting `whirr.hardware-id` to a specific AWS instance type (e.g., m1.large). If you don't choose an image via `whirr.image-id` (image must be compatible with hardware), you'll get plain vanilla Linux.
* SSH keys (not password protected) must exist for the local user. If they are in the default locations, `${sys:user.home}/.ssh/id_rsa` and `${sys:user.home}/.ssh/id_rsa.pub`, Whirr will find them. Otherwise, you'll have to specify them with `whirr.private-key-file` and `whirr.public-key-file`.
* Be sure to specify the absolute path of the Druid realtime spec file `realtime.spec` in `whirr.druid.realtime.spec.path`.
* Two Druid cluster templates (see `whirr.instance-templates`) are provided: a small cluster running on a single EC2 instance, and a larger cluster running on multiple instances. The first is a good test case to start with.
The following AWS information must be set in `druid.properties`, as environment variables, or in the file `$WHIRR_HOME/conf/credentials`:
PROVIDER=aws-ec2
IDENTITY=<aws-id-key>
CREDENTIAL=<aws-private-key>
How to get the IDENTITY and CREDENTIAL keys is discussed above.
### Start a Test Cluster With Whirr
Run the following command:
```bash
# Setup environment for ec2-api-tools
export EC2_HOME=/path/to/ec2-api-tools-1.6.7.4/
export PATH=$PATH:$EC2_HOME/bin
export AWS_ACCESS_KEY=
export AWS_SECRET_KEY=
% $WHIRR_HOME/bin/whirr launch-cluster --config $WHIRR_HOME/recipes/druid.properties
```
If Whirr starts without any errors, you should see the following message:
Then, booting an ec2 instance running one node of each type is as simple as running the script, run_ec2.sh :)
Running on provider aws-ec2 using identity <your-aws-id-here>
You can then use the EC2 dashboard to locate the instance and confirm that it has started up.
# Apache Whirr #
If both the instance and the Druid cluster launch successfully, a few minutes later other messages to STDOUT should follow with information returned from EC2, including the instance ID:
Apache Whirr is a set of libraries for launching cloud services. You can clone a version of Whirr that includes Druid as a service from git@github.com:rjurney/whirr.git:
Started cluster of 1 instances
Cluster{instances=[Instance{roles=[zookeeper, druid-mysql, druid-master, druid-broker, druid-compute, druid-realtime], publicIp= ...
The final message will contain login information for the instance.
Note that Whirr will return an exception if any of the nodes fail to launch, and the cluster will be destroyed. To destroy the cluster manually, run the following command:
```bash
git clone git@github.com:rjurney/whirr.git
cd whirr
git checkout trunk
mvn clean install -Dmaven.test.failure.ignore=true -Dcheckstyle.skip
sp;bin/whirr launch-cluster --config recipes/druid.properties
% $WHIRR_HOME/bin/whirr destroy-cluster --config $WHIRR_HOME/recipes/druid.properties
```

View File

@ -37,7 +37,7 @@ The broker module uses several of the default modules in [Configuration](Configu
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.broker.cache.type`|`local`, `memcache`|The type of cache to use for queries.|`local`|
|`druid.broker.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to compute nodes. `random` chooses randomly, `connectionCount` picks the node with the fewest active connections.|`random`|
#### Local Cache

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.46
git checkout druid-0.6.52
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -0,0 +1,59 @@
---
layout: doc_page
---
Druid supports filtering on spatially indexed columns based on an origin and a bound.
# Spatial Indexing
In any of the data specs, there is the option of providing spatial dimensions. For example, for a JSON data spec, spatial dimensions can be specified as follows:
```json
"dataSpec" : {
"format": "JSON",
"dimensions": <some_dims>,
"spatialDimensions": [
{
"dimName": "coordinates",
"dims": ["lat", "long"]
},
...
]
}
```
|property|description|required?|
|--------|-----------|---------|
|dimName|The name of the spatial dimension. A spatial dimension may be constructed from multiple other dimensions or it may already exist as part of an event. If a spatial dimension already exists, it must be an array of coordinate values.|yes|
|dims|A list of dimension names that comprise a spatial dimension.|no|
# Spatial Filters
The grammar for a spatial filter is as follows:
```json
"filter" : {
"type": "spatial",
"dimension": "spatialDim",
"bound": {
"type": "rectangular",
"minCoords": [10.0, 20.0],
"maxCoords": [30.0, 40.0]
}
}
```
Bounds
------
### Rectangular
|property|description|required?|
|--------|-----------|---------|
|minCoords|List of minimum dimension coordinates for coordinates [x, y, z, …]|yes|
|maxCoords|List of maximum dimension coordinates for coordinates [x, y, z, …]|yes|
### Radius
|property|description|required?|
|--------|-----------|---------|
|coords|Origin coordinates in the form [x, y, z, …]|yes|
|radius|The float radius value|yes|

View File

@ -1,8 +1,7 @@
---
layout: doc_page
---
These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query.
These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query. Note: If you only want to do straight aggregates for some time range, we highly recommend using [TimeseriesQueries](TimeseriesQuery.html) instead. The performance will be substantially better.
An example groupBy query object is shown below:
``` json

View File

@ -28,11 +28,13 @@ druid.port=8081
druid.zk.service.host=localhost
druid.server.maxSize=100000000
druid.server.maxSize=10000000000
druid.processing.buffer.sizeBytes=10000000
# Change these to make Druid faster
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]```
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
```
Note: This will spin up a Historical node with the local filesystem as deep storage.

View File

@ -0,0 +1,38 @@
---
layout: doc_page
---
## Where do my Druid segments end up after ingestion?
Depending on what `druid.storage.type` is set to, Druid will upload segments to some [Deep Storage](Deep-Storage.html). Local disk is used as the default deep storage.
## My realtime node is not handing segments off
Make sure that the `druid.publish.type` on your real-time nodes is set to "db". Also make sure that `druid.storage.type` is set to a deep storage that makes sense. Some example configs:
```
druid.publish.type=db
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.storage.type=s3
druid.storage.bucket=druid
druid.storage.baseKey=sample
```
## I don't see my Druid segments on my historical nodes
You can check the coordinator console located at `<COORDINATOR_IP>:<PORT>/cluster.html`. Make sure that your segments have actually loaded on [historical nodes](Historical.html). If your segments are not present, check the coordinator logs for messages about capacity or replication errors. One reason that segments are not downloaded is because historical nodes have maxSizes that are too small, making them incapable of downloading more data. You can change that with (for example):
```
-Ddruid.segmentCache.locations=[{"path":"/tmp/druid/storageLocation","maxSize":"500000000000"}]
-Ddruid.server.maxSize=500000000000
```
## My queries are returning empty results
You can check `<BROKER_IP>:<PORT>/druid/v2/datasources/<YOUR_DATASOURCE>` for the dimensions and metrics that have been created for your datasource. Make sure that the names of the aggregators you use in your query match these metrics. Also make sure that the query interval you specify matches a valid time range where data exists.
## More information
Getting data into Druid can definitely be difficult for first time users. Please don't hesitate to ask questions in our IRC channel or on our [google groups page](https://groups.google.com/forum/#!forum/druid-development).

View File

@ -1,7 +1,7 @@
---
layout: doc_page
---
The orderBy field provides the functionality to sort and limit the set of results from a groupBy query. Available options are:
The orderBy field provides the functionality to sort and limit the set of results from a groupBy query. If you group by a single dimension and are ordering by a single metric, we highly recommend using [TopN Queries](TopNQuery.html) instead. The performance will be substantially better. Available options are:
### DefaultLimitSpec

View File

@ -0,0 +1,18 @@
---
layout: doc_page
---
## What should I set my JVM heap to?
The size of the JVM heap really depends on the type of Druid node you are running. Below are a few considerations.
[Broker nodes](Broker.html) can use the JVM heap as a query cache and thus, the size of the heap will affect the number of results that can be cached. Broker nodes do not require off-heap memory and generally, heap sizes can be set to be close to the maximum memory on the machine (leaving some room for JVM overhead). The heap is used to merge results from different real-time and historical nodes, along with other computational processing.
[Historical nodes](Historical.html) use off-heap memory to store intermediate results, and by default, all segments are memory mapped before they can be queried. The more off-heap memory is available, the more segments can be served without the possibility of data being paged onto disk. On historicals, the JVM heap is used for [GroupBy queries](GroupByQuery.html), some data structures used for intermediate computation, and general processing.
[Coordinator nodes](Coordinator.html) do not require off-heap memory and the heap is used for loading information about all segments to determine what segments need to be loaded, dropped, moved, or replicated.
## What is the intermediate computation buffer?
The intermediate computation buffer specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed. The default size is 1073741824 bytes (1GB).
## What is server maxSize?
Server maxSize sets the maximum cumulative segment size (in bytes) that a node can hold. Changing this parameter will affect performance by controlling the memory/disk ratio on a node. Setting this parameter to a value greater than the total memory capacity on a node may cause disk paging to occur. This paging time introduces a query latency delay.

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.46"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.52"]
druid.zk.service.host=localhost
@ -42,7 +42,7 @@ druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=10000000
druid.processing.buffer.sizeBytes=100000000
```
The realtime module also uses several of the default modules in [Configuration](Configuration.html). For more information on the realtime spec file (or configuration file), see [realtime ingestion](Realtime-ingestion.html) page.

View File

@ -1,35 +0,0 @@
---
layout: doc_page
---
Note: This feature is highly experimental and only works with spatially indexed dimensions.
The grammar for a spatial filter is as follows:
<code>
{
"dimension": "spatialDim",
"bound": {
"type": "rectangular",
"minCoords": [10.0, 20.0],
"maxCoords": [30.0, 40.0]
}
}
</code>
Bounds
------
### Rectangular
|property|description|required?|
|--------|-----------|---------|
|minCoords|List of minimum dimension coordinates for coordinates [x, y, z, …]|yes|
|maxCoords|List of maximum dimension coordinates for coordinates [x, y, z, …]|yes|
### Radius
|property|description|required?|
|--------|-----------|---------|
|coords|Origin coordinates in the form [x, y, z, …]|yes|
|radius|The float radius value|yes|

View File

@ -1,26 +0,0 @@
---
layout: doc_page
---
Note: This feature is highly experimental.
In any of the data specs, there is now the option of providing spatial dimensions. For example, for a JSON data spec, spatial dimensions can be specified as follows:
<code>
{
"type": "JSON",
"dimensions": <some_dims>,
"spatialDimensions": [
{
"dimName": "coordinates",
"dims": ["lat", "long"]
},
...
]
}
</code>
|property|description|required?|
|--------|-----------|---------|
|dimName|The name of the spatial dimension. A spatial dimension may be constructed from multiple other dimensions or it may already exist as part of an event. If a spatial dimension already exists, it must be an array of dimension values.|yes|
|dims|A list of dimension names that comprise a spatial dimension.|no|

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.46
cd druid-services-0.6.52
```
You should see a bunch of files:

View File

@ -194,6 +194,12 @@ Which gets us metrics about only those edits where the namespace is 'article':
Check out [Filters](Filters.html) for more information.
What Types of Queries to Use
----------------------------
The type of query you should use depends on your use case. [TimeBoundary queries](TimeBoundaryQuery.html) are useful to understand the range of your data. [Timeseries queries](TimeseriesQuery.html) are useful for aggregates and filters over a time range, and offer significant speed improvements over [GroupBy queries](GroupByQuery.html). To find the top values for a given dimension, [TopN queries](TopNQuery.html) should be used over group by queries as well.
## Learn More ##
You can learn more about querying at [Querying](Querying.html)! If you are ready to evaluate Druid more in depth, check out [Booting a production cluster](Booting-a-production-cluster.html)!

View File

@ -269,6 +269,8 @@ Next Steps
This tutorial covered ingesting a small batch data set and loading it into Druid. In [Loading Your Data Part 2](Tutorial%3A-Loading-Your-Data-Part-2.html), we will cover how to ingest data using Hadoop for larger data sets.
Note: The index task and local firehose can be used to ingest your own data if the size of that data is relatively small (< 1G). The index task is fairly slow and we highly recommend using the Hadoop Index Task for ingesting larger quantities of data.
Additional Information
----------------------

View File

@ -45,7 +45,7 @@ With real-world data, we recommend having a message bus such as [Apache Kafka](h
<a id="set-up-kafka"></a>
#### Setting up Kafka
[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.46/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.52/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html).
@ -264,8 +264,10 @@ Examining the contents of the file, you should find:
"type" : "index_hadoop",
"config": {
"dataSource" : "wikipedia",
"timestampColumn" : "timestamp",
"timestampFormat" : "auto",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dataSpec" : {
"format" : "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
@ -303,7 +305,8 @@ Examining the contents of the file, you should find:
}
```
If you are curious about what all this configuration means, see [here](Task.html)
If you are curious about what all this configuration means, see [here](Task.html).
To submit the task:
```bash

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes as well as and exter
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz)
and untar the contents within by issuing:
@ -149,17 +149,19 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.52"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
druid.server.maxSize=100000000
druid.server.maxSize=10000000000
druid.processing.buffer.sizeBytes=10000000
# Change these to make Druid faster
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
```
To start the historical node:
@ -238,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.46","io.druid.extensions:druid-kafka-seven:0.6.46"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.52","io.druid.extensions:druid-kafka-seven:0.6.52"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
@ -248,7 +250,7 @@ druid.publish.type=noop
# druid.db.connector.user=druid
# druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=10000000
druid.processing.buffer.sizeBytes=100000000
```
Next Steps

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.46
cd druid-services-0.6.52
```
You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz.
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.52-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:

View File

@ -19,6 +19,7 @@ h2. Operations
* "Extending Druid":./Modules.html
* "Cluster Setup":./Cluster-setup.html
* "Booting a Production Cluster":./Booting-a-production-cluster.html
* "Performance FAQ":./Performance-FAQ.html
h2. Data Ingestion
* "Realtime":./Realtime-ingestion.html
@ -27,6 +28,7 @@ h2. Data Ingestion
* "Batch":./Batch-ingestion.html
* "Indexing Service":./Indexing-Service.html
** "Tasks":./Tasks.html
* "Ingestion FAQ":./Ingestion-FAQ.html
h2. Querying
* "Querying":./Querying.html
@ -64,6 +66,10 @@ h2. Architecture
** "MySQL":./MySQL.html
** "ZooKeeper":./ZooKeeper.html
h2. Experimental
* "About Experimental Features":./About-Experimental-Features.html
* "Geographic Queries":./GeographicQueries.html
h2. Development
* "Versioning":./Versioning.html
* "Build From Source":./Build-from-source.html

View File

@ -1,7 +1,9 @@
{
"dataSource": "wikipedia",
"timestampColumn": "timestamp",
"timestampFormat": "iso",
"timestampSpec" : {
"column": "timestamp",
"format": "iso"
},
"dataSpec": {
"format": "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]

View File

@ -2,8 +2,10 @@
"type" : "index_hadoop",
"config": {
"dataSource" : "wikipedia",
"timestampColumn" : "timestamp",
"timestampFormat" : "auto",
"timestampSpec" : {
"column": "timestamp",
"format": "auto"
},
"dataSpec" : {
"format" : "json",
"dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]

View File

@ -1,76 +0,0 @@
# Before running, you will need to download the EC2 tools from http://aws.amazon.com/developertools/351
# and then setup your EC2_HOME and PATH variables (or similar):
#
# # Setup environment for ec2-api-tools
# export EC2_HOME=/path/to/ec2-api-tools-1.6.7.4/
# export PATH=$PATH:$EC2_HOME/bin
# export AWS_ACCESS_KEY=
# export AWS_SECRET_KEY=
# Check for ec2 commands we require and die if they're missing
type ec2-create-keypair >/dev/null 2>&1 || { echo >&2 "I require ec2-create-keypair but it's not installed. Aborting."; exit 1; }
type ec2-create-group >/dev/null 2>&1 || { echo >&2 "I require ec2-create-group but it's not installed. Aborting."; exit 1; }
type ec2-authorize >/dev/null 2>&1 || { echo >&2 "I require ec2-authorize but it's not installed. Aborting."; exit 1; }
type ec2-run-instances >/dev/null 2>&1 || { echo >&2 "I require ec2-run-instances but it's not installed. Aborting."; exit 1; }
type ec2-describe-instances >/dev/null 2>&1 || { echo >&2 "I require ec2-describe-instances but it's not installed. Aborting."; exit 1; }
# Create a keypair for our servers
echo "Removing old keypair for druid..."
ec2-delete-keypair druid-keypair
echo "Creating new keypair for druid..."
ec2-create-keypair druid-keypair > druid-keypair
chmod 0600 druid-keypair
mv druid-keypair ~/.ssh/
# Create a security group for our servers
echo "Creating a new security group for druid..."
ec2-create-group druid-group -d "Druid Cluster"
# Create rules that allow necessary services in our group
echo "Creating new firewall rules for druid..."
# SSH from outside
ec2-authorize druid-group -P tcp -p 22
# Enable all traffic within group
ec2-authorize druid-group -P tcp -p 1-65535 -o druid-group
ec2-authorize druid-group -P udp -p 1-65535 -o druid-group
echo "Booting a single small instance for druid..."
# Use ami ami-e7582d8e - Alestic Ubuntu 12.04 us-east
INSTANCE_ID=$(ec2-run-instances ami-e7582d8e -n 1 -g druid-group -k druid-keypair --instance-type m1.small| awk '/INSTANCE/{print $2}')
while true; do
sleep 1
INSTANCE_STATUS=$(ec2-describe-instances|grep INSTANCE|grep $INSTANCE_ID|cut -f6)
if [ $INSTANCE_STATUS == "running" ]
then
echo "Instance $INSTANCE_ID is status $INSTANCE_STATUS..."
break
fi
done
# Wait for the instance to come up
echo "Waiting 60 seconds for instance $INSTANCE_ID to boot..."
sleep 60
# Get hostname and ssh with the key we created, and ssh there
INSTANCE_ADDRESS=`ec2-describe-instances|grep 'INSTANCE'|grep $INSTANCE_ID|cut -f4`
echo "Connecting to $INSTANCE_ADDRESS to prepare environment for druid..."
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/env.sh ubuntu@${INSTANCE_ADDRESS}:
ssh -q -f -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./env.sh;./env.sh'
echo "Prepared $INSTANCE_ADDRESS for druid."
# Now to scp a tarball up that can run druid!
if [ -f ../../services/target/druid-services-*-bin.tar.gz ];
then
echo "Uploading druid tarball to server..."
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ../../services/target/druid-services-*-bin.tar.gz ubuntu@${INSTANCE_ADDRESS}:
else
echo "ERROR - package not built!"
fi
# Now boot druid parts
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/run.sh ubuntu@${INSTANCE_ADDRESS}:
ssh -q -f -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./run.sh;./run.sh'
echo "Druid booting complete!"
echo "ssh -i ~/.ssh/druid-keypair ubuntu@${INSTANCE_ADDRESS} #to connect"

View File

@ -4,16 +4,16 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.52"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
druid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
druid.server.maxSize=100000000
druid.server.maxSize=10000000000
# Change these to make Druid faster
druid.processing.buffer.sizeBytes=10000000
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 100000000}]
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.46","io.druid.extensions:druid-kafka-seven:0.6.46","io.druid.extensions:druid-rabbitmq:0.6.46"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.52","io.druid.extensions:druid-kafka-seven:0.6.52","io.druid.extensions:druid-rabbitmq:0.6.52"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
@ -14,4 +14,4 @@ druid.publish.type=noop
# druid.db.connector.user=druid
# druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=10000000
druid.processing.buffer.sizeBytes=100000000

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -19,11 +19,9 @@
package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
@ -39,8 +37,6 @@ public class DbUpdaterJob implements Jobby
{
private static final Logger log = new Logger(DbUpdaterJob.class);
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final HadoopDruidIndexerConfig config;
private final IDBI dbi;
@ -82,7 +78,7 @@ public class DbUpdaterJob implements Jobby
.put("partitioned", segment.getShardSpec().getPartitionNum())
.put("version", segment.getVersion())
.put("used", true)
.put("payload", jsonMapper.writeValueAsString(segment))
.put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))
.build()
);

View File

@ -443,7 +443,11 @@ public class DeterminePartitionsJob implements Jobby
final int index = bytes.getInt();
if (index >= numPartitions) {
throw new ISE("Not enough partitions, index[%,d] >= numPartitions[%,d]", index, numPartitions);
throw new ISE(
"Not enough partitions, index[%,d] >= numPartitions[%,d]. Please increase the number of reducers to the index size or check your config & settings!",
index,
numPartitions
);
}
return index;
@ -453,7 +457,6 @@ public class DeterminePartitionsJob implements Jobby
private static abstract class DeterminePartitionsDimSelectionBaseReducer
extends Reducer<BytesWritable, Text, BytesWritable, Text>
{
protected static volatile HadoopDruidIndexerConfig config = null;
@Override

View File

@ -25,6 +25,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -45,22 +46,25 @@ public class ArbitraryGranularitySpec implements GranularitySpec
intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
// Insert all intervals
for(final Interval inputInterval : inputIntervals) {
for (final Interval inputInterval : inputIntervals) {
intervals.add(inputInterval);
}
// Ensure intervals are non-overlapping (but they may abut each other)
final PeekingIterator<Interval> intervalIterator = Iterators.peekingIterator(intervals.iterator());
while(intervalIterator.hasNext()) {
while (intervalIterator.hasNext()) {
final Interval currentInterval = intervalIterator.next();
if(intervalIterator.hasNext()) {
if (intervalIterator.hasNext()) {
final Interval nextInterval = intervalIterator.peek();
if(currentInterval.overlaps(nextInterval)) {
throw new IllegalArgumentException(String.format(
"Overlapping intervals: %s, %s",
currentInterval,
nextInterval));
if (currentInterval.overlaps(nextInterval)) {
throw new IllegalArgumentException(
String.format(
"Overlapping intervals: %s, %s",
currentInterval,
nextInterval
)
);
}
}
}
@ -79,10 +83,16 @@ public class ArbitraryGranularitySpec implements GranularitySpec
// First interval with start time dt
final Interval interval = intervals.floor(new Interval(dt, new DateTime(Long.MAX_VALUE)));
if(interval != null && interval.contains(dt)) {
if (interval != null && interval.contains(dt)) {
return Optional.of(interval);
} else {
return Optional.absent();
}
}
@Override
public Granularity getGranularity()
{
throw new UnsupportedOperationException();
}
}

View File

@ -22,6 +22,7 @@ package io.druid.indexer.granularity;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import com.metamx.common.Granularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -43,4 +44,6 @@ public interface GranularitySpec
/** Time-grouping interval corresponding to some instant, if any. */
public Optional<Interval> bucketInterval(DateTime dt);
public Granularity getGranularity();
}

View File

@ -67,6 +67,7 @@ public class UniformGranularitySpec implements GranularitySpec
return wrappedSpec.bucketInterval(dt);
}
@Override
@JsonProperty("gran")
public Granularity getGranularity()
{

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -22,15 +22,17 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Sets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultiset;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@ -61,6 +63,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArrayList;
public class IndexTask extends AbstractFixedIntervalTask
@ -133,7 +136,13 @@ public class IndexTask extends AbstractFixedIntervalTask
{
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Set<DataSegment> segments = Sets.newHashSet();
for (final Interval bucket : granularitySpec.bucketIntervals()) {
final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals(), getDataIntervals());
if (validIntervals.isEmpty()) {
throw new ISE("No valid data intervals found. Check your configs!");
}
for (final Interval bucket : validIntervals) {
final List<ShardSpec> shardSpecs;
if (targetPartitionSize > 0) {
shardSpecs = determinePartitions(bucket, targetPartitionSize);
@ -160,6 +169,19 @@ public class IndexTask extends AbstractFixedIntervalTask
return TaskStatus.success(getId());
}
private SortedSet<Interval> getDataIntervals() throws IOException
{
SortedSet<Interval> retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
try (Firehose firehose = firehoseFactory.connect()) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
Interval interval = granularitySpec.getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
retVal.add(interval);
}
}
return retVal;
}
private List<ShardSpec> determinePartitions(
final Interval interval,
final int targetPartitionSize

View File

@ -194,7 +194,7 @@ public class RealtimeIndexTask extends AbstractTask
final RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
windowPeriod,
new File(toolbox.getTaskWorkDir(), "persist"),
segmentGranularity
segmentGranularity, fireDepartmentConfig.getMaxPendingPersists()
);
final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(this, toolbox);

View File

@ -43,7 +43,6 @@ import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
@ -98,7 +97,7 @@ public class DbTaskStorage implements TaskStorage
}
@Override
public void insert(final Task task, final TaskStatus status)
public void insert(final Task task, final TaskStatus status) throws TaskExistsException
{
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status");
@ -137,9 +136,11 @@ public class DbTaskStorage implements TaskStorage
}
);
}
catch (StatementException e) {
// Might be a duplicate task ID.
if (getTask(task.getId()).isPresent()) {
catch (Exception e) {
final boolean isStatementException = e instanceof StatementException ||
(e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
if (isStatementException && getTask(task.getId()).isPresent()) {
throw new TaskExistsException(task.getId(), e);
} else {
throw e;
@ -533,11 +534,8 @@ public class DbTaskStorage implements TaskStorage
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
}
catch (RuntimeException e) {
throw Throwables.propagate(e);
}
catch (Exception e) {
throw new CallbackFailedException(e);
throw Throwables.propagate(e);
}
}

View File

@ -63,7 +63,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
@Override
public void insert(Task task, TaskStatus status)
public void insert(Task task, TaskStatus status) throws TaskExistsException
{
giant.lock();

View File

@ -61,6 +61,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
@ -335,7 +336,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
pendingTaskPayloads.remove(taskId);
log.info("Removed task from pending queue: %s", taskId);
} else if (completeTasks.containsKey(taskId)) {
cleanup(completeTasks.get(taskId).getWorker().getHost(), taskId);
cleanup(taskId);
} else {
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
@ -469,28 +470,32 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
/**
* Removes a task from the complete queue and clears out the ZK status path of the task.
*
* @param workerId - the worker that was previously running the task
* @param taskId - the task to cleanup
*/
private void cleanup(final String workerId, final String taskId)
private void cleanup(final String taskId)
{
if (!started) {
return;
}
if (completeTasks.remove(taskId) == null) {
final RemoteTaskRunnerWorkItem removed = completeTasks.remove(taskId);
// completeTasks.remove() returns null when the task is not in the complete queue, so guard the
// dereference; otherwise the null check below is unreachable and an NPE is thrown instead of the alert.
final Worker worker = removed == null ? null : removed.getWorker();
if (removed == null || worker == null) {
log.makeAlert("WTF?! Asked to cleanup nonexistent task")
.addData("workerId", workerId)
.addData("taskId", taskId)
.emit();
} else {
final String workerId = worker.getHost();
log.info("Cleaning up task[%s] on worker[%s]", taskId, workerId);
final String statusPath = JOINER.join(zkPaths.getIndexerStatusPath(), workerId, taskId);
try {
cf.delete().guaranteed().forPath(statusPath);
}
catch (Exception e) {
catch (KeeperException.NoNodeException e) {
log.info("Tried to delete status path[%s] that didn't exist! Must've gone away already?", statusPath);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
@ -593,7 +598,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
elapsed,
config.getTaskAssignmentTimeout()
);
taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
break;
}
@ -666,7 +670,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
SettableFuture.<TaskStatus>create(),
zkWorker.getWorker()
);
runningTasks.put(taskId, taskRunnerWorkItem);
runningTasks.put(taskId, taskRunnerWorkItem.withWorker(zkWorker.getWorker()));
}
if (taskStatus.isComplete()) {

View File

@ -19,7 +19,7 @@
package io.druid.indexing.overlord;
public class TaskExistsException extends RuntimeException
public class TaskExistsException extends Exception
{
private final String taskId;

View File

@ -296,7 +296,7 @@ public class TaskQueue
*
* @return true
*/
public boolean add(final Task task)
public boolean add(final Task task) throws TaskExistsException
{
giant.lock();
@ -306,15 +306,8 @@ public class TaskQueue
Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize());
// If this throws with any sort of exception, including TaskExistsException, we don't want to
// insert the task into our queue.
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));
}
catch (TaskExistsException e) {
log.warn("Attempt to add task twice: %s", task.getId());
throw Throwables.propagate(e);
}
// insert the task into our queue. So don't catch it.
taskStorage.insert(task, TaskStatus.running(task.getId()));
tasks.add(task);
managementMayBeNecessary.signalAll();
return true;

View File

@ -30,10 +30,11 @@ import java.util.List;
public interface TaskStorage
{
/**
* Adds a task to the storage facility with a particular status. If the task ID already exists, this method
* will throw a {@link TaskExistsException}.
* Adds a task to the storage facility with a particular status.
*
* @throws io.druid.indexing.overlord.TaskExistsException if the task ID already exists
*/
public void insert(Task task, TaskStatus status);
public void insert(Task task, TaskStatus status) throws TaskExistsException;
/**
* Persists task status in the storage facility. This method should throw an exception if the task status lifecycle

View File

@ -36,6 +36,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskExistsException;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
@ -45,6 +46,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
import org.apache.zookeeper.data.Stat;
import org.joda.time.DateTime;
import javax.ws.rs.Consumes;
@ -126,8 +128,15 @@ public class OverlordResource
@Override
public Response apply(TaskQueue taskQueue)
{
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
try {
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (TaskExistsException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", String.format("Task[%s] already exists!", task.getId())))
.build();
}
}
}
);

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

10
pom.xml
View File

@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
@ -39,7 +39,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.1</metamx.java-util.version>
<metamx.java-util.version>0.25.2</metamx.java-util.version>
<apache.curator.version>2.3.0</apache.curator.version>
<druid.api.version>0.1.7</druid.api.version>
</properties>
@ -73,7 +73,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.2.7</version>
<version>0.2.9</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
@ -143,6 +143,10 @@
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -490,8 +490,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn);
}
for (Object column : complexColumnCache.values()) {
if (column instanceof Closeable) {
for (Object column : objectColumnCache.values()) {
if(column instanceof Closeable) {
Closeables.closeQuietly((Closeable) column);
}
}

View File

@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -103,7 +103,7 @@ public class HttpClientModule implements Module
private int numConnections = 5;
@JsonProperty
private Period readTimeout = new Period("PT5M");
private Period readTimeout = new Period("PT15M");
public int getNumConnections()
{

View File

@ -28,17 +28,23 @@ import org.joda.time.Period;
*/
public class FireDepartmentConfig
{
// Fallback used by the constructor when the supplied maxPendingPersists is not positive.
private static final int MAX_PENDING_PERSIST_BATCHES_DEFAULT = 2;
private final int maxRowsInMemory;
private final Period intermediatePersistPeriod;
private final int maxPendingPersists;
@JsonCreator
public FireDepartmentConfig(
@JsonProperty("maxRowsInMemory") int maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
this.maxRowsInMemory = maxRowsInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod;
this.maxPendingPersists = maxPendingPersists > 0
? maxPendingPersists
: MAX_PENDING_PERSIST_BATCHES_DEFAULT;
Preconditions.checkArgument(maxRowsInMemory > 0, "maxRowsInMemory[%s] should be greater than 0", maxRowsInMemory);
Preconditions.checkNotNull(intermediatePersistPeriod, "intermediatePersistPeriod");
@ -55,4 +61,10 @@ public class FireDepartmentConfig
{
return intermediatePersistPeriod;
}
@JsonProperty
public int getMaxPendingPersists()
{
return maxPendingPersists;
}
}

View File

@ -31,7 +31,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
@ -42,6 +41,7 @@ import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Processing;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
@ -84,7 +84,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -92,43 +91,34 @@ import java.util.concurrent.ScheduledExecutorService;
public class RealtimePlumberSchool implements PlumberSchool
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final Object handoffCondition = new Object();
private final int maxPendingPersists;
private volatile boolean shuttingDown = false;
@JacksonInject
@NotNull
private volatile ServiceEmitter emitter;
@JacksonInject
@NotNull
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
@JacksonInject
@NotNull
private volatile DataSegmentPusher dataSegmentPusher = null;
@JacksonInject
@NotNull
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
@JacksonInject
@NotNull
private volatile SegmentPublisher segmentPublisher = null;
@JacksonInject
@NotNull
private volatile ServerView serverView = null;
@JacksonInject
@NotNull
@Processing
private volatile ExecutorService queryExecutorService = null;
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@ -136,7 +126,8 @@ public class RealtimePlumberSchool implements PlumberSchool
public RealtimePlumberSchool(
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
this.windowPeriod = windowPeriod;
@ -144,7 +135,9 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
this.maxPendingPersists = maxPendingPersists;
// Report the offending value so a missing or non-positive setting is diagnosable from the exception alone.
Preconditions.checkArgument(
maxPendingPersists > 0,
"RealtimePlumberSchool requires maxPendingPersists[%s] to be greater than 0.",
maxPendingPersists
);
Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
@ -485,22 +478,13 @@ public class RealtimePlumberSchool implements PlumberSchool
private void initializeExecutors()
{
if (persistExecutor == null) {
persistExecutor = Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("plumber_persist_%d")
.build()
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
persistExecutor = Execs.newBlockingSingleThreaded(
"plumber_persist_%d", maxPendingPersists
);
}
if (scheduledExecutor == null) {
scheduledExecutor = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("plumber_scheduled_%d")
.build()
);
scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
}
}
@ -697,7 +681,8 @@ public class RealtimePlumberSchool implements PlumberSchool
/**
* Unannounces a given sink and removes all local references to it.
*/
private void abandonSegment(final long truncatedTime, final Sink sink) {
private void abandonSegment(final long truncatedTime, final Sink sink)
{
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));

View File

@ -40,7 +40,12 @@ public class ServerTimeRejectionPolicyFactory implements RejectionPolicyFactory
@Override
public boolean accept(long timestamp)
{
return timestamp >= (System.currentTimeMillis() - windowMillis);
long now = System.currentTimeMillis();
boolean notTooOld = timestamp >= (now - windowMillis);
boolean notTooYoung = timestamp <= (now + windowMillis);
return notTooOld && notTooYoung;
}
@Override

View File

@ -128,6 +128,7 @@ public class QueryResource
.setUser4(query.getType())
.setUser5(query.getIntervals().get(0).toString())
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);

View File

@ -44,6 +44,7 @@ import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ServerInventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.collections.CountingMap;
import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs;
import io.druid.curator.discovery.ServiceAnnouncer;
@ -60,6 +61,8 @@ import io.druid.server.coordinator.helper.DruidCoordinatorLogger;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
@ -78,6 +81,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -189,6 +193,51 @@ public class DruidCoordinator
return leader;
}
/**
 * Exposes the coordinator's load-queue peons.
 *
 * @return the {@code loadManagementPeons} map itself (not a copy); elsewhere in this class its keys are
 *         matched against server names
 */
public Map<String, LoadQueuePeon> getLoadManagementPeons()
{
return loadManagementPeons;
}
/**
 * Reports, per datasource, how much of the expected segment replication is currently loaded.
 * <p>
 * Expected count: for every available segment, the first {@link LoadRule} that applies to it at the
 * current time contributes the sum of its per-tier replicant counts. Actual count: the number of
 * segment entries currently held by the servers in the inventory view.
 *
 * @return map from datasource name to {@code 100 * actual / expected}. Only datasources for which at
 *         least one replicant count was recorded appear. A datasource whose expected count is zero is
 *         reported as 100 so the result never contains NaN or Infinity.
 */
public Map<String, Double> getReplicationStatus()
{
  // find expected load per datasource
  final CountingMap<String> expectedSegmentsInCluster = new CountingMap<>();
  final DateTime now = new DateTime();
  // Rules are looked up by datasource, so fetch them once per datasource instead of once per segment.
  final Map<String, List<Rule>> rulesByDataSource = Maps.newHashMap();
  for (DataSegment segment : getAvailableDataSegments()) {
    final String dataSource = segment.getDataSource();
    List<Rule> rules = rulesByDataSource.get(dataSource);
    if (rules == null) {
      rules = databaseRuleManager.getRulesWithDefault(dataSource);
      rulesByDataSource.put(dataSource, rules);
    }
    for (Rule rule : rules) {
      if (rule instanceof LoadRule && rule.appliesTo(segment, now)) {
        for (Integer numReplicants : ((LoadRule) rule).getTieredReplicants().values()) {
          expectedSegmentsInCluster.add(dataSource, numReplicants);
        }
        // only the first applicable load rule counts for a segment
        break;
      }
    }
  }

  // find segments currently loaded per datasource
  final Map<String, Integer> segmentsInCluster = Maps.newHashMap();
  for (DruidServer druidServer : serverInventoryView.getInventory()) {
    for (DataSegment segment : druidServer.getSegments().values()) {
      Integer count = segmentsInCluster.get(segment.getDataSource());
      if (count == null) {
        count = 0;
      }
      segmentsInCluster.put(segment.getDataSource(), count + 1);
    }
  }

  // compare expected replicas with currently loaded
  final Map<String, Double> loadStatus = Maps.newHashMap();
  for (Map.Entry<String, AtomicLong> entry : expectedSegmentsInCluster.entrySet()) {
    final long expected = entry.getValue().get();
    final Integer actual = segmentsInCluster.get(entry.getKey());
    final double loaded = (actual == null) ? 0.0D : (double) actual;
    // Nothing is expected to be loaded: treat as fully satisfied rather than dividing by zero.
    loadStatus.put(entry.getKey(), expected == 0 ? 100.0D : 100 * loaded / expected);
  }

  return loadStatus;
}
public Map<String, Double> getLoadStatus()
{
// find available segments
@ -766,11 +815,11 @@ public class DruidCoordinator
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
// Stop peons for servers that aren't there anymore.
final Set<String> disdappearedServers = Sets.newHashSet(loadManagementPeons.keySet());
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
for (DruidServer server : servers) {
disdappearedServers.remove(server.getName());
disappeared.remove(server.getName());
}
for (String name : disdappearedServers) {
for (String name : disappeared) {
log.info("Removing listener for server[%s] which is no longer there.", name);
LoadQueuePeon peon = loadManagementPeons.remove(name);
peon.stop();

View File

@ -19,6 +19,7 @@
package io.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
@ -103,6 +104,7 @@ public class LoadQueuePeon
this.config = config;
}
@JsonProperty
public Set<DataSegment> getSegmentsToLoad()
{
return new ConcurrentSkipListSet<DataSegment>(
@ -120,6 +122,7 @@ public class LoadQueuePeon
);
}
@JsonProperty
public Set<DataSegment> getSegmentsToDrop()
{
return new ConcurrentSkipListSet<DataSegment>(

View File

@ -32,7 +32,7 @@ import javax.ws.rs.core.Response;
/**
*/
@Path("/coordinator/config")
@Path("/druid/coordinator/v1/config")
public class CoordinatorDynamicConfigsResource
{
private final JacksonConfigManager manager;

View File

@ -19,21 +19,23 @@
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.timeline.DataSegment;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.List;
/**
*/
@Path("/coordinator")
@Path("/druid/coordinator/v1")
public class CoordinatorResource
{
private final DruidCoordinator coordinator;
@ -46,74 +48,64 @@ public class CoordinatorResource
this.coordinator = coordinator;
}
@POST
@Path("/move")
@Consumes("application/json")
public Response moveSegment(List<SegmentToMove> segmentsToMove)
@GET
@Path("/leader")
@Produces("application/json")
public Response getLeader()
{
Response resp = Response.status(Response.Status.OK).build();
for (SegmentToMove segmentToMove : segmentsToMove) {
try {
coordinator.moveSegment(
segmentToMove.getFromServer(),
segmentToMove.getToServer(),
segmentToMove.getSegmentName(),
new LoadPeonCallback()
{
@Override
public void execute()
{
return;
}
}
);
}
catch (Exception e) {
resp = Response
.status(Response.Status.BAD_REQUEST)
.entity(e.getMessage())
.build();
break;
}
}
return resp;
}
@POST
@Path("/drop")
@Consumes("application/json")
public Response dropSegment(List<SegmentToDrop> segmentsToDrop)
{
Response resp = Response.status(Response.Status.OK).build();
for (SegmentToDrop segmentToDrop : segmentsToDrop) {
try {
coordinator.dropSegment(
segmentToDrop.getFromServer(), segmentToDrop.getSegmentName(), new LoadPeonCallback()
{
@Override
public void execute()
{
return;
}
}
);
}
catch (Exception e) {
resp = Response
.status(Response.Status.BAD_REQUEST)
.entity(e.getMessage())
.build();
break;
}
}
return resp;
return Response.ok(coordinator.getCurrentLeader()).build();
}
@GET
@Path("/loadstatus")
@Produces("application/json")
public Response getLoadStatus()
public Response getLoadStatus(
@QueryParam("full") String full
)
{
if (full != null) {
return Response.ok(coordinator.getReplicationStatus()).build();
}
return Response.ok(coordinator.getLoadStatus()).build();
}
/**
 * Returns the coordinator's load queues, keyed as in {@code coordinator.getLoadManagementPeons()}.
 *
 * @param simple when present (any value), each peon is summarised as the number and total size of the
 *               segments it has queued to load and to drop; when absent the peons themselves are returned
 * @return a 200 response whose entity is either the summary map or the peon map
 */
@GET
@Path("loadqueue")
@Produces("application/json")
public Response getLoadQueue(
    @QueryParam("simple") String simple
)
{
  if (simple == null) {
    return Response.ok(coordinator.getLoadManagementPeons()).build();
  }

  return Response.ok(
      Maps.transformValues(
          coordinator.getLoadManagementPeons(),
          new Function<LoadQueuePeon, Object>()
          {
            @Override
            public Object apply(LoadQueuePeon input)
            {
              // Walk each queue exactly once so the count and the byte total describe the same
              // snapshot; every getSegmentsTo*() call builds a fresh set.
              int loadCount = 0;
              long loadSize = 0;
              for (DataSegment dataSegment : input.getSegmentsToLoad()) {
                loadCount++;
                loadSize += dataSegment.getSize();
              }

              int dropCount = 0;
              long dropSize = 0;
              for (DataSegment dataSegment : input.getSegmentsToDrop()) {
                dropCount++;
                dropSize += dataSegment.getSize();
              }

              return new ImmutableMap.Builder<>()
                  .put("segmentsToLoad", loadCount)
                  .put("segmentsToDrop", dropCount)
                  .put("segmentsToLoadSize", loadSize)
                  .put("segmentsToDropSize", dropSize)
                  .build();
            }
          }
      )
  ).build();
}
}

View File

@ -0,0 +1,159 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import io.druid.client.DruidDataSource;
import io.druid.db.DatabaseSegmentManager;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.List;
/**
 * Read-only HTTP endpoints over the datasources and segments known to the {@link DatabaseSegmentManager}.
 */
@Path("/druid/coordinator/v1/db")
public class DBResource
{
  /** Maps a segment to its identifier; used for the non-"full" segment listing. */
  private static final Function<DataSegment, Object> SEGMENT_TO_ID = new Function<DataSegment, Object>()
  {
    @Override
    public Object apply(@Nullable DataSegment segment)
    {
      return segment.getIdentifier();
    }
  };

  private final DatabaseSegmentManager databaseSegmentManager;

  @Inject
  public DBResource(DatabaseSegmentManager databaseSegmentManager)
  {
    this.databaseSegmentManager = databaseSegmentManager;
  }

  /** Builds a 200 response carrying {@code entity}. */
  private static Response ok(Object entity)
  {
    return Response.status(Response.Status.OK).entity(entity).build();
  }

  /** Builds an empty 404 response. */
  private static Response notFound()
  {
    return Response.status(Response.Status.NOT_FOUND).build();
  }

  /**
   * Lists datasources from the segment manager.
   *
   * @param full            when present, the manager's inventory objects are returned instead of names
   * @param includeDisabled when present, takes precedence over {@code full} and returns
   *                        {@code getAllDatasourceNames()}
   * @return by default, the sorted names of the datasources in the manager's inventory
   */
  @GET
  @Path("/datasources")
  @Produces("application/json")
  public Response getDatabaseDataSources(
      @QueryParam("full") String full,
      @QueryParam("includeDisabled") String includeDisabled
  )
  {
    if (includeDisabled != null) {
      return ok(databaseSegmentManager.getAllDatasourceNames());
    }
    if (full != null) {
      return ok(databaseSegmentManager.getInventory());
    }

    final List<String> names = Lists.newArrayList();
    for (DruidDataSource dataSource : databaseSegmentManager.getInventory()) {
      names.add(dataSource.getName());
    }
    Collections.sort(names);
    return ok(names);
  }

  /**
   * Returns one datasource from the manager's inventory, or 404 when the name is unknown.
   */
  @GET
  @Path("/datasources/{dataSourceName}")
  @Produces("application/json")
  public Response getDatabaseSegmentDataSource(
      @PathParam("dataSourceName") final String dataSourceName
  )
  {
    final DruidDataSource found = databaseSegmentManager.getInventoryValue(dataSourceName);
    return (found == null) ? notFound() : ok(found);
  }

  /**
   * Lists the segments of one datasource: identifiers by default, the segment objects when
   * {@code full} is present. Responds 404 when the datasource is unknown.
   */
  @GET
  @Path("/datasources/{dataSourceName}/segments")
  @Produces("application/json")
  public Response getDatabaseSegmentDataSourceSegments(
      @PathParam("dataSourceName") String dataSourceName,
      @QueryParam("full") String full
  )
  {
    final DruidDataSource found = databaseSegmentManager.getInventoryValue(dataSourceName);
    if (found == null) {
      return notFound();
    }
    if (full != null) {
      return ok(found.getSegments());
    }
    // the transformed view is lazy; identifiers are read when the entity is consumed
    return ok(Iterables.transform(found.getSegments(), SEGMENT_TO_ID));
  }

  /**
   * Returns the segment of the datasource whose identifier matches {@code segmentId}
   * case-insensitively; 404 when either the datasource or the segment is not found.
   */
  @GET
  @Path("/datasources/{dataSourceName}/segments/{segmentId}")
  @Produces("application/json")
  public Response getDatabaseSegmentDataSourceSegment(
      @PathParam("dataSourceName") String dataSourceName,
      @PathParam("segmentId") String segmentId
  )
  {
    final DruidDataSource found = databaseSegmentManager.getInventoryValue(dataSourceName);
    if (found != null) {
      for (DataSegment candidate : found.getSegments()) {
        if (candidate.getIdentifier().equalsIgnoreCase(segmentId)) {
          return ok(candidate);
        }
      }
    }
    return notFound();
  }
}

View File

@ -0,0 +1,325 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.db.DatabaseSegmentManager;
import io.druid.segment.IndexGranularity;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
*/
@Path("/druid/coordinator/v1/datasources")
public class DatasourcesResource
{
private static Map<String, Object> makeSimpleDatasource(DruidDataSource input)
{
return new ImmutableMap.Builder<String, Object>()
.put("name", input.getName())
.put("properties", input.getProperties())
.build();
}
private final InventoryView serverInventoryView;
private final DatabaseSegmentManager databaseSegmentManager;
private final IndexingServiceClient indexingServiceClient;
@Inject
public DatasourcesResource(
InventoryView serverInventoryView,
DatabaseSegmentManager databaseSegmentManager,
@Nullable IndexingServiceClient indexingServiceClient
)
{
this.serverInventoryView = serverInventoryView;
this.databaseSegmentManager = databaseSegmentManager;
this.indexingServiceClient = indexingServiceClient;
}
@GET
@Produces("application/json")
public Response getQueryableDataSources(
@QueryParam("full") String full,
@QueryParam("simple") String simple,
@QueryParam("gran") String gran
)
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (full != null) {
return builder.entity(getDataSources()).build();
} else if (simple != null) {
return builder.entity(
Lists.newArrayList(
Iterables.transform(
getDataSources(),
new Function<DruidDataSource, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(DruidDataSource dataSource)
{
return makeSimpleDatasource(dataSource);
}
}
)
)
).build();
} else if (gran != null) {
IndexGranularity granularity = IndexGranularity.fromString(gran);
// TODO
}
return builder.entity(
Lists.newArrayList(
Iterables.transform(
getDataSources(),
new Function<DruidDataSource, String>()
{
@Override
public String apply(DruidDataSource dataSource)
{
return dataSource.getName();
}
}
)
)
).build();
}
@DELETE
@Path("/{dataSourceName}")
public Response deleteDataSource(
@PathParam("dataSourceName") final String dataSourceName,
@QueryParam("kill") final String kill,
@QueryParam("interval") final String interval
)
{
if (indexingServiceClient == null) {
return Response.status(Response.Status.OK).entity(ImmutableMap.of("error", "no indexing service found")).build();
}
if (kill != null && Boolean.valueOf(kill)) {
indexingServiceClient.killSegments(dataSourceName, new Interval(interval));
} else {
if (!databaseSegmentManager.removeDatasource(dataSourceName)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
}
return Response.status(Response.Status.OK).build();
}
@POST
@Path("/{dataSourceName}")
@Consumes("application/json")
public Response enableDataSource(
@PathParam("dataSourceName") final String dataSourceName
)
{
if (!databaseSegmentManager.enableDatasource(dataSourceName)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.status(Response.Status.OK).build();
}
@GET
@Path("/{dataSourceName}/segments")
@Produces("application/json")
public Response getSegmentDataSourceSegments(
@PathParam("dataSourceName") String dataSourceName,
@QueryParam("full") String full
)
{
DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (full != null) {
return builder.entity(dataSource.getSegments()).build();
}
return builder.entity(
Iterables.transform(
dataSource.getSegments(),
new Function<DataSegment, Object>()
{
@Override
public Object apply(@Nullable DataSegment segment)
{
return segment.getIdentifier();
}
}
)
).build();
}
@GET
@Path("/{dataSourceName}/segments/{segmentId}")
@Produces("application/json")
public Response getSegmentDataSourceSegment(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
)
{
DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
for (DataSegment segment : dataSource.getSegments()) {
if (segment.getIdentifier().equalsIgnoreCase(segmentId)) {
return Response.status(Response.Status.OK).entity(segment).build();
}
}
return Response.status(Response.Status.NOT_FOUND).build();
}
@DELETE
@Path("/{dataSourceName}/segments/{segmentId}")
public Response deleteDatasourceSegment(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
)
{
if (!databaseSegmentManager.removeSegment(dataSourceName, segmentId)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.status(Response.Status.OK).build();
}
@POST
@Path("/{dataSourceName}/segments/{segmentId}")
@Consumes("application/json")
public Response enableDatasourceSegment(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
)
{
if (!databaseSegmentManager.enableSegment(segmentId)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.status(Response.Status.OK).build();
}
private DruidDataSource getDataSource(final String dataSourceName)
{
Iterable<DruidDataSource> dataSources =
Iterables.concat(
Iterables.transform(
serverInventoryView.getInventory(),
new Function<DruidServer, DruidDataSource>()
{
@Override
public DruidDataSource apply(DruidServer input)
{
return input.getDataSource(dataSourceName);
}
}
)
);
List<DruidDataSource> validDataSources = Lists.newArrayList();
for (DruidDataSource dataSource : dataSources) {
if (dataSource != null) {
validDataSources.add(dataSource);
}
}
if (validDataSources.isEmpty()) {
return null;
}
Map<String, DataSegment> segmentMap = Maps.newHashMap();
for (DruidDataSource dataSource : validDataSources) {
if (dataSource != null) {
Iterable<DataSegment> segments = dataSource.getSegments();
for (DataSegment segment : segments) {
segmentMap.put(segment.getIdentifier(), segment);
}
}
}
return new DruidDataSource(
dataSourceName,
ImmutableMap.<String, String>of()
).addSegments(segmentMap);
}
private Set<DruidDataSource> getDataSources()
{
TreeSet<DruidDataSource> dataSources = Sets.newTreeSet(
new Comparator<DruidDataSource>()
{
@Override
public int compare(DruidDataSource druidDataSource, DruidDataSource druidDataSource1)
{
return druidDataSource.getName().compareTo(druidDataSource1.getName());
}
}
);
dataSources.addAll(
Lists.newArrayList(
Iterables.concat(
Iterables.transform(
serverInventoryView.getInventory(),
new Function<DruidServer, Iterable<DruidDataSource>>()
{
@Override
public Iterable<DruidDataSource> apply(DruidServer input)
{
return input.getDataSources();
}
}
)
)
)
);
return dataSources;
}
}

View File

@ -57,6 +57,7 @@ import java.util.TreeSet;
/**
*/
@Deprecated
@Path("/info")
public class InfoResource
{
@ -102,7 +103,6 @@ public class InfoResource
private final DatabaseRuleManager databaseRuleManager;
private final IndexingServiceClient indexingServiceClient;
@Inject
public InfoResource(
DruidCoordinator coordinator,

View File

@ -0,0 +1,70 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.http;
import com.google.inject.Inject;
import io.druid.db.DatabaseRuleManager;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
/**
 * Read-only HTTP endpoints over the rules held by the {@link DatabaseRuleManager}.
 */
@Path("/druid/coordinator/v1/rules")
public class RulesResource
{
  private final DatabaseRuleManager databaseRuleManager;

  @Inject
  public RulesResource(DatabaseRuleManager databaseRuleManager)
  {
    this.databaseRuleManager = databaseRuleManager;
  }

  /**
   * Returns everything {@code getAllRules()} reports.
   */
  @GET
  @Produces("application/json")
  public Response getRules()
  {
    final Object allRules = databaseRuleManager.getAllRules();
    return Response.ok(allRules).build();
  }

  /**
   * Returns the rules of one datasource.
   *
   * @param dataSourceName datasource whose rules are requested
   * @param full           when present, {@code getRulesWithDefault} is used instead of {@code getRules}
   */
  @GET
  @Path("/{dataSourceName}")
  @Produces("application/json")
  public Response getDatasourceRules(
      @PathParam("dataSourceName") final String dataSourceName,
      @QueryParam("full") final String full
  )
  {
    final Object rules;
    if (full == null) {
      rules = databaseRuleManager.getRules(dataSourceName);
    } else {
      rules = databaseRuleManager.getRulesWithDefault(dataSourceName);
    }
    return Response.ok(rules).build();
  }
}

View File

@ -0,0 +1,188 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.Map;
/**
 * Read-only HTTP endpoints over the servers, and the segments they hold, in the {@link InventoryView}.
 */
@Path("/druid/coordinator/v1/servers")
public class ServersResource
{
  /** Maps a server to its host/tier/currSize/maxSize summary. */
  private static final Function<DruidServer, Map<String, Object>> TO_SIMPLE_SERVER =
      new Function<DruidServer, Map<String, Object>>()
      {
        @Override
        public Map<String, Object> apply(DruidServer input)
        {
          return makeSimpleServer(input);
        }
      };

  /** Maps a server to its host. */
  private static final Function<DruidServer, String> TO_HOST = new Function<DruidServer, String>()
  {
    @Override
    public String apply(DruidServer druidServer)
    {
      return druidServer.getHost();
    }
  };

  /** Maps a segment to its identifier. */
  private static final Function<DataSegment, String> TO_SEGMENT_ID = new Function<DataSegment, String>()
  {
    @Override
    public String apply(@Nullable DataSegment segment)
    {
      return segment.getIdentifier();
    }
  };

  /** Reduces a server to its host, tier, current size and maximum size. */
  private static Map<String, Object> makeSimpleServer(DruidServer input)
  {
    return ImmutableMap.<String, Object>of(
        "host", input.getHost(),
        "tier", input.getTier(),
        "currSize", input.getCurrSize(),
        "maxSize", input.getMaxSize()
    );
  }

  /** Builds a 200 response carrying {@code entity}. */
  private static Response ok(Object entity)
  {
    return Response.status(Response.Status.OK).entity(entity).build();
  }

  /** Builds an empty 404 response. */
  private static Response notFound()
  {
    return Response.status(Response.Status.NOT_FOUND).build();
  }

  private final InventoryView serverInventoryView;

  @Inject
  public ServersResource(InventoryView serverInventoryView)
  {
    this.serverInventoryView = serverInventoryView;
  }

  /**
   * Lists the servers in the inventory view.
   *
   * @param full   when present, the server objects are returned
   * @param simple when present (and {@code full} is absent), per-server summaries are returned
   * @return by default, the list of server hosts
   */
  @GET
  @Produces("application/json")
  public Response getClusterServers(
      @QueryParam("full") String full,
      @QueryParam("simple") String simple
  )
  {
    final Object entity;
    if (full != null) {
      entity = Lists.newArrayList(serverInventoryView.getInventory());
    } else if (simple != null) {
      entity = Lists.newArrayList(Iterables.transform(serverInventoryView.getInventory(), TO_SIMPLE_SERVER));
    } else {
      entity = Lists.newArrayList(Iterables.transform(serverInventoryView.getInventory(), TO_HOST));
    }
    return ok(entity);
  }

  /**
   * Returns one server, or its summary when {@code simple} is present; 404 when the name is unknown.
   */
  @GET
  @Path("/{serverName}")
  @Produces("application/json")
  public Response getServer(
      @PathParam("serverName") String serverName,
      @QueryParam("simple") String simple
  )
  {
    final DruidServer found = serverInventoryView.getInventoryValue(serverName);
    if (found == null) {
      return notFound();
    }
    if (simple != null) {
      return ok(makeSimpleServer(found));
    }
    return ok(found);
  }

  /**
   * Lists the segments held by one server: identifiers by default, segment objects when {@code full}
   * is present. Responds 404 when the server is unknown.
   */
  @GET
  @Path("/{serverName}/segments")
  @Produces("application/json")
  public Response getServerSegments(
      @PathParam("serverName") String serverName,
      @QueryParam("full") String full
  )
  {
    final DruidServer found = serverInventoryView.getInventoryValue(serverName);
    if (found == null) {
      return notFound();
    }
    if (full != null) {
      return ok(found.getSegments().values());
    }
    // the transformed view is lazy; identifiers are read when the entity is consumed
    return ok(Collections2.transform(found.getSegments().values(), TO_SEGMENT_ID));
  }

  /**
   * Returns one segment of one server; 404 when either the server or the segment is unknown.
   */
  @GET
  @Path("/{serverName}/segments/{segmentId}")
  @Produces("application/json")
  public Response getServerSegment(
      @PathParam("serverName") String serverName,
      @PathParam("segmentId") String segmentId
  )
  {
    final DruidServer found = serverInventoryView.getInventoryValue(serverName);
    if (found == null) {
      return notFound();
    }
    final DataSegment segment = found.getSegment(segmentId);
    return (segment == null) ? notFound() : ok(segment);
  }
}

View File

@ -0,0 +1,88 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.http;
import com.google.api.client.util.Maps;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.inject.Inject;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.util.Map;
import java.util.Set;
/**
 * HTTP endpoint listing the tiers of the servers in the {@link InventoryView}.
 */
@Path("/druid/coordinator/v1/tiers")
public class TiersResource
{
  private final InventoryView serverInventoryView;

  @Inject
  public TiersResource(
      InventoryView serverInventoryView
  )
  {
    this.serverInventoryView = serverInventoryView;
  }

  /**
   * Lists the tiers present in the server inventory.
   *
   * @param simple when present (any value), the response maps each tier to the summed
   *               {@code currSize} and {@code maxSize} of all of its servers; when absent the
   *               response is the set of tier names
   * @return a 200 response carrying either the per-tier totals or the tier names
   */
  @GET
  @Produces("application/json")
  public Response getTiers(
      @QueryParam("simple") String simple
  )
  {
    Response.ResponseBuilder builder = Response.status(Response.Status.OK);

    if (simple != null) {
      Map<String, Map<String, Long>> metadata = Maps.newHashMap();
      for (DruidServer druidServer : serverInventoryView.getInventory()) {
        Map<String, Long> tierMetadata = metadata.get(druidServer.getTier());

        if (tierMetadata == null) {
          tierMetadata = Maps.newHashMap();
          metadata.put(druidServer.getTier(), tierMetadata);
        }

        // Start each total at zero and always add this server's value, so the first server seen
        // in a tier is counted too (previously its sizes were replaced by 0).
        Long currSize = tierMetadata.get("currSize");
        tierMetadata.put("currSize", (currSize == null ? 0L : currSize) + druidServer.getCurrSize());

        Long maxSize = tierMetadata.get("maxSize");
        tierMetadata.put("maxSize", (maxSize == null ? 0L : maxSize) + druidServer.getMaxSize());
      }
      return builder.entity(metadata).build();
    }

    Set<String> tiers = Sets.newHashSet();
    for (DruidServer server : serverInventoryView.getInventory()) {
      tiers.add(server.getTier());
    }
    return builder.entity(tiers).build();
  }
}

View File

@ -21,7 +21,7 @@ $(document).ready(function() {
var selected = $('#datasources option:selected').text();
$.ajax({
type: 'POST',
url:'/info/datasources/' + selected,
url:'/druid/coordinator/v1/datasources/' + selected,
data: JSON.stringify(selected),
contentType:"application/json; charset=utf-8",
dataType:"json",
@ -50,7 +50,7 @@ $(document).ready(function() {
var selected = $('#datasources option:selected').text();
$.ajax({
type: 'DELETE',
url:'/info/datasources/' + selected,
url:'/druid/coordinator/v1/datasources/' + selected,
data: JSON.stringify(selected),
contentType:"application/json; charset=utf-8",
dataType:"json",
@ -70,12 +70,12 @@ $(document).ready(function() {
}
});
$.getJSON("/info/db/datasources", function(enabled_datasources) {
$.getJSON("/druid/coordinator/v1/db/datasources", function(enabled_datasources) {
$.each(enabled_datasources, function(index, datasource) {
$('#enabled_datasources').append($('<li>' + datasource + '</li>'));
});
$.getJSON("/info/db/datasources?includeDisabled", function(db_datasources) {
$.getJSON("/druid/coordinator/v1/db/datasources?includeDisabled", function(db_datasources) {
var disabled_datasources = _.difference(db_datasources, enabled_datasources);
$.each(disabled_datasources, function(index, datasource) {
$('#disabled_datasources').append($('<li>' + datasource + '</li>'));

View File

@ -2,7 +2,7 @@
$(document).ready(function() {
var basePath = "/info/";
var basePath = "/druid/coordinator/v1/";
var type = $('#select_type').attr('value') + '';
var view = $('#select_view').attr('value') + '';

View File

@ -100,8 +100,8 @@ $(document).ready(function() {
}
// Execution stuff
$.get('/info/coordinator', function(data) {
$("#coordinator").html('Current Cluster Coordinator: ' + data.host);
$.get('/druid/coordinator/v1/leader', function(data) {
$("#coordinator").html('Current Cluster Coordinator Leader: ' + data.host);
});
$('#move_segment').submit(function() {
@ -118,57 +118,10 @@ $(document).ready(function() {
});
}
/*
$.ajax({
url:"/coordinator/move",
type: "POST",
data: JSON.stringify(data),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
alert(error + ": " + xhr.responseText);
},
success: function(data, status, xhr) {
for (seg in CONSOLE.selected_segments) {
CONSOLE.selected_segments[seg].children('.server_host').text($('#move_segment > .to').val());
}
}
});
*/
return false;
});
/*$
('#drop_segment').submit(function() {
var data = [];
if ($.isEmptyObject(CONSOLE.selected_segments)) {
alert("Please select at least one segment");
}
for (seg in CONSOLE.selected_segments) {
data.push({
'segmentName' : seg,
'from' : CONSOLE.selected_segments[seg]
});
}
$.ajax({
url:"/coordinator/drop",
type: "POST",
data: JSON.stringify(data),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
alert(error + ": " + xhr.responseText);
}
});
return false;
});
*/
$.get('/info/cluster', function(data) {
$.get('/druid/coordinator/v1/servers?full', function(data) {
$('.loading').hide();
initTables(data);
@ -176,26 +129,5 @@ $(document).ready(function() {
var oTable = [];
initDataTable($('#servers'), oTable);
initDataTable($('#segments'), oTable);
// init select segments
/*$("#segments tbody").click(function(event) {
var el = $(event.target.parentNode);
var key = el.children('.segment_name').text();
if (el.is("tr")) {
if (el.hasClass('row_selected')) {
el.removeClass('row_selected');
delete CONSOLE.selected_segments[key];
} else {
el.addClass('row_selected');
CONSOLE.selected_segments[key] = el;
}
var html ="";
for (segment in CONSOLE.selected_segments) {
html += segment + ' on ' + CONSOLE.selected_segments[segment].children('.server_host').text() + '<br/>';
}
$('#selected_segments').html(html);
}
});*/
});
});

View File

@ -22,7 +22,7 @@ $(document).ready(function() {
var interval = $('#interval').val();
$.ajax({
type: 'DELETE',
url:'/info/datasources/' + selected +'?kill=true&interval=' + interval,
url:'/druid/coordinator/v1/datasources/' + selected +'?kill=true&interval=' + interval,
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
@ -41,7 +41,7 @@ $(document).ready(function() {
}
});
$.getJSON("/info/db/datasources?includeDisabled", function(data) {
$.getJSON("/druid/coordinator/v1/db/datasources?includeDisabled", function(data) {
$.each(data, function(index, datasource) {
$('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
});

View File

@ -155,7 +155,7 @@ function makeTiersDropdown(selTier) {
function getRules() {
var selected = $('#datasources option:selected').text();
if (selected !== "") {
$.getJSON("/info/rules/" + selected, function(data) {
$.getJSON("/druid/coordinator/v1/rules/" + selected, function(data) {
$('#rules_list').empty();
if (!$.isEmptyObject(data)) {
$.each(data, function(index, rule) {
@ -240,7 +240,7 @@ $(document).ready(function() {
var selected = $('#datasources option:selected').text();
$.ajax({
type: 'POST',
url:'/info/rules/' + selected,
url:'/druid/coordinator/v1/rules/' + selected,
data: JSON.stringify(rules),
contentType:"application/json; charset=utf-8",
dataType:"json",
@ -260,11 +260,11 @@ $(document).ready(function() {
}
});
$.getJSON("/info/tiers", function(theTiers) {
$.getJSON("/druid/coordinator/v1/tiers", function(theTiers) {
tiers = theTiers;
});
$.getJSON("/info/db/datasources", function(data) {
$.getJSON("/druid/coordinator/v1/db/datasources", function(data) {
$.each(data, function(index, datasource) {
$('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
});

View File

@ -17,36 +17,41 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
package io.druid.client;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder;
import io.druid.client.DirectDruidClient;
import io.druid.client.DruidServer;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.ReflectionQueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URL;
import java.util.List;
/**
*/
public class ServerSelectorTest
public class DirectDruidClientTest
{
private HttpClient httpClient;
@ -57,10 +62,16 @@ public class ServerSelectorTest
}
@Test
public void testPick() throws Exception
public void testRun() throws Exception
{
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
SettableFuture futureException = SettableFuture.create();
SettableFuture<InputStream> futureResult = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureResult).times(1);
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureException).times(1);
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(SettableFuture.create()).atLeastOnce();
EasyMock.replay(httpClient);
@ -81,13 +92,13 @@ public class ServerSelectorTest
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
new DefaultObjectMapper(new SmileFactory()),
new DefaultObjectMapper(),
httpClient,
"foo"
);
DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
new DefaultObjectMapper(new SmileFactory()),
new DefaultObjectMapper(),
httpClient,
"foo2"
);
@ -105,11 +116,28 @@ public class ServerSelectorTest
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
client1.run(query);
client1.run(query);
client1.run(query);
Sequence s1 = client1.run(query);
Assert.assertEquals(1, client1.getNumOpenConnections());
Assert.assertTrue(client1.getNumOpenConnections() == 3);
// simulate read timeout
Sequence s2 = client1.run(query);
Assert.assertEquals(2, client1.getNumOpenConnections());
futureException.setException(new ReadTimeoutException());
Assert.assertEquals(1, client1.getNumOpenConnections());
// subsequent connections should work
Sequence s3 = client1.run(query);
Sequence s4 = client1.run(query);
Sequence s5 = client1.run(query);
Assert.assertTrue(client1.getNumOpenConnections() == 4);
// produce result for first connection
futureResult.set(new ByteArrayInputStream("[{\"timestamp\":\"2014-01-01T01:02:03Z\", \"result\": 42.0}]".getBytes()));
List<Result> results = Sequences.toList(s1, Lists.<Result>newArrayList());
Assert.assertEquals(1, results.size());
Assert.assertEquals(new DateTime("2014-01-01T01:02:03Z"), results.get(0).getTimestamp());
Assert.assertEquals(3, client1.getNumOpenConnections());
client2.run(query);
client2.run(query);

View File

@ -78,7 +78,7 @@ public class RealtimeManagerTest
Arrays.<FireDepartment>asList(
new FireDepartment(
schema,
new FireDepartmentConfig(1, new Period("P1Y")),
new FireDepartmentConfig(1, new Period("P1Y"), 1),
new FirehoseFactory()
{
@Override

View File

@ -86,7 +86,7 @@ public class RealtimePlumberSchoolTest
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
new Period("PT10m"),
tmpDir,
IndexGranularity.HOUR
IndexGranularity.HOUR, 1
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);

View File

@ -37,8 +37,10 @@ public class ServerTimeRejectionPolicyFactoryTest
DateTime now = new DateTime();
DateTime past = now.minus(period).minus(1);
DateTime future = now.plus(period).plus(1);
Assert.assertTrue(rejectionPolicy.accept(now.getMillis()));
Assert.assertFalse(rejectionPolicy.accept(past.getMillis()));
Assert.assertFalse(rejectionPolicy.accept(future.getMillis()));
}
}

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.48-SNAPSHOT</version>
<version>0.6.53-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -54,7 +54,7 @@ import java.util.List;
*/
@Command(
name = "broker",
description = "Runs a broker node, see http://druid.io/docs/0.6.46/Broker.html for a description"
description = "Runs a broker node, see http://druid.io/docs/0.6.52/Broker.html for a description"
)
public class CliBroker extends ServerRunnable
{

View File

@ -47,9 +47,14 @@ import io.druid.server.http.BackwardsCompatibleInfoResource;
import io.druid.server.http.CoordinatorDynamicConfigsResource;
import io.druid.server.http.CoordinatorRedirectInfo;
import io.druid.server.http.CoordinatorResource;
import io.druid.server.http.DBResource;
import io.druid.server.http.DatasourcesResource;
import io.druid.server.http.InfoResource;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import io.druid.server.http.RulesResource;
import io.druid.server.http.ServersResource;
import io.druid.server.http.TiersResource;
import io.druid.server.initialization.JettyServerInitializer;
import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;
@ -60,7 +65,7 @@ import java.util.List;
*/
@Command(
name = "coordinator",
description = "Runs the Coordinator, see http://druid.io/docs/0.6.46/Coordinator.html for a description."
description = "Runs the Coordinator, see http://druid.io/docs/0.6.52/Coordinator.html for a description."
)
public class CliCoordinator extends ServerRunnable
{
@ -107,6 +112,11 @@ public class CliCoordinator extends ServerRunnable
Jerseys.addResource(binder, InfoResource.class);
Jerseys.addResource(binder, CoordinatorResource.class);
Jerseys.addResource(binder, CoordinatorDynamicConfigsResource.class);
Jerseys.addResource(binder, TiersResource.class);
Jerseys.addResource(binder, RulesResource.class);
Jerseys.addResource(binder, ServersResource.class);
Jerseys.addResource(binder, DatasourcesResource.class);
Jerseys.addResource(binder, DBResource.class);
LifecycleModule.register(binder, Server.class);
}

View File

@ -41,7 +41,7 @@ import java.util.List;
*/
@Command(
name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.46/Batch-ingestion.html for a description."
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.52/Batch-ingestion.html for a description."
)
public class CliHadoopIndexer implements Runnable
{

View File

@ -42,7 +42,7 @@ import java.util.List;
*/
@Command(
name = "historical",
description = "Runs a Historical node, see http://druid.io/docs/0.6.46/Historical.html for a description"
description = "Runs a Historical node, see http://druid.io/docs/0.6.52/Historical.html for a description"
)
public class CliHistorical extends ServerRunnable
{

View File

@ -93,7 +93,7 @@ import java.util.List;
*/
@Command(
name = "overlord",
description = "Runs an Overlord node, see http://druid.io/docs/0.6.46/Indexing-Service.html for a description"
description = "Runs an Overlord node, see http://druid.io/docs/0.6.52/Indexing-Service.html for a description"
)
public class CliOverlord extends ServerRunnable
{

View File

@ -30,7 +30,7 @@ import java.util.List;
*/
@Command(
name = "realtime",
description = "Runs a realtime node, see http://druid.io/docs/0.6.46/Realtime.html for a description"
description = "Runs a realtime node, see http://druid.io/docs/0.6.52/Realtime.html for a description"
)
public class CliRealtime extends ServerRunnable
{

View File

@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
*/
@Command(
name = "realtime",
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.46/Realtime.html for a description"
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.52/Realtime.html for a description"
)
public class CliRealtimeExample extends ServerRunnable
{

View File

@ -53,7 +53,7 @@ class CoordinatorJettyServerInitializer implements JettyServerInitializer
// The coordinator really needs a standardized api path
root.addFilter(GuiceFilter.class, "/status/*", null);
root.addFilter(GuiceFilter.class, "/info/*", null);
root.addFilter(GuiceFilter.class, "/coordinator/*", null);
root.addFilter(GuiceFilter.class, "/druid/coordinator/*", null);
HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{root});