Merge branch 'master' into offheap-incremental-index

Conflicts:
	processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java
	processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java
	processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java
nishantmonu51 2014-06-12 20:02:27 +05:30
commit 025814cfff
81 changed files with 2049 additions and 690 deletions


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>


@ -12,18 +12,18 @@ You can provision individual servers, loading Druid onto each machine (or buildi
[Apache Whirr](http://whirr.apache.org/) is a set of libraries for launching cloud services. For Druid, Whirr serves as an easy way to launch a cluster in Amazon AWS by using simple commands and configuration files (called *recipes*).
**NOTE:** Whirr will install Druid 0.5.x. At this time Whirr can launch Druid 0.5.x only, but in the near future will support Druid 0.6.x. You can download and install your own copy of Druid 0.5.x [here](http://static.druid.io/artifacts/releases/druid-services-0.5.7-bin.tar.gz).
**NOTE:** Whirr will install Druid 0.6.121. Also, it doesn't work with JDK 1.7.0_55; JDK 1.7.0_45 is recommended.
You'll need an AWS account, and an EC2 key pair from that account so that Whirr can connect to the cloud via the EC2 API. If you haven't generated a key pair, see the [AWS documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) or see this [Whirr FAQ](http://whirr.apache.org/faq.html#how-do-i-find-my-cloud-credentials).
You'll need an AWS account, an S3 bucket, and an EC2 key pair from that account so that Whirr can connect to the cloud via the EC2 API. If you haven't generated a key pair, see the [AWS documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) or see this [Whirr FAQ](http://whirr.apache.org/faq.html#how-do-i-find-my-cloud-credentials).
### Installing Whirr
You must use a version of Whirr that includes and supports a Druid recipe. You can do so in one of two ways:
#### Build the Following Version of Whirr
Clone the code from [https://github.com/rjurney/whirr/tree/trunk](https://github.com/rjurney/whirr/tree/trunk) and build Whirr:
Clone the code from [https://github.com/druid-io/whirr](https://github.com/druid-io/whirr) and build Whirr:
git clone git@github.com:rjurney/whirr.git
git clone git@github.com:druid-io/whirr.git
cd whirr
git checkout trunk
mvn clean install -Dmaven.test.failure.ignore=true
@ -39,10 +39,12 @@ Then run `mvn install` from the root directory.
The Whirr recipe for Druid is the configuration file `$WHIRR_HOME/recipes/druid.properties`. You can edit this file to suit your needs -- it is annotated and self-explanatory. Here are some hints about that file:
* Set `whirr.location-id` to a specific AWS region (e.g., us-east-1) if desired, else one will be chosen for you.
* You can choose the hardware used with `whirr.hardware-id` to a specific instance type (e.g., m1.large). If you don't choose an image via `whirr.image-id` (image must be compatible with hardware), you'll get plain vanilla Linux.
* You can choose the hardware by setting `whirr.hardware-id` to a specific instance type (e.g., m1.large). By default, druid.properties uses m3.2xlarge (broker, historical, middle manager), m1.xlarge (coordinator, overlord), and m1.small (ZooKeeper, MySQL).
* If you don't choose an image via `whirr.image-id` (the image must be compatible with the hardware), you'll get plain vanilla Linux. The default druid.properties uses ami-018c9568 (Ubuntu 12.04).
* SSH keys (not password protected) must exist for the local user. If they are in the default locations, `${sys:user.home}/.ssh/id_rsa` and `${sys:user.home}/.ssh/id_rsa.pub`, Whirr will find them. Otherwise, you'll have to specify them with `whirr.private-key-file` and `whirr.public-key-file`.
* Be sure to specify the absolute path of the Druid realtime spec file `realtime.spec` in `whirr.druid.realtime.spec.path`.
* Two Druid cluster templates (see `whirr.instance-templates`) are provided: a small cluster running on a single EC2 instance, and a larger cluster running on multiple instances. The first is a good test case to start with.
* Also make sure to specify the correct S3 bucket; otherwise, the cluster won't be able to process tasks.
* Two Druid cluster templates (see `whirr.instance-templates`) are provided: a small cluster running on a single EC2 instance, and a larger cluster running on multiple instances.
The following AWS information must be set in `druid.properties`, as environment variables, or in the file `$WHIRR_HOME/conf/credentials`:
@ -52,6 +54,8 @@ The following AWS information must be set in `druid.properties`, as environment
How to get the IDENTITY and CREDENTIAL keys is discussed above.
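For reference, a minimal sketch of those settings as they might appear in `druid.properties` (the key values here are placeholders, not real credentials; `whirr.provider`, `whirr.identity`, and `whirr.credential` are standard Whirr property names):

```properties
# Hypothetical excerpt from $WHIRR_HOME/recipes/druid.properties
whirr.provider=aws-ec2
whirr.identity=YOUR_AWS_ACCESS_KEY_ID
whirr.credential=YOUR_AWS_SECRET_ACCESS_KEY
```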
In order to configure each node, you can edit `services/druid/src/main/resources/functions/start_druid.sh` for JVM configuration and `services/druid/src/main/resources/functions/configure_[NODE_NAME].sh` for node-specific configuration. For more information on configuration, see the Druid configuration documentation (http://druid.io/docs/0.6.116/Configuration.html).
### Start a Test Cluster With Whirr
Run the following command:
@ -77,4 +81,16 @@ Note that Whirr will return an exception if any of the nodes fail to launch, and
% $WHIRR_HOME/bin/whirr destroy-cluster --config $WHIRR_HOME/recipes/druid.properties
```
### Testing the Cluster
Now you can run an indexing task and a simple query to see if all the nodes have launched correctly. We are going to use a Wikipedia example again. For a realtime indexing task, run the following command:
```bash
curl -X 'POST' -H 'Content-Type:application/json' -d @#{YOUR_DRUID_DIRECTORY}/examples/indexing/wikipedia_realtime_task.json #{OVERLORD_PUBLIC_IP_ADDR}:#{PORT}/druid/indexer/v1/task
```
Issuing the request should return a task ID.
To check the state of the overlord, open up your browser and go to `#{OVERLORD_PUBLIC_IP_ADDR}:#{PORT}/console.html`.
Next, go to `#{COORDINATOR_PUBLIC_IP_ADDR}:#{PORT}`. Click "View Information about the Cluster" -> "Full Cluster View." You should now see information about servers and segments. If the cluster is running correctly, the segment dimensions and segment binaryVersion fields should be populated. Allow a few minutes for the segments to be processed.
Now you should be able to query the data using the broker's public IP address.
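As a final smoke test, you can issue a simple time-boundary query against the broker, following the same `#{...}` placeholder convention as above (the `wikipedia` data source assumes the example indexing task has handed off data):

```bash
curl -X 'POST' -H 'Content-Type:application/json' \
  -d '{"queryType":"timeBoundary","dataSource":"wikipedia"}' \
  #{BROKER_PUBLIC_IP_ADDR}:#{PORT}/druid/v2/?pretty
```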


@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.119
git checkout druid-0.6.121
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.119-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz


@ -55,7 +55,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/overlord
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@ -137,7 +137,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/middlemanager
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119","io.druid.extensions:druid-kafka-seven:0.6.119"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@ -285,7 +285,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/historical
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod


@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.119"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
druid.zk.service.host=localhost
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119","io.druid.extensions:druid-kafka-seven:0.6.119"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod


@ -28,7 +28,7 @@ Configuration:
-Ddruid.zk.service.host=localhost
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.119"]
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid


@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.119-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.119
cd druid-services-0.6.121
```
You should see a bunch of files:


@ -96,7 +96,7 @@ The configurations for the overlord node are as follows:
-Ddruid.zk.service.host=localhost
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.119"]
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid


@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.119-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz)
and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.119","io.druid.extensions:druid-kafka-seven:0.6.119"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop


@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.119-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.119
cd druid-services-0.6.121
```
You should see a bunch of files:


@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
# Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.119-bin.tar.gz).
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz).
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:


@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.119"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b


@ -9,7 +9,7 @@
-Ddruid.zk.service.host=localhost
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.119"]
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid


@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.119","io.druid.extensions:druid-kafka-seven:0.6.119","io.druid.extensions:druid-rabbitmq:0.6.119"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121","io.druid.extensions:druid-rabbitmq:0.6.121"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>


@ -0,0 +1,81 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.hdfs;
import com.google.inject.Inject;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
public class HdfsDataSegmentKiller implements DataSegmentKiller
{
private final Configuration config;
@Inject
public HdfsDataSegmentKiller(final Configuration config)
{
this.config = config;
}
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
final Path path = getPath(segment);
final FileSystem fs = checkPathAndGetFilesystem(path);
try {
if (path.getName().endsWith(".zip")) {
// delete the parent directory containing the zip file and the descriptor
fs.delete(path.getParent(), true);
} else {
throw new SegmentLoadingException("Unknown file type[%s]", path);
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to kill segment");
}
}
private Path getPath(DataSegment segment)
{
return new Path(String.valueOf(segment.getLoadSpec().get("path")));
}
private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException
{
FileSystem fs;
try {
fs = path.getFileSystem(config);
if (!fs.exists(path)) {
throw new SegmentLoadingException("Path[%s] doesn't exist.", path);
}
return fs;
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", path);
}
}
}
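For context, `kill()` resolves the segment location from the `path` entry of the segment's load spec; for a segment pushed to HDFS, that load spec looks roughly like the following (values illustrative):

```json
{
  "type": "hdfs",
  "path": "hdfs://namenode:8020/druid/segments/example-datasource/index.zip"
}
```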


@ -55,6 +55,7 @@ public class HdfsStorageDruidModule implements DruidModule
{
Binders.dataSegmentPullerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPusher.class).in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentKiller.class).in(LazySingleton.class);
final Configuration conf = new Configuration();
if (props != null) {


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>


@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -35,7 +34,6 @@ import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.TimestampParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
@ -46,15 +44,14 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.segment.indexing.TuningConfig;
import io.druid.segment.indexing.IOConfig;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TuningConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
@ -480,7 +477,7 @@ public class IndexTask extends AbstractFixedIntervalTask
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig;
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0) : tuningConfig;
}
@Override
@ -528,6 +525,9 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonTypeName("index")
public static class IndexTuningConfig implements TuningConfig
{
private static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 500000;
private final int targetPartitionSize;
private final int rowFlushBoundary;
@ -537,8 +537,8 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
{
this.targetPartitionSize = targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary;
this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
}
@JsonProperty
@ -553,12 +553,4 @@ public class IndexTask extends AbstractFixedIntervalTask
return rowFlushBoundary;
}
}
public static void main(String[] args)
{
Function<String, DateTime> parser = TimestampParser.createTimestampParser("millis");
parser.apply("1401266370985");
}
}
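With the defaulting added above, an index task spec may pass `0` (or omit the fields, which deserializes to `0` for these `int` properties) to get the defaults of 5,000,000 rows per partition and a 500,000-row flush boundary. A hypothetical `tuningConfig` fragment, using the field names from the `@JsonProperty` annotations above:

```json
{
  "targetPartitionSize": 0,
  "rowFlushBoundary": 0
}
```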


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>

pom.xml

@ -18,12 +18,13 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
@ -78,7 +79,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.9.5</version>
<version>0.9.6</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
@ -203,17 +204,17 @@
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>4.0-beta</version>
<version>4.0-beta4</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>4.0-beta</version>
<version>4.0-beta4</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<version>4.0-beta</version>
<version>4.0-beta4</version>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
@ -280,6 +281,11 @@
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.jamesmurty.utils</groupId>
<artifactId>java-xmlbuilder</artifactId>
@ -393,7 +399,14 @@
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.9.0</version>
<version>0.9.1</version>
<exclusions>
<exclusion>
<!-- exclude artifact not available in maven central -->
<groupId>com.centerkey.utils</groupId>
<artifactId>barebonesbrowserlaunch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>


@ -32,7 +32,12 @@ public class DefaultObjectMapper extends ObjectMapper
{
public DefaultObjectMapper()
{
this(null);
this((JsonFactory)null);
}
public DefaultObjectMapper(DefaultObjectMapper mapper)
{
super(mapper);
}
public DefaultObjectMapper(JsonFactory factory)
@ -52,4 +57,10 @@ public class DefaultObjectMapper extends ObjectMapper
configure(MapperFeature.AUTO_DETECT_SETTERS, false);
configure(SerializationFeature.INDENT_OUTPUT, false);
}
@Override
public ObjectMapper copy()
{
return new DefaultObjectMapper(this);
}
}
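The `copy()` override matters because Jackson's base `ObjectMapper.copy()` throws for subclasses that do not override it, which is what the new copy constructor enables. A small usage sketch (assuming the class lives at `io.druid.jackson.DefaultObjectMapper`):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.druid.jackson.DefaultObjectMapper;

class CopyExample
{
  public static void main(String[] args)
  {
    DefaultObjectMapper base = new DefaultObjectMapper();
    // copy() hands back an independently configurable mapper that keeps
    // the defaults set in DefaultObjectMapper's constructor.
    ObjectMapper pretty = base.copy();
    pretty.configure(SerializationFeature.INDENT_OUTPUT, true); // 'base' is unaffected
  }
}
```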


@ -32,6 +32,7 @@ import com.google.common.base.Throwables;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import org.joda.time.DateTimeZone;
import java.io.IOException;
@ -104,7 +105,7 @@ public class DruidDefaultSerializersModule extends SimpleModule
jgen.writeStartArray();
value.accumulate(
null,
new Accumulator()
new Accumulator<Object, Object>()
{
@Override
public Object accumulate(Object o, Object o1)
@ -115,7 +116,7 @@ public class DruidDefaultSerializersModule extends SimpleModule
catch (IOException e) {
throw Throwables.propagate(e);
}
return o;
return null;
}
}
);
@ -123,6 +124,28 @@ public class DruidDefaultSerializersModule extends SimpleModule
}
}
);
addSerializer(
Yielder.class,
new JsonSerializer<Yielder>()
{
@Override
public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException
{
jgen.writeStartArray();
try {
while (!yielder.isDone()) {
final Object o = yielder.get();
jgen.writeObject(o);
yielder = yielder.next(null);
}
} finally {
yielder.close();
}
jgen.writeEndArray();
}
}
);
addSerializer(ByteOrder.class, ToStringSerializer.instance);
addDeserializer(
ByteOrder.class,


@ -25,6 +25,10 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable;
@ -35,11 +39,11 @@ import com.metamx.common.logger.Logger;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.
@ -59,27 +63,33 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
private static final Logger log = new Logger(ChainedExecutionQueryRunner.class);
private final Iterable<QueryRunner<T>> queryables;
private final ExecutorService exec;
private final ListeningExecutorService exec;
private final Ordering<T> ordering;
private final QueryWatcher queryWatcher;
public ChainedExecutionQueryRunner(
ExecutorService exec,
Ordering<T> ordering,
QueryWatcher queryWatcher,
QueryRunner<T>... queryables
)
{
this(exec, ordering, Arrays.asList(queryables));
this(exec, ordering, queryWatcher, Arrays.asList(queryables));
}
public ChainedExecutionQueryRunner(
ExecutorService exec,
Ordering<T> ordering,
QueryWatcher queryWatcher,
Iterable<QueryRunner<T>> queryables
)
{
this.exec = exec;
// listeningDecorator will leave PrioritizedExecutorService unchanged,
// since it already implements ListeningExecutorService
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.queryWatcher = queryWatcher;
}
@Override
@ -94,71 +104,81 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Iterator<T> make()
{
// Make it a List<> to materialize all of the values (so that it will submit everything to the executor)
List<Future<List<T>>> futures = Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<T>, Future<List<T>>>()
{
@Override
public Future<List<T>> apply(final QueryRunner<T> input)
{
return exec.submit(
new AbstractPrioritizedCallable<List<T>>(priority)
{
@Override
public List<T> call() throws Exception
{
try {
if (input == null) {
throw new ISE("Input is null?! How is this possible?!");
}
ListenableFuture<List<Iterable<T>>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<T>, ListenableFuture<Iterable<T>>>()
{
@Override
public ListenableFuture<Iterable<T>> apply(final QueryRunner<T> input)
{
return exec.submit(
new AbstractPrioritizedCallable<Iterable<T>>(priority)
{
@Override
public Iterable<T> call() throws Exception
{
try {
if (input == null) {
throw new ISE("Input is null?! How is this possible?!");
}
Sequence<T> result = input.run(query);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}
Sequence<T> result = input.run(query);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}
List<T> retVal = Sequences.toList(result, Lists.<T>newArrayList());
if (retVal == null) {
throw new ISE("Got a null list of results! WTF?!");
}
List<T> retVal = Sequences.toList(result, Lists.<T>newArrayList());
if (retVal == null) {
throw new ISE("Got a null list of results! WTF?!");
}
return retVal;
return retVal;
}
catch (QueryInterruptedException e) {
throw Throwables.propagate(e);
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
}
}
);
}
}
)
)
);
return new MergeIterable<T>(
ordering.nullsFirst(),
Iterables.transform(
futures,
new Function<Future<List<T>>, Iterable<T>>()
{
@Override
public Iterable<T> apply(Future<List<T>> input)
{
try {
return input.get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
)
).iterator();
queryWatcher.registerQuery(query, futures);
try {
final Number timeout = query.getContextValue("timeout", (Number)null);
return new MergeIterable<>(
ordering.nullsFirst(),
timeout == null ?
futures.get() :
futures.get(timeout.longValue(), TimeUnit.MILLISECONDS)
).iterator();
}
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
}
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
}
}
@Override

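The `timeout` context value consulted above is interpreted as milliseconds and applied via `futures.get(timeout.longValue(), TimeUnit.MILLISECONDS)`; on expiry the pending work is cancelled and a `QueryInterruptedException("Query timeout")` is thrown. A hypothetical sketch of the context map a caller would attach to a query:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

class TimeoutContextExample
{
  public static void main(String[] args)
  {
    // Read by the runners via query.getContextValue("timeout", (Number) null)
    // and query.getContextPriority(0) respectively.
    Map<String, Object> context = ImmutableMap.<String, Object>of(
        "timeout", 5000,  // cancel the query after five seconds
        "priority", 1     // hint for the prioritized executor
    );
    System.out.println(context);
  }
}
```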

@ -29,6 +29,8 @@ import io.druid.query.filter.NoopDimFilter;
import io.druid.query.filter.NotDimFilter;
import io.druid.query.filter.OrDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.metadata.metadata.ColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.search.SearchResultValue;
import io.druid.query.search.search.InsensitiveContainsSearchQuerySpec;
import io.druid.query.search.search.SearchQuery;
@ -823,4 +825,112 @@ public class Druids
{
return new ResultBuilder<TimeBoundaryResultValue>();
}
/**
* A Builder for SegmentMetadataQuery.
* <p/>
* Required: dataSource(), intervals() must be called before build()
* <p/>
* Usage example:
* <pre><code>
* SegmentMetadataQuery query = new SegmentMetadataQueryBuilder()
* .dataSource("Example")
* .interval("2010/2013")
* .build();
* </code></pre>
*
* @see io.druid.query.metadata.metadata.SegmentMetadataQuery
*/
public static class SegmentMetadataQueryBuilder
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private ColumnIncluderator toInclude;
private Boolean merge;
private Map<String, Object> context;
public SegmentMetadataQueryBuilder()
{
dataSource = null;
querySegmentSpec = null;
toInclude = null;
merge = null;
context = null;
}
public SegmentMetadataQuery build()
{
return new SegmentMetadataQuery(
dataSource,
querySegmentSpec,
toInclude,
merge,
context
);
}
public SegmentMetadataQueryBuilder copy(SegmentMetadataQueryBuilder builder)
{
return new SegmentMetadataQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.toInclude(builder.toInclude)
.merge(builder.merge)
.context(builder.context);
}
public SegmentMetadataQueryBuilder dataSource(String ds)
{
dataSource = new TableDataSource(ds);
return this;
}
public SegmentMetadataQueryBuilder dataSource(DataSource ds)
{
dataSource = ds;
return this;
}
public SegmentMetadataQueryBuilder intervals(QuerySegmentSpec q)
{
querySegmentSpec = q;
return this;
}
public SegmentMetadataQueryBuilder intervals(String s)
{
querySegmentSpec = new LegacySegmentSpec(s);
return this;
}
public SegmentMetadataQueryBuilder intervals(List<Interval> l)
{
querySegmentSpec = new LegacySegmentSpec(l);
return this;
}
public SegmentMetadataQueryBuilder toInclude(ColumnIncluderator toInclude)
{
this.toInclude = toInclude;
return this;
}
public SegmentMetadataQueryBuilder merge(boolean merge)
{
this.merge = merge;
return this;
}
public SegmentMetadataQueryBuilder context(Map<String, Object> c)
{
context = c;
return this;
}
}
public static SegmentMetadataQueryBuilder newSegmentMetadataQueryBuilder()
{
return new SegmentMetadataQueryBuilder();
}
}


@ -26,6 +26,10 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ResourceClosingSequence;
@ -42,18 +46,21 @@ import io.druid.segment.incremental.IncrementalIndex;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class GroupByParallelQueryRunner implements QueryRunner<Row>
{
private static final Logger log = new Logger(GroupByParallelQueryRunner.class);
private final Iterable<QueryRunner<Row>> queryables;
private final ExecutorService exec;
private final ListeningExecutorService exec;
private final Ordering<Row> ordering;
private final Supplier<GroupByQueryConfig> configSupplier;
private final QueryWatcher queryWatcher;
private final StupidPool<ByteBuffer> bufferPool;
@ -61,23 +68,25 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
ExecutorService exec,
Ordering<Row> ordering,
Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
StupidPool<ByteBuffer> bufferPool,
QueryRunner<Row>... queryables
)
{
this(exec, ordering, configSupplier, bufferPool, Arrays.asList(queryables));
this(exec, ordering, configSupplier, queryWatcher, bufferPool, Arrays.asList(queryables));
}
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering,
Supplier<GroupByQueryConfig> configSupplier,
Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
StupidPool<ByteBuffer> bufferPool,
Iterable<QueryRunner<Row>> queryables
)
{
this.exec = exec;
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
this.bufferPool = bufferPool;
@ -98,48 +107,67 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
}
List<Future<Boolean>> futures = Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, Future<Boolean>>()
{
@Override
public Future<Boolean> apply(final QueryRunner<Row> input)
{
return exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
try {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
ListenableFuture<List<Boolean>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
{
@Override
public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
{
return exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
try {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
}
catch (QueryInterruptedException e) {
throw Throwables.propagate(e);
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
}
}
);
}
}
)
)
);
// Let the runners complete
for (Future<Boolean> future : futures) {
try {
future.get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
try {
queryWatcher.registerQuery(query, futures);
final Number timeout = query.getContextValue("timeout", (Number) null);
if (timeout == null) {
futures.get();
} else {
futures.get(timeout.longValue(), TimeUnit.MILLISECONDS);
}
}
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
}
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
}
return new ResourceClosingSequence<Row>(
Sequences.simple(


@ -0,0 +1,195 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.segment.incremental.IncrementalIndex;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class GroupByParallelQueryRunner implements QueryRunner<Row>
{
private static final Logger log = new Logger(GroupByParallelQueryRunner.class);
private final Iterable<QueryRunner<Row>> queryables;
private final ListeningExecutorService exec;
private final Ordering<Row> ordering;
private final Supplier<GroupByQueryConfig> configSupplier;
private final QueryWatcher queryWatcher;
private final StupidPool<ByteBuffer> bufferPool;
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering,
Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
StupidPool<ByteBuffer> bufferPool,
QueryRunner<Row>... queryables
)
{
this(exec, ordering, configSupplier, queryWatcher, bufferPool, Arrays.asList(queryables));
}
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
StupidPool<ByteBuffer> bufferPool,
Iterable<QueryRunner<Row>> queryables
)
{
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
this.bufferPool = bufferPool;
}
@Override
public Sequence<Row> run(final Query<Row> queryParam)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
configSupplier.get(),
bufferPool
);
final int priority = query.getContextPriority(0);
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
}
ListenableFuture<List<Boolean>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
{
@Override
public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
{
return exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
try {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
}
catch (QueryInterruptedException e) {
throw Throwables.propagate(e);
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
}
}
)
)
);
// Let the runners complete
try {
queryWatcher.registerQuery(query, futures);
final Number timeout = query.getContextValue("timeout", (Number) null);
if(timeout == null) {
futures.get();
} else {
futures.get(timeout.longValue(), TimeUnit.MILLISECONDS);
}
}
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
}
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
}
return new ResourceClosingSequence<Row>(
Sequences.simple(
indexAccumulatorPair.lhs
.iterableWithPostAggregations(null)
), indexAccumulatorPair.lhs
);
}
}


@ -167,18 +167,20 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
@Override
public void close() throws IOException
{
if (!isDone() && builder.getUser10() == null) {
builder.setUser10("short");
try {
if (!isDone() && builder.getUser10() == null) {
builder.setUser10("short");
}
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken));
if (creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
} finally {
yielder.close();
}
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken));
if(creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
yielder.close();
}
};
}


@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class QueryInterruptedException extends RuntimeException
{
public QueryInterruptedException()
{
super();
}
@JsonCreator
public QueryInterruptedException(@JsonProperty("error") String message)
{
super(message);
}
public QueryInterruptedException(Throwable cause)
{
super(cause);
}
@JsonProperty("error")
@Override
public String getMessage()
{
return super.getMessage();
}
}
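Thanks to the `@JsonCreator`/`@JsonProperty("error")` pair, the exception round-trips through JSON as a single-field object, so a cancelled or timed-out query surfaces to HTTP clients as, for example:

```json
{"error": "Query timeout"}
```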


@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.util.concurrent.ListenableFuture;
/**
* This interface is in a very early stage and should not be considered stable.
*
* The purpose of the QueryWatcher is to give overall visibility into queries running
* or pending at the QueryRunner level. This is currently used to cancel all the
* parts of a pending query, but may be expanded in the future to offer more direct
* visibility into query execution and resource usage.
*
* QueryRunners executing any computation asynchronously must register their queries
* with the QueryWatcher.
*
*/
public interface QueryWatcher
{
/**
* QueryRunners must use this method to register any pending queries.
*
* The given future may have cancel(true) called at any time, if cancellation of this query has been requested.
*
* @param query a query, which may be a subset of a larger query, as long as the underlying queryId is unchanged
* @param future the future holding the execution status of the query
*/
public void registerQuery(Query query, ListenableFuture future);
}
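For illustration only (not part of this commit), a minimal watcher satisfying this contract might keep pending futures keyed by query id so that an external cancellation path can cancel them; the multimap bookkeeping below is an assumption:

```java
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;

public class SimpleQueryWatcher implements QueryWatcher
{
  // All pending futures registered for a given queryId; one query may
  // register several futures (one per QueryRunner).
  private final SetMultimap<String, ListenableFuture> futures =
      Multimaps.synchronizedSetMultimap(HashMultimap.<String, ListenableFuture>create());

  @Override
  public void registerQuery(Query query, ListenableFuture future)
  {
    futures.put(query.getId(), future);
  }

  // Called by some cancellation endpoint; the contract above allows
  // cancel(true) on registered futures at any time.
  public void cancelQuery(String queryId)
  {
    for (ListenableFuture future : futures.removeAll(queryId)) {
      future.cancel(true);
    }
  }
}
```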


@ -24,48 +24,60 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.ExecutorExecutingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Row;
import io.druid.guice.annotations.Global;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.GroupByParallelQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*/
public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
{
private final GroupByQueryEngine engine;
private final QueryWatcher queryWatcher;
private final Supplier<GroupByQueryConfig> config;
private final GroupByQueryQueryToolChest toolChest;
@Global
StupidPool<ByteBuffer> computationBufferPool;
private final StupidPool<ByteBuffer> computationBufferPool;
private static final Logger log = new Logger(GroupByQueryRunnerFactory.class);
@Inject
public GroupByQueryRunnerFactory(
GroupByQueryEngine engine,
QueryWatcher queryWatcher,
Supplier<GroupByQueryConfig> config,
GroupByQueryQueryToolChest toolChest,
@Global StupidPool<ByteBuffer> computationBufferPool
)
{
this.engine = engine;
this.queryWatcher = queryWatcher;
this.config = config;
this.toolChest = toolChest;
this.computationBufferPool = computationBufferPool;
@ -78,8 +90,10 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
}
@Override
public QueryRunner<Row> mergeRunners(final ExecutorService queryExecutor, Iterable<QueryRunner<Row>> queryRunners)
public QueryRunner<Row> mergeRunners(final ExecutorService exec, Iterable<QueryRunner<Row>> queryRunners)
{
// mergeRunners should take ListeningExecutorService at some point
final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec);
if (config.get().isSingleThreaded()) {
return new ConcatQueryRunner<Row>(
Sequences.map(
@ -95,7 +109,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
public Sequence<Row> run(final Query<Row> query)
{
Future<Sequence<Row>> future = queryExecutor.submit(
ListenableFuture<Sequence<Row>> future = queryExecutor.submit(
new Callable<Sequence<Row>>()
{
@Override
@ -109,13 +123,25 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
}
);
try {
return future.get();
queryWatcher.registerQuery(query, future);
final Number timeout = query.getContextValue("timeout", (Number)null);
return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query timeout");
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
throw Throwables.propagate(e.getCause());
}
}
};
@ -128,9 +154,11 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
queryExecutor,
new RowOrdering(),
config,
queryWatcher,
computationBufferPool,
queryRunners
);
}
}


@ -22,14 +22,21 @@ package io.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.guava.ExecutorExecutingSequence;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
@ -39,16 +46,27 @@ import io.druid.segment.Segment;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
{
private static final SegmentAnalyzer analyzer = new SegmentAnalyzer();
private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest();
private static final Logger log = new Logger(SegmentMetadataQueryRunnerFactory.class);
private final QueryWatcher queryWatcher;
@Inject
public SegmentMetadataQueryRunnerFactory(
QueryWatcher queryWatcher
)
{
this.queryWatcher = queryWatcher;
}
@Override
public QueryRunner<SegmentAnalysis> createRunner(final Segment segment)
@ -100,9 +118,10 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
@Override
public QueryRunner<SegmentAnalysis> mergeRunners(
final ExecutorService queryExecutor, Iterable<QueryRunner<SegmentAnalysis>> queryRunners
ExecutorService exec, Iterable<QueryRunner<SegmentAnalysis>> queryRunners
)
{
final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec);
return new ConcatQueryRunner<SegmentAnalysis>(
Sequences.map(
Sequences.simple(queryRunners),
@ -116,28 +135,37 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
@Override
public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
{
Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new Callable<Sequence<SegmentAnalysis>>()
final int priority = query.getContextPriority(0);
ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Sequence<SegmentAnalysis>>(priority)
{
@Override
public Sequence<SegmentAnalysis> call() throws Exception
{
return new ExecutorExecutingSequence<SegmentAnalysis>(
input.run(query),
queryExecutor
);
return input.run(query);
}
}
);
try {
return future.get();
queryWatcher.registerQuery(query, future);
final Number timeout = query.getContextValue("timeout", (Number) null);
return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query timeout");
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
throw Throwables.propagate(e.getCause());
}
}
};
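For reference, the context keys the merged runner above consults are supplied by the caller on the query itself: the priority is read via query.getContextPriority(0) and the timeout via getContextValue("timeout", ...). A minimal, self-contained sketch of such a context map; the values are illustrative, the key names are taken from the hunks above and the tests below:
import java.util.HashMap;
import java.util.Map;

public class QueryContextSketch
{
  public static void main(String[] args)
  {
    Map<String, Object> context = new HashMap<>();
    context.put("priority", 0);     // read via query.getContextPriority(0)
    context.put("timeout", 10000);  // milliseconds; when absent, future.get() blocks indefinitely
    context.put("queryId", "demo"); // used for logging and for cancellation bookkeeping
    System.out.println(context);
  }
}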

View File

@ -24,6 +24,7 @@ import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.search.search.SearchQuery;
import io.druid.segment.Segment;
@ -35,13 +36,16 @@ import java.util.concurrent.ExecutorService;
public class SearchQueryRunnerFactory implements QueryRunnerFactory<Result<SearchResultValue>, SearchQuery>
{
private final SearchQueryQueryToolChest toolChest;
private final QueryWatcher queryWatcher;
@Inject
public SearchQueryRunnerFactory(
SearchQueryQueryToolChest toolChest
SearchQueryQueryToolChest toolChest,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.queryWatcher = queryWatcher;
}
@Override
@ -56,7 +60,7 @@ public class SearchQueryRunnerFactory implements QueryRunnerFactory<Result<Searc
)
{
return new ChainedExecutionQueryRunner<Result<SearchResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -20,6 +20,7 @@
package io.druid.query.select;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
@ -29,6 +30,7 @@ import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
@ -39,25 +41,20 @@ import java.util.concurrent.ExecutorService;
public class SelectQueryRunnerFactory
implements QueryRunnerFactory<Result<SelectResultValue>, SelectQuery>
{
public static SelectQueryRunnerFactory create(ObjectMapper jsonMapper)
{
return new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper),
new SelectQueryEngine()
);
}
private final SelectQueryQueryToolChest toolChest;
private final SelectQueryEngine engine;
private final QueryWatcher queryWatcher;
@Inject
public SelectQueryRunnerFactory(
SelectQueryQueryToolChest toolChest,
SelectQueryEngine engine
SelectQueryEngine engine,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.engine = engine;
this.queryWatcher = queryWatcher;
}
@Override
@ -72,7 +69,7 @@ public class SelectQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<SelectResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -19,6 +19,7 @@
package io.druid.query.timeboundary;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
@ -27,6 +28,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
@ -40,6 +42,13 @@ public class TimeBoundaryQueryRunnerFactory
implements QueryRunnerFactory<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
{
private static final TimeBoundaryQueryQueryToolChest toolChest = new TimeBoundaryQueryQueryToolChest();
private final QueryWatcher queryWatcher;
@Inject
public TimeBoundaryQueryRunnerFactory(QueryWatcher queryWatcher)
{
this.queryWatcher = queryWatcher;
}
@Override
public QueryRunner<Result<TimeBoundaryResultValue>> createRunner(final Segment segment)
@ -53,7 +62,7 @@ public class TimeBoundaryQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<TimeBoundaryResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -60,27 +60,29 @@ public class TimeseriesQueryEngine
{
Aggregator[] aggregators = QueryRunnerHelper.makeAggregators(cursor, aggregatorSpecs);
while (!cursor.isDone()) {
for (Aggregator aggregator : aggregators) {
aggregator.aggregate();
try {
while (!cursor.isDone()) {
for (Aggregator aggregator : aggregators) {
aggregator.aggregate();
}
cursor.advance();
}
cursor.advance();
TimeseriesResultBuilder bob = new TimeseriesResultBuilder(cursor.getTime());
for (Aggregator aggregator : aggregators) {
bob.addMetric(aggregator);
}
Result<TimeseriesResultValue> retVal = bob.build();
return retVal;
}
TimeseriesResultBuilder bob = new TimeseriesResultBuilder(cursor.getTime());
for (Aggregator aggregator : aggregators) {
bob.addMetric(aggregator);
finally {
// cleanup
for (Aggregator agg : aggregators) {
agg.close();
}
}
Result<TimeseriesResultValue> retVal = bob.build();
// cleanup
for (Aggregator agg : aggregators) {
agg.close();
}
return retVal;
}
}
);
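The motivation for the try/finally restructuring above: advance() can now throw QueryInterruptedException mid-loop, so aggregator cleanup must run on every exit path, not only after a completed scan. A JDK-only distillation of the pattern, with Closeable standing in for Aggregator:
import java.io.Closeable;
import java.io.IOException;

public class CleanupSketch
{
  public static void main(String[] args) throws IOException
  {
    Closeable resource = () -> System.out.println("closed"); // stands in for Aggregator.close()
    try {
      if (Thread.interrupted()) {
        throw new IllegalStateException("interrupted"); // mirrors QueryInterruptedException
      }
      System.out.println("work finished");
    }
    finally {
      resource.close(); // runs whether the scan completes or is interrupted
    }
  }
}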

View File

@ -19,6 +19,7 @@
package io.druid.query.timeseries;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
@ -28,6 +29,7 @@ import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
@ -39,25 +41,20 @@ import java.util.concurrent.ExecutorService;
public class TimeseriesQueryRunnerFactory
implements QueryRunnerFactory<Result<TimeseriesResultValue>, TimeseriesQuery>
{
public static TimeseriesQueryRunnerFactory create()
{
return new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine()
);
}
private final TimeseriesQueryQueryToolChest toolChest;
private final TimeseriesQueryEngine engine;
private final QueryWatcher queryWatcher;
@Inject
public TimeseriesQueryRunnerFactory(
TimeseriesQueryQueryToolChest toolChest,
TimeseriesQueryEngine engine
TimeseriesQueryEngine engine,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.engine = engine;
this.queryWatcher = queryWatcher;
}
@Override
@ -72,7 +69,7 @@ public class TimeseriesQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<TimeseriesResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -30,6 +30,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
@ -43,15 +44,18 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
{
private final StupidPool<ByteBuffer> computationBufferPool;
private final TopNQueryQueryToolChest toolchest;
private final QueryWatcher queryWatcher;
@Inject
public TopNQueryRunnerFactory(
@Global StupidPool<ByteBuffer> computationBufferPool,
TopNQueryQueryToolChest toolchest
TopNQueryQueryToolChest toolchest,
QueryWatcher queryWatcher
)
{
this.computationBufferPool = computationBufferPool;
this.toolchest = toolchest;
this.queryWatcher = queryWatcher;
}
@Override
@ -79,7 +83,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
)
{
return new ChainedExecutionQueryRunner<Result<TopNResultValue>>(
queryExecutor, toolchest.getOrdering(), queryRunners
queryExecutor, toolchest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -27,6 +27,7 @@ import com.google.common.io.Closeables;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
import io.druid.query.QueryInterruptedException;
import io.druid.query.filter.Filter;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
@ -224,6 +225,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
cursorOffset.increment();
}
@ -652,6 +656,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
++currRow;
}

View File

@ -48,4 +48,4 @@ public class ReferenceCountingSequence<T> extends YieldingSequenceBase<T>
final Closeable closeable = segment.increment();
return new ResourceClosingYielder<OutType>(baseSequence.toYielder(initValue, accumulator), closeable);
}
}
}

View File

@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.QueryInterruptedException;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory;
@ -199,6 +200,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
while (baseIter.hasNext()) {
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
currEntry.set(baseIter.next());
if (filterMatcher.matches()) {
@ -238,6 +243,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
Iterators.advance(baseIter, numAdvanced);
}
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
boolean foundMatched = false;
while (baseIter.hasNext()) {
currEntry.set(baseIter.next());
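These per-row Thread.interrupted() checks in both storage adapters are what make future.cancel(true) take effect promptly: the worker thread notices the interrupt flag on its next cursor step and aborts with QueryInterruptedException. A self-contained JDK-only sketch of the same mechanism; the names are illustrative, not Druid APIs:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InterruptibleScanSketch
{
  public static void main(String[] args) throws Exception
  {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Long> future = exec.submit(() -> {
      long rows = 0;
      while (rows < Long.MAX_VALUE) {
        if (Thread.interrupted()) {
          // mirrors Cursor.advance() throwing QueryInterruptedException
          throw new IllegalStateException("query interrupted");
        }
        rows++;
      }
      return rows;
    });
    Thread.sleep(10);
    future.cancel(true); // sets the interrupt flag on the worker thread
    exec.shutdown();
  }
}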

View File

@ -0,0 +1,286 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.lifecycle.Lifecycle;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ChainedExecutionQueryRunnerTest
{
@Test
public void testQueryCancellation() throws Exception
{
ExecutorService exec = PrioritizedExecutorService.create(
new Lifecycle(), new ExecutorServiceConfig()
{
@Override
public String getFormatString()
{
return "test";
}
@Override
public int getNumThreads()
{
return 2;
}
}
);
final CountDownLatch queriesStarted = new CountDownLatch(2);
final CountDownLatch queryIsRegistered = new CountDownLatch(1);
Capture<ListenableFuture> capturedFuture = new Capture<>();
QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
watcher.registerQuery(EasyMock.<Query>anyObject(), EasyMock.and(EasyMock.<ListenableFuture>anyObject(), EasyMock.capture(capturedFuture)));
EasyMock.expectLastCall()
.andAnswer(
new IAnswer<Void>()
{
@Override
public Void answer() throws Throwable
{
queryIsRegistered.countDown();
return null;
}
}
)
.once();
EasyMock.replay(watcher);
DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted);
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
Ordering.<Integer>natural(),
watcher,
Lists.<QueryRunner<Integer>>newArrayList(
runner1,
runner2,
runner3
)
);
final Sequence seq = chainedRunner.run(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
.build()
);
Future resultFuture = Executors.newFixedThreadPool(1).submit(
new Runnable()
{
@Override
public void run()
{
Sequences.toList(seq, Lists.newArrayList());
}
}
);
// wait for query to register and start
Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS));
Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS));
// cancel the query
Assert.assertTrue(capturedFuture.hasCaptured());
ListenableFuture future = capturedFuture.getValue();
future.cancel(true);
QueryInterruptedException cause = null;
try {
resultFuture.get();
} catch(ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof QueryInterruptedException);
cause = (QueryInterruptedException)e.getCause();
}
Assert.assertNotNull(cause);
Assert.assertTrue(future.isCancelled());
Assert.assertTrue(runner1.hasStarted);
Assert.assertTrue(runner2.hasStarted);
Assert.assertFalse(runner3.hasStarted);
Assert.assertFalse(runner1.hasCompleted);
Assert.assertFalse(runner2.hasCompleted);
Assert.assertFalse(runner3.hasCompleted);
EasyMock.verify(watcher);
}
@Test
public void testQueryTimeout() throws Exception
{
ExecutorService exec = PrioritizedExecutorService.create(
new Lifecycle(), new ExecutorServiceConfig()
{
@Override
public String getFormatString()
{
return "test";
}
@Override
public int getNumThreads()
{
return 2;
}
}
);
final CountDownLatch queriesStarted = new CountDownLatch(2);
final CountDownLatch queryIsRegistered = new CountDownLatch(1);
Capture<ListenableFuture> capturedFuture = new Capture<>();
QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
watcher.registerQuery(EasyMock.<Query>anyObject(), EasyMock.and(EasyMock.<ListenableFuture>anyObject(), EasyMock.capture(capturedFuture)));
EasyMock.expectLastCall()
.andAnswer(
new IAnswer<Void>()
{
@Override
public Void answer() throws Throwable
{
queryIsRegistered.countDown();
return null;
}
}
)
.once();
EasyMock.replay(watcher);
DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted);
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
Ordering.<Integer>natural(),
watcher,
Lists.<QueryRunner<Integer>>newArrayList(
runner1,
runner2,
runner3
)
);
final Sequence seq = chainedRunner.run(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
.context(ImmutableMap.<String, Object>of("timeout", (100), "queryId", "test"))
.build()
);
Future resultFuture = Executors.newFixedThreadPool(1).submit(
new Runnable()
{
@Override
public void run()
{
Sequences.toList(seq, Lists.newArrayList());
}
}
);
// wait for query to register and start
Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS));
Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS));
// query should time out; the runner cancels the future itself
Assert.assertTrue(capturedFuture.hasCaptured());
ListenableFuture future = capturedFuture.getValue();
QueryInterruptedException cause = null;
try {
resultFuture.get();
} catch(ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof QueryInterruptedException);
Assert.assertEquals("Query timeout", e.getCause().getMessage());
cause = (QueryInterruptedException)e.getCause();
}
Assert.assertNotNull(cause);
Assert.assertTrue(future.isCancelled());
Assert.assertTrue(runner1.hasStarted);
Assert.assertTrue(runner2.hasStarted);
Assert.assertFalse(runner3.hasStarted);
Assert.assertFalse(runner1.hasCompleted);
Assert.assertFalse(runner2.hasCompleted);
Assert.assertFalse(runner3.hasCompleted);
EasyMock.verify(watcher);
}
private static class DyingQueryRunner implements QueryRunner<Integer>
{
private final CountDownLatch latch;
private boolean hasStarted = false;
private boolean hasCompleted = false;
public DyingQueryRunner(CountDownLatch latch)
{
this.latch = latch;
}
@Override
public Sequence<Integer> run(Query<Integer> query)
{
hasStarted = true;
latch.countDown();
if (Thread.interrupted()) {
throw new QueryInterruptedException("I got killed");
}
// do a lot of work
try {
Thread.sleep(500);
}
catch (InterruptedException e) {
throw new QueryInterruptedException("I got killed");
}
hasCompleted = true;
return Sequences.simple(Lists.newArrayList(123));
}
}
}

View File

@ -21,6 +21,7 @@ package io.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@ -53,6 +54,16 @@ import java.util.List;
*/
public class QueryRunnerTestHelper
{
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
};
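The no-op watcher above pins down the QueryWatcher contract these changes introduce: a single registerQuery(Query, ListenableFuture) callback. For contrast, a hypothetical watcher that retains futures so a caller can cancel them later; this is illustrative only, not the QueryManager bound in QueryRunnerFactoryModule:
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.query.Query;
import io.druid.query.QueryWatcher;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical: remembers futures by query id so a caller can cancel them later.
public class CancellableQueryWatcher implements QueryWatcher
{
  private final ConcurrentMap<String, ListenableFuture> futures = new ConcurrentHashMap<>();

  @Override
  public void registerQuery(Query query, ListenableFuture future)
  {
    futures.put(query.getId(), future);
  }

  public void cancel(String queryId)
  {
    final ListenableFuture future = futures.remove(queryId);
    if (future != null) {
      future.cancel(true); // interrupts workers; cursors then throw QueryInterruptedException
    }
  }
}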
public static final String segmentId = "testSegment";
public static final String dataSource = "testing";
public static final UnionDataSource unionDataSource = new UnionDataSource(

View File

@ -1,11 +1,14 @@
package io.druid.query;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.collections.StupidPool;
import io.druid.query.search.SearchQueryQueryToolChest;
import io.druid.query.search.SearchQueryRunnerFactory;
import io.druid.query.search.search.SearchQueryConfig;
import io.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
@ -39,7 +42,11 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig));
QueryRunnerFactory factory = new TopNQueryRunnerFactory(
pool,
new TopNQueryQueryToolChest(topNConfig),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
@ -50,7 +57,12 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
@ -61,7 +73,7 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()));
QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), QueryRunnerTestHelper.NOOP_QUERYWATCHER);
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
@ -72,11 +84,10 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory();
QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER);
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
);
}
}
}

View File

@ -114,6 +114,7 @@ public class GroupByQueryRunnerTest
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool),
TestQueryRunners.pool

View File

@ -73,6 +73,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, engine, TestQueryRunners.pool),
TestQueryRunners.pool

View File

@ -25,6 +25,7 @@ import io.druid.query.LegacyDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
@ -95,7 +96,7 @@ public class SegmentAnalyzerTest
private List<SegmentAnalysis> getSegmentAnalysises(Segment index)
{
final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
(QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(), index
(QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER), index
);
final SegmentMetadataQuery query = new SegmentMetadataQuery(

View File

@ -21,17 +21,76 @@ package io.druid.query.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ListColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.TestIndex;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class SegmentMetadataQueryTest
{
@SuppressWarnings("unchecked")
private final QueryRunner runner = makeQueryRunner(
new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER)
);
private ObjectMapper mapper = new DefaultObjectMapper();
@SuppressWarnings("unchecked")
public static QueryRunner makeQueryRunner(
QueryRunnerFactory factory
)
{
return QueryRunnerTestHelper.makeQueryRunner(
factory,
new QueryableIndexSegment(QueryRunnerTestHelper.segmentId, TestIndex.getMMappedTestIndex())
);
}
@Test
@SuppressWarnings("unchecked")
public void testSegmentMetadataQuery()
{
SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.intervals("2013/2014")
.toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
.merge(true)
.build();
Iterable<SegmentAnalysis> results = Sequences.toList(
runner.run(query),
Lists.<SegmentAnalysis>newArrayList()
);
SegmentAnalysis val = results.iterator().next();
Assert.assertEquals("testSegment", val.getId());
Assert.assertEquals(69843, val.getSize());
Assert.assertEquals(
Arrays.asList(new Interval("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")),
val.getIntervals()
);
Assert.assertEquals(1, val.getColumns().size());
final ColumnAnalysis columnAnalysis = val.getColumns().get("placement");
Assert.assertEquals("STRING", columnAnalysis.getType());
Assert.assertEquals(10881, columnAnalysis.getSize());
Assert.assertEquals(new Integer(1), columnAnalysis.getCardinality());
Assert.assertNull(columnAnalysis.getErrorMessage());
}
@Test
public void testSerde() throws Exception
{

View File

@ -23,10 +23,13 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
import io.druid.query.search.search.FragmentSearchQuerySpec;
@ -56,7 +59,10 @@ public class SearchQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()))
new SearchQueryRunnerFactory(
new SearchQueryQueryToolChest(new SearchQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
}

View File

@ -22,11 +22,15 @@ package io.druid.query.select;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.filter.SelectorDimFilter;
@ -54,7 +58,11 @@ public class SelectQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
SelectQueryRunnerFactory.create(new DefaultObjectMapper())
new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(new QueryConfig(), new DefaultObjectMapper()),
new SelectQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
}

View File

@ -20,10 +20,13 @@
package io.druid.query.timeboundary;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import org.joda.time.DateTime;
import org.junit.Assert;
@ -43,7 +46,7 @@ public class TimeBoundaryQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
new TimeBoundaryQueryRunnerFactory()
new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER)
);
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
@ -46,7 +47,11 @@ public class TimeSeriesUnionQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeUnionQueryRunners(
TimeseriesQueryRunnerFactory.create()
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
}

View File

@ -28,8 +28,10 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
@ -89,7 +91,12 @@ public class TimeseriesQueryRunnerBonusTest
private static List<Result<TimeseriesResultValue>> runTimeseriesCount(IncrementalIndex index)
{
final QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
final QueryRunner<Result<TimeseriesResultValue>> runner = makeQueryRunner(
factory,
new IncrementalIndexSegment(index, null)

View File

@ -26,6 +26,7 @@ import com.metamx.common.guava.Sequences;
import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
@ -62,7 +63,11 @@ public class TimeseriesQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
TimeseriesQueryRunnerFactory.create()
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
}

View File

@ -68,7 +68,8 @@ public class TopNQueryRunnerTest
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);
@ -85,7 +86,8 @@ public class TopNQueryRunnerTest
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);

View File

@ -65,7 +65,8 @@ public class TopNUnionQueryTest
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);
@ -82,7 +83,8 @@ public class TopNUnionQueryTest
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);

View File

@ -29,7 +29,9 @@ import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
@ -37,6 +39,8 @@ import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SpatialDimFilter;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
@ -439,7 +443,12 @@ public class SpatialFilterBonusTest
)
);
try {
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
@ -521,7 +530,12 @@ public class SpatialFilterBonusTest
)
);
try {
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
@ -533,4 +547,4 @@ public class SpatialFilterBonusTest
throw Throwables.propagate(e);
}
}
}
}

View File

@ -29,7 +29,9 @@ import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
@ -37,6 +39,8 @@ import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SpatialDimFilter;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
@ -469,7 +473,12 @@ public class SpatialFilterTest
)
);
try {
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
@ -551,7 +560,12 @@ public class SpatialFilterTest
)
);
try {
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
@ -563,4 +577,4 @@ public class SpatialFilterTest
throw Throwables.propagate(e);
}
}
}
}

View File

@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -120,6 +120,10 @@ public class S3DataSegmentMover implements DataSegmentMover
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
} else if (S3Object.STORAGE_CLASS_GLACIER.equals(
s3Client.getObjectDetails(s3Bucket, s3Path).getStorageClass()
)) {
log.warn("Cannot move file[s3://%s/%s] of storage class glacier.");
} else {
log.info(
"Moving file[s3://%s/%s] to [s3://%s/%s]",

View File

@ -73,11 +73,6 @@ public class S3StorageDruidModule implements DruidModule
@LazySingleton
public RestS3Service getRestS3Service(AWSCredentials credentials)
{
try {
return new RestS3Service(credentials);
}
catch (S3ServiceException e) {
throw new ProvisionException("Unable to create a RestS3Service", e);
}
}
}

View File

@ -30,6 +30,7 @@ import io.druid.timeline.partition.NoneShardSpec;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import org.joda.time.Interval;
@ -131,6 +132,18 @@ public class S3DataSegmentMoverTest
return (objects != null && objects.contains(objectKey));
}
@Override
public StorageObject getObjectDetails(String bucketName, String objectKey) throws ServiceException
{
if (isObjectInBucket(bucketName, objectKey)) {
final S3Object object = new S3Object(objectKey);
object.setStorageClass(S3Object.STORAGE_CLASS_STANDARD);
return object;
} else {
return null;
}
}
@Override
public Map<String, Object> moveObject(
String sourceBucketName,

View File

@ -18,7 +18,8 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
@ -28,7 +29,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>
@ -57,6 +58,10 @@
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
@ -130,12 +135,12 @@
<artifactId>jetty-servlets</artifactId>
</dependency>
<dependency>
<groupId>com.ircclouds.irc</groupId>
<artifactId>irc-api</artifactId>
<groupId>com.ircclouds.irc</groupId>
<artifactId>irc-api</artifactId>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
</dependency>
<!-- Tests -->

View File

@ -32,10 +32,9 @@ import io.druid.client.selector.TierSelectorStrategy;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Client;
import io.druid.query.DataSource;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.TableDataSource;
import io.druid.query.QueryWatcher;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
@ -60,6 +59,7 @@ public class BrokerServerView implements TimelineServerView
private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines;
private final QueryToolChestWarehouse warehouse;
private final QueryWatcher queryWatcher;
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
private final ServerInventoryView baseView;
@ -68,6 +68,7 @@ public class BrokerServerView implements TimelineServerView
@Inject
public BrokerServerView(
QueryToolChestWarehouse warehouse,
QueryWatcher queryWatcher,
ObjectMapper smileMapper,
@Client HttpClient httpClient,
ServerInventoryView baseView,
@ -75,6 +76,7 @@ public class BrokerServerView implements TimelineServerView
)
{
this.warehouse = warehouse;
this.queryWatcher = queryWatcher;
this.smileMapper = smileMapper;
this.httpClient = httpClient;
this.baseView = baseView;
@ -154,7 +156,7 @@ public class BrokerServerView implements TimelineServerView
private DirectDruidClient makeDirectClient(DruidServer server)
{
return new DirectDruidClient(warehouse, smileMapper, httpClient, server.getHost());
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost());
}
private QueryableDruidServer removeServer(DruidServer server)

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
@ -43,11 +44,15 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.io.AppendableByteArrayInputStream;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.aggregation.MetricManipulatorFns;
import org.jboss.netty.handler.codec.http.HttpChunk;
@ -60,6 +65,7 @@ import java.io.InputStream;
import java.net.URL;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
@ -73,6 +79,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private static final Map<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap = Maps.newConcurrentMap();
private final QueryToolChestWarehouse warehouse;
private final QueryWatcher queryWatcher;
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final String host;
@ -82,12 +89,14 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public DirectDruidClient(
QueryToolChestWarehouse warehouse,
QueryWatcher queryWatcher,
ObjectMapper objectMapper,
HttpClient httpClient,
String host
)
{
this.warehouse = warehouse;
this.queryWatcher = queryWatcher;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.host = host;
@ -102,7 +111,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(Query<T> query)
public Sequence<T> run(final Query<T> query)
{
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
boolean isBySegment = query.getContextBySegment(false);
@ -127,6 +136,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
final ListenableFuture<InputStream> future;
final String url = String.format("http://%s/druid/v2/", host);
final String cancelUrl = String.format("http://%s/druid/v2/%s", host, query.getId());
try {
log.debug("Querying url[%s]", url);
@ -174,6 +184,9 @@ public class DirectDruidClient<T> implements QueryRunner<T>
}
}
);
queryWatcher.registerQuery(query, future);
openConnections.getAndIncrement();
Futures.addCallback(
future, new FutureCallback<InputStream>()
@ -188,6 +201,27 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public void onFailure(Throwable t)
{
openConnections.getAndDecrement();
if (future.isCancelled()) {
// forward the cancellation to underlying queriable node
try {
StatusResponseHolder res = httpClient
.delete(new URL(cancelUrl))
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json")
.go(new StatusResponseHandler(Charsets.UTF_8))
.get();
if (res.getStatus().getCode() >= 500) {
throw new RE(
"Error cancelling query[%s]: queriable node returned status[%d] [%s].",
res.getStatus().getCode(),
res.getStatus().getReasonPhrase()
);
}
}
catch (IOException | ExecutionException | InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
}
);
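The cancelUrl above implies queryable nodes now accept DELETE at /druid/v2/{queryId}, and the doDelete handler added to the forwarding servlet later in this commit routes such requests. A minimal JDK-only sketch of issuing that cancellation from an external client; the host and id are illustrative:
import java.net.HttpURLConnection;
import java.net.URL;

public class CancelQuerySketch
{
  public static void main(String[] args) throws Exception
  {
    final String host = "localhost:8080"; // illustrative queryable node host
    final String queryId = "demo";        // must match the query's "queryId" context value
    final URL url = new URL(String.format("http://%s/druid/v2/%s", host, queryId));
    final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("DELETE");
    System.out.println("cancel returned HTTP " + conn.getResponseCode());
  }
}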
@ -196,7 +230,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
throw Throwables.propagate(e);
}
Sequence<T> retVal = new BaseSequence<T, JsonParserIterator<T>>(
Sequence<T> retVal = new BaseSequence<>(
new BaseSequence.IteratorMaker<T, JsonParserIterator<T>>()
{
@Override
@ -283,21 +317,23 @@ public class DirectDruidClient<T> implements QueryRunner<T>
if (jp == null) {
try {
jp = objectMapper.getFactory().createParser(future.get());
if (jp.nextToken() != JsonToken.START_ARRAY) {
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException e = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw e;
}
else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else {
jp.nextToken();
objectCodec = jp.getCodec();
}
}
catch (IOException e) {
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(e, "Failure getting results from[%s]", url);
}
catch (InterruptedException e) {
throw new RE(e, "Failure getting results from[%s]", url);
}
catch (ExecutionException e) {
throw new RE(e, "Failure getting results from[%s]", url);
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
}
}

View File

@ -22,6 +22,7 @@ package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -30,7 +31,9 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.guice.annotations.Client;
import io.druid.query.Query;
import io.druid.server.QueryResource;
import io.druid.server.router.Router;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import javax.inject.Inject;
@ -68,7 +71,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
return openConnections.get();
}
public ListenableFuture<FinalType> post(
public ListenableFuture<FinalType> postQuery(
String url,
Query query,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
@ -81,7 +84,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
future = httpClient
.post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json")
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? QueryResource.APPLICATION_SMILE : QueryResource.APPLICATION_JSON)
.go(responseHandler);
openConnections.getAndIncrement();
@ -125,4 +128,19 @@ public class RoutingDruidClient<IntermediateType, FinalType>
throw Throwables.propagate(e);
}
}
public ListenableFuture<FinalType> delete(
String url,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
{
try {
return httpClient
.delete(new URL(url))
.go(responseHandler);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -24,6 +24,7 @@ import com.google.inject.Binder;
import com.google.inject.multibindings.MapBinder;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryWatcher;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryRunnerFactory;
@ -39,6 +40,7 @@ import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryRunnerFactory;
import io.druid.server.QueryManager;
import java.util.Map;
@ -62,6 +64,12 @@ public class QueryRunnerFactoryModule extends QueryToolChestModule
{
super.configure(binder);
binder.bind(QueryWatcher.class)
.to(QueryManager.class)
.in(LazySingleton.class);
binder.bind(QueryManager.class)
.in(LazySingleton.class);
final MapBinder<Class<? extends Query>, QueryRunnerFactory> queryFactoryBinder = DruidBinders.queryRunnerFactoryBinder(
binder
);

View File

@ -21,7 +21,11 @@ package io.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@ -37,19 +41,20 @@ import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.UUID;
/**
@ -59,8 +64,6 @@ import java.util.UUID;
public class AsyncQueryForwardingServlet extends HttpServlet
{
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final String DISPATCHED = "dispatched";
private static final Joiner COMMA_JOIN = Joiner.on(",");
private final ObjectMapper jsonMapper;
@ -88,275 +91,161 @@ public class AsyncQueryForwardingServlet extends HttpServlet
}
@Override
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException
{
OutputStream out = null;
AsyncContext ctx = null;
try {
ctx = req.startAsync(req, resp);
final AsyncContext asyncContext = ctx;
if (req.getAttribute(DISPATCHED) != null) {
return;
}
out = resp.getOutputStream();
final OutputStream outputStream = out;
final String host = hostFinder.getDefaultHost();
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
{
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
final AsyncContext asyncContext = req.startAsync(req, res);
asyncContext.start(
new Runnable()
{
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/json");
try {
ChannelBuffer buf = response.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
try {
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final OutputStream obj = clientResponse.getObj();
try {
resp.flushBuffer();
outputStream.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
asyncContext.complete();
}
return ClientResponse.finished(obj);
}
@Override
public void exceptionCaught(
ClientResponse<OutputStream> clientResponse,
Throwable e
)
{
handleException(resp, asyncContext, e);
}
};
asyncContext.start(
new Runnable()
@Override
public void run()
{
@Override
public void run()
{
routingDruidClient.get(makeUrl(host, req), responseHandler);
try {
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper);
final String host = hostFinder.getDefaultHost();
routingDruidClient.get(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler);
}
catch (Exception e) {
handleException(jsonMapper, asyncContext, e);
}
}
);
asyncContext.dispatch();
req.setAttribute(DISPATCHED, true);
}
catch (Exception e) {
handleException(resp, ctx, e);
}
}
);
}
@Override
protected void doPost(
final HttpServletRequest req, final HttpServletResponse resp
) throws ServletException, IOException
protected void doDelete(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
Query query = null;
String queryId;
final boolean isSmile = "application/smile".equals(req.getContentType());
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
OutputStream out = null;
AsyncContext ctx = null;
try {
ctx = req.startAsync(req, resp);
final AsyncContext asyncContext = ctx;
if (req.getAttribute(DISPATCHED) != null) {
return;
}
query = objectMapper.readValue(req.getInputStream(), Query.class);
queryId = query.getId();
if (queryId == null) {
queryId = UUID.randomUUID().toString();
query = query.withId(queryId);
}
if (log.isDebugEnabled()) {
log.debug("Got query [%s]", query);
}
out = resp.getOutputStream();
final OutputStream outputStream = out;
final String host = hostFinder.getHost(query);
final Query theQuery = query;
final String theQueryId = queryId;
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
{
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
final AsyncContext asyncContext = req.startAsync(req, res);
asyncContext.start(
new Runnable()
{
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/x-javascript");
try {
ChannelBuffer buf = response.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
try {
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final long requestTime = System.currentTimeMillis() - start;
log.debug("Request time: %d", requestTime);
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(theQuery.getDataSource()))
.setUser4(theQuery.getType())
.setUser5(COMMA_JOIN.join(theQuery.getIntervals()))
.setUser6(String.valueOf(theQuery.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(theQueryId)
.setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
final OutputStream obj = clientResponse.getObj();
try {
requestLogger.log(
new RequestLogLine(
new DateTime(),
req.getRemoteAddr(),
theQuery,
new QueryStats(ImmutableMap.<String, Object>of("request/time", requestTime, "success", true))
)
);
resp.flushBuffer();
outputStream.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
asyncContext.complete();
}
return ClientResponse.finished(obj);
}
@Override
public void exceptionCaught(
ClientResponse<OutputStream> clientResponse,
Throwable e
)
{
handleException(resp, asyncContext, e);
}
};
asyncContext.start(
new Runnable()
@Override
public void run()
{
@Override
public void run()
{
routingDruidClient.post(makeUrl(host, req), theQuery, responseHandler);
try {
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper);
final String host = hostFinder.getDefaultHost();
routingDruidClient.delete(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler);
}
catch (Exception e) {
handleException(jsonMapper, asyncContext, e);
}
}
);
}
);
}
asyncContext.dispatch();
req.setAttribute(DISPATCHED, true);
}
catch (Exception e) {
handleException(resp, ctx, e);
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
final AsyncContext asyncContext = req.startAsync(req, res);
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
final HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(request.getContentType());
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
Query inputQuery = null;
try {
inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);
if (inputQuery.getId() == null) {
inputQuery = inputQuery.withId(UUID.randomUUID().toString());
}
final Query query = inputQuery;
if (log.isDebugEnabled()) {
log.debug("Got query [%s]", inputQuery);
}
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(
asyncContext,
objectMapper
)
{
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final long requestTime = System.currentTimeMillis() - start;
log.debug("Request time: %d", requestTime);
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4(query.getType())
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(request.getRemoteAddr())
.setUser8(query.getId())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
try {
requestLogger.log(
new RequestLogLine(
new DateTime(),
request.getRemoteAddr(),
query,
new QueryStats(
ImmutableMap.<String, Object>of(
"request/time",
requestTime,
"success",
true
)
)
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return super.done(clientResponse);
}
};
routingDruidClient.postQuery(
makeUrl(hostFinder.getHost(inputQuery), request),
inputQuery,
responseHandler
);
}
catch (Exception e) {
handleException(objectMapper, asyncContext, e);
try {
requestLogger.log(
new RequestLogLine(
new DateTime(),
request.getRemoteAddr(),
inputQuery,
new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", e.toString()))
)
);
}
catch (Exception logError) {
log.error(logError, "Unable to log query [%s]!", inputQuery);
}
log.makeAlert(e, "Exception handling request")
.addData("query", inputQuery)
.addData("peer", request.getRemoteAddr())
.emit();
}
}
}
);
}
private String makeUrl(final String host, final HttpServletRequest req)
@ -370,24 +259,126 @@ public class AsyncQueryForwardingServlet extends HttpServlet
return String.format("http://%s%s?%s", host, requestURI, queryString);
}
private static void handleException(ObjectMapper objectMapper, AsyncContext asyncContext, Throwable exception)
{
try {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
if (!response.isCommitted()) {
final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage();
response.resetBuffer();
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
objectMapper.writeValue(
response.getOutputStream(),
ImmutableMap.of(
"error", errorMessage
)
);
}
response.flushBuffer();
}
catch (IOException e) {
Throwables.propagate(e);
}
finally {
asyncContext.complete();
}
}
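// Failure shape, as produced by handleException above: the proxied query answers
// HTTP 500 with a one-field JSON body such as {"error": "..."}; the finally block
// completes the async context even when writing that body fails.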
private static class PassthroughHttpResponseHandler implements HttpResponseHandler<OutputStream, OutputStream>
{
private final AsyncContext asyncContext;
private final ObjectMapper objectMapper;
private final OutputStream outputStream;
public PassthroughHttpResponseHandler(AsyncContext asyncContext, ObjectMapper objectMapper) throws IOException
{
this.asyncContext = asyncContext;
this.objectMapper = objectMapper;
this.outputStream = asyncContext.getResponse().getOutputStream();
}
protected void copyStatusHeaders(HttpResponse clientResponse)
{
final HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
response.setStatus(clientResponse.getStatus().getCode());
response.setContentType(clientResponse.headers().get(HttpHeaders.Names.CONTENT_TYPE));
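// FluentIterable is lazy: the terminal allMatch(alwaysTrue()) below exists only to
// force the filter/transform chain to run and copy each X-Druid header through.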
FluentIterable.from(clientResponse.headers().entries())
.filter(new Predicate<Map.Entry<String, String>>()
{
@Override
public boolean apply(@Nullable Map.Entry<String, String> input)
{
return input.getKey().startsWith("X-Druid");
}
}
)
.transform(
new Function<Map.Entry<String, String>, Object>()
{
@Nullable
@Override
public Object apply(@Nullable Map.Entry<String, String> input)
{
response.setHeader(input.getKey(), input.getValue());
return null;
}
}
)
.allMatch(Predicates.alwaysTrue());
}
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse clientResponse)
{
copyStatusHeaders(clientResponse);
try {
ChannelBuffer buf = clientResponse.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
try {
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
asyncContext.complete();
return ClientResponse.finished(clientResponse.getObj());
}
@Override
public void exceptionCaught(
ClientResponse<OutputStream> clientResponse,
Throwable e
)
{
// throwing an exception here may cause resource leak
try {
handleException(objectMapper, asyncContext, e);
} catch(Exception err) {
log.error(err, "Unable to handle exception response");
}
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.query.Query;
import io.druid.query.QueryWatcher;
import java.util.Set;
public class QueryManager implements QueryWatcher
{
final SetMultimap<String, ListenableFuture> queries;
public QueryManager()
{
this.queries = Multimaps.synchronizedSetMultimap(
HashMultimap.<String, ListenableFuture>create()
);
}
public boolean cancelQuery(String id) {
Set<ListenableFuture> futures = queries.removeAll(id);
boolean success = true;
for (ListenableFuture future : futures) {
success = success && future.cancel(true);
}
return success;
}
public void registerQuery(Query query, final ListenableFuture future)
{
final String id = query.getId();
queries.put(id, future);
future.addListener(
new Runnable()
{
@Override
public void run()
{
queries.remove(id, future);
}
},
MoreExecutors.sameThreadExecutor()
);
}
}
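// Usage sketch (how QueryManager is wired in as the QueryWatcher is assumed; it is
// not shown in this hunk):
//
//   QueryManager manager = new QueryManager();
//   manager.registerQuery(query, resultFuture);  // tracked by id, self-removing on completion
//   boolean cancelled = manager.cancelQuery(query.getId());  // cancel(true) on each live future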

View File

@ -19,16 +19,23 @@
package io.druid.server;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Throwables;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Accumulators;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingAccumulators;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
@ -36,6 +43,7 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.log.RequestLogger;
import org.joda.time.DateTime;
@ -43,10 +51,17 @@ import org.joda.time.DateTime;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
@ -58,14 +73,16 @@ import java.util.UUID;
public class QueryResource
{
private static final EmittingLogger log = new EmittingLogger(QueryResource.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final Joiner COMMA_JOIN = Joiner.on(",");
public static final String APPLICATION_SMILE = "application/smile";
public static final String APPLICATION_JSON = "application/json";
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QuerySegmentWalker texasRanger;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final QueryManager queryManager;
@Inject
public QueryResource(
@ -73,35 +90,49 @@ public class QueryResource
@Smile ObjectMapper smileMapper,
QuerySegmentWalker texasRanger,
ServiceEmitter emitter,
RequestLogger requestLogger,
QueryManager queryManager
)
{
this.jsonMapper = jsonMapper.copy();
this.jsonMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
this.smileMapper = smileMapper.copy();
this.smileMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
this.texasRanger = texasRanger;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.queryManager = queryManager;
}
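// The injected mappers are copied so that disabling AUTO_CLOSE_TARGET stays local to
// this resource: with the feature off, jsonWriter.writeValue(out, ...) no longer
// closes the target stream as a side effect, leaving stream lifetime to the resource
// itself (the StreamingOutput in doPost closes it explicitly).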
@DELETE
@Path("{id}")
@Produces("application/json")
public Response cancelQuery(@PathParam("id") String queryId)
{
queryManager.cancelQuery(queryId);
return Response.status(Response.Status.ACCEPTED).build();
}
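// Client-side sketch (assumes this resource keeps its usual /druid/v2 mount point):
//
//   curl -X DELETE http://broker-host:8080/druid/v2/<queryId>
//
// answers 202 Accepted whether or not the id still maps to registered futures.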
@POST
@Produces("application/json")
public Response doPost(
@Context HttpServletRequest req,
@Context final HttpServletResponse resp
) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
Query query = null;
byte[] requestQuery = null;
String queryId = null;
final boolean isSmile = APPLICATION_SMILE.equals(req.getContentType());
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
final ObjectWriter jsonWriter = req.getParameter("pretty") == null
? objectMapper.writer()
: objectMapper.writerWithDefaultPrettyPrinter();
try {
requestQuery = ByteStreams.toByteArray(req.getInputStream());
@ -116,45 +147,92 @@ public class QueryResource
log.debug("Got query [%s]", query);
}
Sequence results = query.run(texasRanger);
if (results == null) {
results = Sequences.empty();
}
try (
final Yielder yielder = results.toYielder(
null,
new YieldingAccumulator()
{
@Override
public Object accumulate(Object accumulated, Object in)
{
yield();
return in;
}
}
)
) {
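// Each accumulate() call yields, so the yielder hands rows back one at a time and
// the StreamingOutput below serializes results incrementally instead of buffering
// the whole Sequence in memory before the response starts.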
long requestTime = System.currentTimeMillis() - start;
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4(query.getType())
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(queryId)
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
requestLogger.log(
new RequestLogLine(
new DateTime(),
req.getRemoteAddr(),
query,
new QueryStats(
ImmutableMap.<String, Object>of(
"request/time", requestTime,
"success", true
)
)
)
);
return Response
.ok(
new StreamingOutput()
{
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException
{
jsonWriter.writeValue(outputStream, yielder);
outputStream.close();
}
},
isSmile ? APPLICATION_SMILE : APPLICATION_JSON
)
.header("X-Druid-Query-Id", queryId)
.build();
}
}
catch (QueryInterruptedException e) {
try {
log.info("%s [%s]", e.getMessage(), queryId);
requestLogger.log(
new RequestLogLine(
new DateTime(),
req.getRemoteAddr(),
query,
new QueryStats(ImmutableMap.<String, Object>of("success", false, "interrupted", true, "reason", e.toString()))
)
);
} catch (Exception e2) {
log.error(e2, "Unable to log query [%s]!", query);
}
return Response.serverError().entity(
jsonWriter.writeValueAsString(
ImmutableMap.of(
"error", e.getMessage()
)
)
).build();
}
catch (Exception e) {
final String queryString =
@ -164,20 +242,6 @@ public class QueryResource
log.warn(e, "Exception occurred on request [%s]", queryString);
try {
requestLogger.log(
new RequestLogLine(
@ -197,10 +261,14 @@ public class QueryResource
.addData("query", queryString)
.addData("peer", req.getRemoteAddr())
.emit();
return Response.serverError().entity(
jsonWriter.writeValueAsString(
ImmutableMap.of(
"error", e.getMessage() == null ? "null exception" : e.getMessage()
)
)
).build();
}
}
}

View File

@ -21,7 +21,6 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;

View File

@ -77,6 +77,7 @@ import org.joda.time.Duration;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -91,11 +92,8 @@ import java.util.concurrent.atomic.AtomicReference;
public class DruidCoordinator
{
public static final String COORDINATOR_OWNER_NODE = "_COORDINATOR";
private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
private final Object lock = new Object();
private final DruidCoordinatorConfig config;
private final ZkPathsConfig zkPaths;
private final JacksonConfigManager configManager;
@ -111,7 +109,6 @@ public class DruidCoordinator
private final AtomicReference<LeaderLatch> leaderLatch;
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
private volatile boolean started = false;
private volatile int leaderCounter = 0;
private volatile boolean leader = false;
@ -234,7 +231,6 @@ public class DruidCoordinator
return retVal;
}
public CountingMap<String> getSegmentAvailability()
{
final CountingMap<String> retVal = new CountingMap<>();
@ -253,43 +249,22 @@ public class DruidCoordinator
public Map<String, Double> getLoadStatus()
{
Map<String, Double> loadStatus = Maps.newHashMap();
for (DruidDataSource dataSource : databaseSegmentManager.getInventory()) {
final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments());
final int availableSegmentSize = segments.size();
// remove loaded segments
for (DruidServer druidServer : serverInventoryView.getInventory()) {
final DruidDataSource loadedView = druidServer.getDataSource(dataSource.getName());
if (loadedView != null) {
segments.removeAll(loadedView.getSegments());
}
}
final int unloadedSegmentSize = segments.size();
loadStatus.put(
dataSource.getName(),
100 * ((double) (availableSegmentSize - unloadedSegmentSize) / (double) availableSegmentSize)
);
}
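// Worked example: a datasource with 200 available segments, 50 of which no server
// has loaded yet, reports 100 * (200 - 50) / 200 = 75.0 percent loaded.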
@ -422,11 +397,27 @@ public class DruidCoordinator
}
}
public Set<DataSegment> getOrderedAvailableDataSegments()
{
Set<DataSegment> availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
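// bucketMonthComparator() buckets segments by month; inverting it makes this set
// iterate newest months first, so recent data is handed to the coordinator helpers
// before older data.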
Iterable<DataSegment> dataSegments = getAvailableDataSegments();
for (DataSegment dataSegment : dataSegments) {
if (dataSegment.getSize() < 0) {
log.makeAlert("No size on Segment, wtf?")
.addData("segment", dataSegment)
.emit();
}
availableSegments.add(dataSegment);
}
return availableSegments;
}
public Iterable<DataSegment> getAvailableDataSegments()
{
return Iterables.concat(
Iterables.transform(
databaseSegmentManager.getInventory(),
new Function<DruidDataSource, Iterable<DataSegment>>()
@ -439,17 +430,6 @@ public class DruidCoordinator
}
)
);
}
@LifecycleStart

View File

@ -41,7 +41,7 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
// Display info about all available segments
final Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
if (log.isDebugEnabled()) {
log.debug("Available DataSegments");
for (DataSegment dataSegment : availableSegments) {

View File

@ -21,30 +21,37 @@ package io.druid.client;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.ReflectionQueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@ -54,17 +61,10 @@ import java.util.List;
public class DirectDruidClientTest
{
@Test
public void testRun() throws Exception
{
HttpClient httpClient = EasyMock.createMock(HttpClient.class);
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
@ -93,12 +93,14 @@ public class DirectDruidClientTest
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"foo"
);
DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"foo2"
@ -149,4 +151,70 @@ public class DirectDruidClientTest
EasyMock.verify(httpClient);
}
@Test
public void testCancel() throws Exception
{
HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class);
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(
new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"))
).once();
ListenableFuture<Object> cancelledFuture = Futures.immediateCancelledFuture();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(cancelledFuture).once();
EasyMock.expect(httpClient.delete(EasyMock.<URL>anyObject()))
.andReturn(new RequestBuilder(httpClient, HttpMethod.DELETE, new URL("http://foo.com/delete")))
.once();
SettableFuture<Object> cancellationFuture = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(cancellationFuture).once();
EasyMock.replay(httpClient);
final ServerSelector serverSelector = new ServerSelector(
new DataSegment(
"test",
new Interval("2013-01-01/2013-01-02"),
new DateTime("2013-01-01").toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
0L
),
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"foo"
);
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServer(queryableDruidServer1);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
Sequence results = client1.run(query);
Assert.assertEquals(0, client1.getNumOpenConnections());
QueryInterruptedException exception = null;
try {
Sequences.toList(results, Lists.newArrayList());
} catch(QueryInterruptedException e) {
exception = e;
}
Assert.assertNotNull(exception);
EasyMock.verify(httpClient);
}
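// The mock sequence above mirrors the real cancellation handshake: the query POST
// returns an already-cancelled future, DirectDruidClient reacts with a DELETE to the
// same host, and draining the result sequence surfaces a QueryInterruptedException.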
}

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.121-SNAPSHOT</version>
<version>0.6.122-SNAPSHOT</version>
</parent>
<dependencies>
@ -101,10 +101,6 @@
<executions>
<execution>
<id>distro-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
<configuration>
<descriptors>
<descriptor>src/assembly/assembly.xml</descriptor>