Merge branch 'master' into igalDruid
2
build.sh
|
@ -30,4 +30,4 @@ echo "For examples, see: "
|
|||
echo " "
|
||||
ls -1 examples/*/*sh
|
||||
echo " "
|
||||
echo "See also http://druid.io/docs/0.6.72"
|
||||
echo "See also http://druid.io/docs/0.6.73"
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -20,7 +20,7 @@ io.druid.cli.Main server coordinator
|
|||
Rules
|
||||
-----
|
||||
|
||||
Segments are loaded and dropped from the cluster based on a set of rules. Rules indicate how segments should be assigned to different historical node tiers and how many replicants of a segment should exist in each tier. Rules may also indicate when segments should be dropped entirely from the cluster. The coordinator loads a set of rules from the database. Rules may be specific to a certain datasource and/or a default set of rules can be configured. Rules are read in order and hence the ordering of rules is important. The coordinator will cycle through all available segments and match each segment with the first rule that applies. Each segment may only match a single rule
|
||||
Segments are loaded and dropped from the cluster based on a set of rules. Rules indicate how segments should be assigned to different historical node tiers and how many replicants of a segment should exist in each tier. Rules may also indicate when segments should be dropped entirely from the cluster. The coordinator loads a set of rules from the database. Rules may be specific to a certain datasource and/or a default set of rules can be configured. Rules are read in order and hence the ordering of rules is important. The coordinator will cycle through all available segments and match each segment with the first rule that applies. Each segment may only match a single rule.
|
||||
|
||||
For more information on rules, see [Rule Configuration](Rule-Configuration.html).
|
||||
|
||||
|
@ -136,4 +136,4 @@ FAQ
|
|||
|
||||
No. If the Druid coordinator is not started up, no new segments will be loaded in the cluster and outdated segments will not be dropped. However, the coordinator node can be started up at any time, and after a configurable delay, will start running coordinator tasks.
|
||||
|
||||
This also means that if you have a working cluster and all of your coordinators die, the cluster will continue to function, it just won’t experience any changes to its data topology.
|
||||
This also means that if you have a working cluster and all of your coordinators die, the cluster will continue to function, it just won’t experience any changes to its data topology.
|
||||
|
|
|
@ -19,13 +19,13 @@ Clone Druid and build it:
|
|||
git clone https://github.com/metamx/druid.git druid
|
||||
cd druid
|
||||
git fetch --tags
|
||||
git checkout druid-0.6.72
|
||||
git checkout druid-0.6.73
|
||||
./build.sh
|
||||
```
|
||||
|
||||
### Downloading the DSK (Druid Standalone Kit)
|
||||
|
||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz) a stand-alone tarball and run it:
|
||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.73-bin.tar.gz) a stand-alone tarball and run it:
|
||||
|
||||
``` bash
|
||||
tar -xzf druid-services-0.X.X-bin.tar.gz
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
layout: doc_page
|
||||
---
|
||||
# groupBy Queries
|
||||
These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query. Note: If you only want to do straight aggreagates for some time range, we highly recommend using [TimeseriesQueries](TimeseriesQuery.html) instead. The performance will be substantially better.
|
||||
These types of queries take a groupBy query object and return an array of JSON objects where each object represents a grouping asked for by the query. Note: If you only want to do straight aggregates for some time range, we highly recommend using [TimeseriesQueries](TimeseriesQuery.html) instead. The performance will be substantially better.
|
||||
An example groupBy query object is shown below:
|
||||
|
||||
``` json
|
||||
|
@ -87,4 +87,4 @@ To pull it all together, the above query would return *n\*m* data points, up to
|
|||
},
|
||||
...
|
||||
]
|
||||
```
|
||||
```
|
|
@ -66,7 +66,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/indexer
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.73"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
@ -115,7 +115,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/worker
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.73","io.druid.extensions:druid-kafka-seven:0.6.73"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
|
|
@ -27,7 +27,7 @@ druid.host=localhost
|
|||
druid.service=realtime
|
||||
druid.port=8083
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.72"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.73"]
|
||||
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
|
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/realtime
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.73","io.druid.extensions:druid-kafka-seven:0.6.73"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
|
|
@ -37,7 +37,7 @@ There are several main parts to a search query:
|
|||
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|
||||
|searchDimensions|The dimensions to run the search over. Excluding this means the search is run over all dimensions.|no|
|
||||
|query|See [SearchQuerySpec](SearchQuerySpec.html).|yes|
|
||||
|sort|How the results of the search should sorted. Two possible types here are "lexicographic" and "strlen".|yes|
|
||||
|sort|How the results of the search should be sorted. Two possible types here are "lexicographic" and "strlen".|yes|
|
||||
|context|An additional JSON Object which can be used to specify certain flags.|no|
|
||||
|
||||
The format of the result is:
|
||||
|
|
|
@ -15,7 +15,7 @@ Segment metadata queries return per segment information about:
|
|||
{
|
||||
"queryType":"segmentMetadata",
|
||||
"dataSource":"sample_datasource",
|
||||
"intervals":["2013-01-01/2014-01-01"],
|
||||
"intervals":["2013-01-01/2014-01-01"]
|
||||
}
|
||||
```
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ TopN queries return a sorted set of results for the values in a given dimension
|
|||
A topN query object looks like:
|
||||
|
||||
```json
|
||||
{
|
||||
"queryType": "topN",
|
||||
"dataSource": "sample_data",
|
||||
"dimension": "sample_dim",
|
||||
|
|
|
@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
|||
|
||||
### Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz). Download this file to a directory of your choosing.
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.73-bin.tar.gz). Download this file to a directory of your choosing.
|
||||
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
||||
|
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
|
|||
Not too lost so far right? That's great! If you cd into the directory:
|
||||
|
||||
```
|
||||
cd druid-services-0.6.72
|
||||
cd druid-services-0.6.73
|
||||
```
|
||||
|
||||
You should see a bunch of files:
|
||||
|
|
|
@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
|
|||
|
||||
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
|
||||
|
||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz)
|
||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.73-bin.tar.gz)
|
||||
|
||||
and untar the contents within by issuing:
|
||||
|
||||
|
@ -149,7 +149,7 @@ druid.port=8081
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.73"]
|
||||
|
||||
# Dummy read only AWS account (used to download example data)
|
||||
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||
|
@ -240,7 +240,7 @@ druid.port=8083
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.73","io.druid.extensions:druid-kafka-seven:0.6.73"]
|
||||
|
||||
# Change this config to db to hand off to the rest of the Druid cluster
|
||||
druid.publish.type=noop
|
||||
|
|
|
@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
|||
|
||||
h3. Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz)
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.73-bin.tar.gz)
|
||||
Download this file to a directory of your choosing.
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
||||
|
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
|
|||
Not too lost so far right? That's great! If you cd into the directory:
|
||||
|
||||
```
|
||||
cd druid-services-0.6.72
|
||||
cd druid-services-0.6.73
|
||||
```
|
||||
|
||||
You should see a bunch of files:
|
||||
|
|
|
@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
|
|||
|
||||
h3. Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz.
|
||||
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.73-bin.tar.gz.
|
||||
Download this bad boy to a directory of your choosing.
|
||||
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
|
|
@ -31,20 +31,20 @@ We have more details about the general design of the system and why you might wa
|
|||
When Druid?
|
||||
----------
|
||||
|
||||
* You need to do interactive, fast, exploration of large amounts of data
|
||||
* You need analytics (not key value store)
|
||||
* You have a lot of data (10s of Billions of events added per day, 10s of TB of data added per day)
|
||||
* You want to do your analysis on data as it’s happening (realtime)
|
||||
* Your store needs to be always-on, 24x7x365 and years into the future.
|
||||
* You need to do interactive, fast, exploration on large amounts of data
|
||||
* You need analytics (not a key-value store)
|
||||
* You have a lot of data (10s of billions of events added per day, 10s of TB of data added per day)
|
||||
* You want to do your analysis on data as it’s happening (in real-time)
|
||||
* You need a data store that is always available, 24x7x365, and years into the future.
|
||||
|
||||
|
||||
Not Druid?
|
||||
----------
|
||||
|
||||
* The amount of data you have can easily be handled by MySql
|
||||
* Your querying for individual entries or doing lookups (Not Analytics)
|
||||
* Batch is good enough
|
||||
* Canned queries is good enough
|
||||
* The amount of data you have can easily be handled by MySQL
|
||||
* You're querying for individual entries or doing lookups (not analytics)
|
||||
* Batch ingestion is good enough
|
||||
* Canned queries are good enough
|
||||
* Downtime is no big deal
|
||||
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ druid.port=8081
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.73"]
|
||||
|
||||
# Dummy read only AWS account (used to download example data)
|
||||
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||
|
|
|
@ -4,7 +4,7 @@ druid.port=8083
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72","io.druid.extensions:druid-rabbitmq:0.6.72"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.73","io.druid.extensions:druid-kafka-seven:0.6.73","io.druid.extensions:druid-rabbitmq:0.6.73"]
|
||||
|
||||
# Change this config to db to hand off to the rest of the Druid cluster
|
||||
druid.publish.type=noop
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -70,7 +70,7 @@ public class HadoopIndexTask extends AbstractTask
|
|||
@JsonIgnore
|
||||
private final HadoopDruidIndexerSchema schema;
|
||||
@JsonIgnore
|
||||
private final String hadoopCoordinates;
|
||||
private final List<String> hadoopDependencyCoordinates;
|
||||
|
||||
/**
|
||||
* @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters
|
||||
|
@ -86,7 +86,8 @@ public class HadoopIndexTask extends AbstractTask
|
|||
public HadoopIndexTask(
|
||||
@JsonProperty("id") String id,
|
||||
@JsonProperty("config") HadoopDruidIndexerSchema schema,
|
||||
@JsonProperty("hadoopCoordinates") String hadoopCoordinates
|
||||
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
|
||||
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -100,7 +101,9 @@ public class HadoopIndexTask extends AbstractTask
|
|||
Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent");
|
||||
|
||||
this.schema = schema;
|
||||
this.hadoopCoordinates = (hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates);
|
||||
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList(
|
||||
hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates
|
||||
) : hadoopDependencyCoordinates;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -132,20 +135,16 @@ public class HadoopIndexTask extends AbstractTask
|
|||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getHadoopCoordinates()
|
||||
public List<String> getHadoopDependencyCoordinates()
|
||||
{
|
||||
return hadoopCoordinates;
|
||||
return hadoopDependencyCoordinates;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
||||
{
|
||||
// setup Hadoop
|
||||
final DefaultTeslaAether aetherClient = Initialization.getAetherClient(extensionsConfig);
|
||||
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
|
||||
aetherClient, hadoopCoordinates
|
||||
);
|
||||
|
||||
final List<URL> extensionURLs = Lists.newArrayList();
|
||||
for (String coordinate : extensionsConfig.getCoordinates()) {
|
||||
|
@ -161,7 +160,12 @@ public class HadoopIndexTask extends AbstractTask
|
|||
final List<URL> driverURLs = Lists.newArrayList();
|
||||
driverURLs.addAll(nonHadoopURLs);
|
||||
// put hadoop dependencies last to avoid jets3t & apache.httpcore version conflicts
|
||||
driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
|
||||
for (String hadoopDependencyCoordinate : hadoopDependencyCoordinates) {
|
||||
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
|
||||
aetherClient, hadoopDependencyCoordinate
|
||||
);
|
||||
driverURLs.addAll(Arrays.asList(((URLClassLoader) hadoopLoader).getURLs()));
|
||||
}
|
||||
|
||||
final URLClassLoader loader = new URLClassLoader(driverURLs.toArray(new URL[driverURLs.size()]), null);
|
||||
Thread.currentThread().setContextClassLoader(loader);
|
||||
|
@ -240,10 +244,10 @@ public class HadoopIndexTask extends AbstractTask
|
|||
String version = args[1];
|
||||
|
||||
final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
|
||||
.readValue(
|
||||
schema,
|
||||
HadoopDruidIndexerSchema.class
|
||||
);
|
||||
.readValue(
|
||||
schema,
|
||||
HadoopDruidIndexerSchema.class
|
||||
);
|
||||
final HadoopDruidIndexerConfig config =
|
||||
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
|
||||
.withVersion(version)
|
||||
|
@ -269,10 +273,10 @@ public class HadoopIndexTask extends AbstractTask
|
|||
final String segmentOutputPath = args[2];
|
||||
|
||||
final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
|
||||
.readValue(
|
||||
schema,
|
||||
HadoopDruidIndexerSchema.class
|
||||
);
|
||||
.readValue(
|
||||
schema,
|
||||
HadoopDruidIndexerSchema.class
|
||||
);
|
||||
final HadoopDruidIndexerConfig config =
|
||||
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
|
||||
.withWorkingPath(workingPath)
|
||||
|
|
|
@ -401,6 +401,7 @@ public class TaskSerdeTest
|
|||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
10
pom.xml
|
@ -23,14 +23,14 @@
|
|||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
<name>druid</name>
|
||||
<description>druid</description>
|
||||
<scm>
|
||||
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
|
||||
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
|
||||
<url>http://www.github.com/metamx/druid</url>
|
||||
<tag>druid-0.6.72-SNAPSHOT</tag>
|
||||
<tag>druid-0.6.73-SNAPSHOT</tag>
|
||||
</scm>
|
||||
|
||||
<prerequisites>
|
||||
|
@ -94,7 +94,7 @@
|
|||
<dependency>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>server-metrics</artifactId>
|
||||
<version>0.0.5</version>
|
||||
<version>0.0.9</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
@ -548,8 +548,8 @@
|
|||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
|
||||
<repositories>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -37,14 +37,14 @@ public abstract class BaseQuery<T> implements Query<T>
|
|||
{
|
||||
public static String QUERYID = "queryId";
|
||||
private final DataSource dataSource;
|
||||
private final Map<String, String> context;
|
||||
private final Map<String, Object> context;
|
||||
private final QuerySegmentSpec querySegmentSpec;
|
||||
private volatile Duration duration;
|
||||
|
||||
public BaseQuery(
|
||||
DataSource dataSource,
|
||||
QuerySegmentSpec querySegmentSpec,
|
||||
Map<String, String> context
|
||||
Map<String, Object> context
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(dataSource, "dataSource can't be null");
|
||||
|
@ -102,28 +102,28 @@ public abstract class BaseQuery<T> implements Query<T>
|
|||
}
|
||||
|
||||
@JsonProperty
|
||||
public Map<String, String> getContext()
|
||||
public Map<String, Object> getContext()
|
||||
{
|
||||
return context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContextValue(String key)
|
||||
public <ContextType> ContextType getContextValue(String key)
|
||||
{
|
||||
return context == null ? null : context.get(key);
|
||||
return context == null ? null : (ContextType) context.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContextValue(String key, String defaultValue)
|
||||
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue)
|
||||
{
|
||||
String retVal = getContextValue(key);
|
||||
ContextType retVal = getContextValue(key);
|
||||
return retVal == null ? defaultValue : retVal;
|
||||
}
|
||||
|
||||
protected Map<String, String> computeOverridenContext(Map<String, String> overrides)
|
||||
protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides)
|
||||
{
|
||||
Map<String, String> overridden = Maps.newTreeMap();
|
||||
final Map<String, String> context = getContext();
|
||||
Map<String, Object> overridden = Maps.newTreeMap();
|
||||
final Map<String, Object> context = getContext();
|
||||
if (context != null) {
|
||||
overridden.putAll(context);
|
||||
}
|
||||
|
@ -135,28 +135,41 @@ public abstract class BaseQuery<T> implements Query<T>
|
|||
@Override
|
||||
public String getId()
|
||||
{
|
||||
return getContextValue(QUERYID);
|
||||
return (String) getContextValue(QUERYID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query withId(String id)
|
||||
{
|
||||
return withOverriddenContext(ImmutableMap.of(QUERYID, id));
|
||||
return withOverriddenContext(ImmutableMap.<String, Object>of(QUERYID, id));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
BaseQuery baseQuery = (BaseQuery) o;
|
||||
|
||||
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) return false;
|
||||
if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) return false;
|
||||
if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) return false;
|
||||
if (querySegmentSpec != null ? !querySegmentSpec.equals(baseQuery.querySegmentSpec) : baseQuery.querySegmentSpec != null)
|
||||
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) {
|
||||
return false;
|
||||
}
|
||||
if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) {
|
||||
return false;
|
||||
}
|
||||
if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) {
|
||||
return false;
|
||||
}
|
||||
if (querySegmentSpec != null
|
||||
? !querySegmentSpec.equals(baseQuery.querySegmentSpec)
|
||||
: baseQuery.querySegmentSpec != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
|
|||
@SuppressWarnings("unchecked")
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
{
|
||||
if (Boolean.parseBoolean(query.getContextValue("bySegment"))) {
|
||||
if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) {
|
||||
final Sequence<T> baseSequence = base.run(query);
|
||||
return new Sequence<T>()
|
||||
{
|
||||
|
|
|
@ -37,7 +37,7 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
|
|||
@Override
|
||||
public Sequence<T> run(Query<T> query)
|
||||
{
|
||||
if (Boolean.parseBoolean(query.getContextValue("bySegment"))) {
|
||||
if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) {
|
||||
return baseRunner.run(query);
|
||||
}
|
||||
|
||||
|
|
|
@ -83,7 +83,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
|
|||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
{
|
||||
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
|
||||
final int priority = Integer.parseInt((String) query.getContextValue("priority", "0"));
|
||||
|
||||
return new BaseSequence<T, Iterator<T>>(
|
||||
new BaseSequence.IteratorMaker<T, Iterator<T>>()
|
||||
|
|
|
@ -304,7 +304,7 @@ public class Druids
|
|||
private QueryGranularity granularity;
|
||||
private List<AggregatorFactory> aggregatorSpecs;
|
||||
private List<PostAggregator> postAggregatorSpecs;
|
||||
private Map<String, String> context;
|
||||
private Map<String, Object> context;
|
||||
|
||||
private TimeseriesQueryBuilder()
|
||||
{
|
||||
|
@ -384,7 +384,7 @@ public class Druids
|
|||
return postAggregatorSpecs;
|
||||
}
|
||||
|
||||
public Map<String, String> getContext()
|
||||
public Map<String, Object> getContext()
|
||||
{
|
||||
return context;
|
||||
}
|
||||
|
@ -465,7 +465,7 @@ public class Druids
|
|||
return this;
|
||||
}
|
||||
|
||||
public TimeseriesQueryBuilder context(Map<String, String> c)
|
||||
public TimeseriesQueryBuilder context(Map<String, Object> c)
|
||||
{
|
||||
context = c;
|
||||
return this;
|
||||
|
@ -505,7 +505,7 @@ public class Druids
|
|||
private QuerySegmentSpec querySegmentSpec;
|
||||
private List<String> dimensions;
|
||||
private SearchQuerySpec querySpec;
|
||||
private Map<String, String> context;
|
||||
private Map<String, Object> context;
|
||||
|
||||
public SearchQueryBuilder()
|
||||
{
|
||||
|
@ -660,7 +660,7 @@ public class Druids
|
|||
return this;
|
||||
}
|
||||
|
||||
public SearchQueryBuilder context(Map<String, String> c)
|
||||
public SearchQueryBuilder context(Map<String, Object> c)
|
||||
{
|
||||
context = c;
|
||||
return this;
|
||||
|
@ -690,7 +690,7 @@ public class Druids
|
|||
{
|
||||
private DataSource dataSource;
|
||||
private QuerySegmentSpec querySegmentSpec;
|
||||
private Map<String, String> context;
|
||||
private Map<String, Object> context;
|
||||
|
||||
public TimeBoundaryQueryBuilder()
|
||||
{
|
||||
|
@ -746,7 +746,7 @@ public class Druids
|
|||
return this;
|
||||
}
|
||||
|
||||
public TimeBoundaryQueryBuilder context(Map<String, String> c)
|
||||
public TimeBoundaryQueryBuilder context(Map<String, Object> c)
|
||||
{
|
||||
context = c;
|
||||
return this;
|
||||
|
|
|
@ -48,7 +48,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
|
|||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
{
|
||||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment"));
|
||||
final boolean isBySegment = Boolean.parseBoolean(query.<String>getContextValue("bySegment"));
|
||||
final boolean shouldFinalize = Boolean.parseBoolean(query.getContextValue("finalize", "true"));
|
||||
if (shouldFinalize) {
|
||||
Function<T, T> finalizerFn;
|
||||
|
@ -100,7 +100,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
return Sequences.map(
|
||||
baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", "false"))),
|
||||
baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", "false"))),
|
||||
finalizerFn
|
||||
);
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
|
|||
query,
|
||||
configSupplier.get()
|
||||
);
|
||||
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
|
||||
final int priority = Integer.parseInt((String) query.getContextValue("priority", "0"));
|
||||
|
||||
if (Iterables.isEmpty(queryables)) {
|
||||
log.warn("No queryables found.");
|
||||
|
|
|
@ -70,11 +70,11 @@ public interface Query<T>
|
|||
|
||||
public Duration getDuration();
|
||||
|
||||
public String getContextValue(String key);
|
||||
public <ContextType> ContextType getContextValue(String key);
|
||||
|
||||
public String getContextValue(String key, String defaultValue);
|
||||
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);
|
||||
|
||||
public Query<T> withOverriddenContext(Map<String, String> contextOverride);
|
||||
public Query<T> withOverriddenContext(Map<String, Object> contextOverride);
|
||||
|
||||
public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
@JsonProperty("having") HavingSpec havingSpec,
|
||||
@JsonProperty("limitSpec") LimitSpec limitSpec,
|
||||
@JsonProperty("orderBy") LimitSpec orderBySpec,
|
||||
@JsonProperty("context") Map<String, String> context
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, context);
|
||||
|
@ -147,7 +147,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
HavingSpec havingSpec,
|
||||
LimitSpec orderBySpec,
|
||||
Function<Sequence<Row>, Sequence<Row>> orderByLimitFn,
|
||||
Map<String, String> context
|
||||
Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, context);
|
||||
|
@ -222,7 +222,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
}
|
||||
|
||||
@Override
|
||||
public GroupByQuery withOverriddenContext(Map<String, String> contextOverride)
|
||||
public GroupByQuery withOverriddenContext(Map<String, Object> contextOverride)
|
||||
{
|
||||
return new GroupByQuery(
|
||||
getDataSource(),
|
||||
|
@ -268,7 +268,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
private List<PostAggregator> postAggregatorSpecs;
|
||||
private HavingSpec havingSpec;
|
||||
|
||||
private Map<String, String> context;
|
||||
private Map<String, Object> context;
|
||||
|
||||
private LimitSpec limitSpec = null;
|
||||
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
|
||||
|
@ -443,7 +443,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setContext(Map<String, String> context)
|
||||
public Builder setContext(Map<String, Object> context)
|
||||
{
|
||||
this.context = context;
|
||||
return this;
|
||||
|
|
|
@ -58,7 +58,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
|||
{
|
||||
};
|
||||
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
|
||||
private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
|
||||
private static final Map<String, Object> NO_MERGE_CONTEXT = ImmutableMap.<String, Object>of(GROUP_BY_MERGE_KEY, "false");
|
||||
private final Supplier<GroupByQueryConfig> configSupplier;
|
||||
private GroupByQueryEngine engine; // For running the outer query around a subquery
|
||||
|
||||
|
@ -80,7 +80,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
|||
@Override
|
||||
public Sequence<Row> run(Query<Row> input)
|
||||
{
|
||||
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
|
||||
if (Boolean.valueOf((String) input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
|
||||
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
|
||||
} else {
|
||||
return runner.run(input);
|
||||
|
|
|
@ -34,4 +34,16 @@ public class AllColumnIncluderator implements ColumnIncluderator
|
|||
{
|
||||
return ALL_CACHE_PREFIX;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj)
|
||||
{
|
||||
return obj instanceof AllColumnIncluderator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return AllColumnIncluderator.class.hashCode();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,9 @@ package io.druid.query.metadata.metadata;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.druid.query.BaseQuery;
|
||||
import io.druid.query.DataSource;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.query.spec.QuerySegmentSpec;
|
||||
|
@ -36,17 +38,18 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
|
|||
|
||||
@JsonCreator
|
||||
public SegmentMetadataQuery(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("dataSource") DataSource dataSource,
|
||||
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
|
||||
@JsonProperty("toInclude") ColumnIncluderator toInclude,
|
||||
@JsonProperty("merge") Boolean merge,
|
||||
@JsonProperty("context") Map<String, String> context
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(new TableDataSource(dataSource), querySegmentSpec, context);
|
||||
super(dataSource, querySegmentSpec, context);
|
||||
|
||||
this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
|
||||
this.merge = merge == null ? false : merge;
|
||||
Preconditions.checkArgument(dataSource instanceof TableDataSource, "SegmentMetadataQuery only supports table datasource");
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -74,10 +77,10 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Query<SegmentAnalysis> withOverriddenContext(Map<String, String> contextOverride)
|
||||
public Query<SegmentAnalysis> withOverriddenContext(Map<String, Object> contextOverride)
|
||||
{
|
||||
return new SegmentMetadataQuery(
|
||||
((TableDataSource)getDataSource()).getName(),
|
||||
getDataSource(),
|
||||
getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride)
|
||||
);
|
||||
}
|
||||
|
@ -86,7 +89,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
|
|||
public Query<SegmentAnalysis> withQuerySegmentSpec(QuerySegmentSpec spec)
|
||||
{
|
||||
return new SegmentMetadataQuery(
|
||||
((TableDataSource)getDataSource()).getName(),
|
||||
getDataSource(),
|
||||
spec, toInclude, merge, getContext());
|
||||
}
|
||||
|
||||
|
|
|
@ -294,7 +294,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
|
|||
return runner.run(query);
|
||||
}
|
||||
|
||||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
||||
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
|
||||
|
||||
return Sequences.map(
|
||||
runner.run(query.withLimit(config.getMaxSearchLimit())),
|
||||
|
|
|
@ -58,7 +58,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
|
|||
@JsonProperty("searchDimensions") List<String> dimensions,
|
||||
@JsonProperty("query") SearchQuerySpec querySpec,
|
||||
@JsonProperty("sort") SearchSortSpec sortSpec,
|
||||
@JsonProperty("context") Map<String, String> context
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, context);
|
||||
|
@ -112,7 +112,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
|
|||
}
|
||||
|
||||
@Override
|
||||
public SearchQuery withOverriddenContext(Map<String, String> contextOverrides)
|
||||
public SearchQuery withOverriddenContext(Map<String, Object> contextOverrides)
|
||||
{
|
||||
return new SearchQuery(
|
||||
getDataSource(),
|
||||
|
|
|
@ -53,7 +53,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
|||
@JsonProperty("dimensions") List<String> dimensions,
|
||||
@JsonProperty("metrics") List<String> metrics,
|
||||
@JsonProperty("pagingSpec") PagingSpec pagingSpec,
|
||||
@JsonProperty("context") Map<String, String> context
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, context);
|
||||
|
@ -120,7 +120,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
|||
);
|
||||
}
|
||||
|
||||
public SelectQuery withOverriddenContext(Map<String, String> contextOverrides)
|
||||
public SelectQuery withOverriddenContext(Map<String, Object> contextOverrides)
|
||||
{
|
||||
return new SelectQuery(
|
||||
getDataSource(),
|
||||
|
|
|
@ -54,7 +54,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
|
|||
public TimeBoundaryQuery(
|
||||
@JsonProperty("dataSource") DataSource dataSource,
|
||||
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
|
||||
@JsonProperty("context") Map<String, String> context
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -78,7 +78,7 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
|
|||
}
|
||||
|
||||
@Override
|
||||
public TimeBoundaryQuery withOverriddenContext(Map<String, String> contextOverrides)
|
||||
public TimeBoundaryQuery withOverriddenContext(Map<String, Object> contextOverrides)
|
||||
{
|
||||
return new TimeBoundaryQuery(
|
||||
getDataSource(),
|
||||
|
|
|
@ -55,7 +55,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
|
|||
@JsonProperty("granularity") QueryGranularity granularity,
|
||||
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
|
||||
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
|
||||
@JsonProperty("context") Map<String, String> context
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, context);
|
||||
|
@ -116,7 +116,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
|
|||
);
|
||||
}
|
||||
|
||||
public TimeseriesQuery withOverriddenContext(Map<String, String> contextOverrides)
|
||||
public TimeseriesQuery withOverriddenContext(Map<String, Object> contextOverrides)
|
||||
{
|
||||
return new TimeseriesQuery(
|
||||
getDataSource(),
|
||||
|
|
|
@ -62,7 +62,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
|||
@JsonProperty("granularity") QueryGranularity granularity,
|
||||
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
|
||||
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
|
||||
@JsonProperty("context") Map<String, String> context
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, context);
|
||||
|
@ -178,7 +178,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
|||
);
|
||||
}
|
||||
|
||||
public TopNQuery withOverriddenContext(Map<String, String> contextOverrides)
|
||||
public TopNQuery withOverriddenContext(Map<String, Object> contextOverrides)
|
||||
{
|
||||
return new TopNQuery(
|
||||
getDataSource(),
|
||||
|
|
|
@ -69,7 +69,7 @@ public class TopNQueryBuilder
|
|||
private QueryGranularity granularity;
|
||||
private List<AggregatorFactory> aggregatorSpecs;
|
||||
private List<PostAggregator> postAggregatorSpecs;
|
||||
private Map<String, String> context;
|
||||
private Map<String, Object> context;
|
||||
|
||||
public TopNQueryBuilder()
|
||||
{
|
||||
|
@ -130,7 +130,7 @@ public class TopNQueryBuilder
|
|||
return postAggregatorSpecs;
|
||||
}
|
||||
|
||||
public Map<String, String> getContext()
|
||||
public Map<String, Object> getContext()
|
||||
{
|
||||
return context;
|
||||
}
|
||||
|
@ -290,7 +290,7 @@ public class TopNQueryBuilder
|
|||
return this;
|
||||
}
|
||||
|
||||
public TopNQueryBuilder context(Map<String, String> c)
|
||||
public TopNQueryBuilder context(Map<String, Object> c)
|
||||
{
|
||||
context = c;
|
||||
return this;
|
||||
|
|
|
@ -339,7 +339,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
|||
return runner.run(query);
|
||||
}
|
||||
|
||||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
||||
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
|
||||
|
||||
return Sequences.map(
|
||||
runner.run(query.withThreshold(minTopNThreshold)),
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.query.metadata;
|
|||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.query.LegacyDataSource;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
|
@ -98,7 +99,7 @@ public class SegmentAnalyzerTest
|
|||
);
|
||||
|
||||
final SegmentMetadataQuery query = new SegmentMetadataQuery(
|
||||
"test", QuerySegmentSpecs.create("2011/2012"), null, null, null
|
||||
new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null
|
||||
);
|
||||
return Sequences.toList(query.run(runner), Lists.<SegmentAnalysis>newArrayList());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query.metadata;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class SegmentMetadataQueryTest
|
||||
{
|
||||
private ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
String queryStr = "{\n"
|
||||
+ " \"queryType\":\"segmentMetadata\",\n"
|
||||
+ " \"dataSource\":\"test_ds\",\n"
|
||||
+ " \"intervals\":[\"2013-12-04T00:00:00.000Z/2013-12-05T00:00:00.000Z\"]\n"
|
||||
+ "}";
|
||||
Query query = mapper.readValue(queryStr, Query.class);
|
||||
Assert.assertTrue(query instanceof SegmentMetadataQuery);
|
||||
Assert.assertEquals("test_ds", query.getDataSource().getName());
|
||||
Assert.assertEquals(new Interval("2013-12-04T00:00:00.000Z/2013-12-05T00:00:00.000Z"), query.getIntervals().get(0));
|
||||
|
||||
// test serialize and deserialize
|
||||
Assert.assertEquals(query, mapper.readValue(mapper.writeValueAsString(query), Query.class));
|
||||
|
||||
}
|
||||
}
|
|
@ -1,7 +1,18 @@
|
|||
all : druid.pdf
|
||||
all : druid
|
||||
|
||||
druid : druid.pdf
|
||||
|
||||
sigmod : sgmd0658-yang.pdf
|
||||
|
||||
zip : sgmd0658-yang.zip
|
||||
|
||||
%.zip : %.pdf
|
||||
@rm -f dummy.ps
|
||||
@echo 1234 > dummy.ps
|
||||
zip $@ $*.pdf $*.tex dummy.ps
|
||||
|
||||
clean :
|
||||
@rm -f *.aux *.bbl *.blg *.log
|
||||
@rm -f *.aux *.bbl *.blg *.out *.log dummy.ps *.zip
|
||||
|
||||
%.tex : %.bib
|
||||
|
||||
|
|
|
@ -1,4 +1,18 @@
|
|||
\documentclass{acm_proc_article-sp}
|
||||
\documentclass{sig-alternate-2013}
|
||||
|
||||
\newfont{\mycrnotice}{ptmr8t at 7pt}
|
||||
\newfont{\myconfname}{ptmri8t at 7pt}
|
||||
\let\crnotice\mycrnotice%
|
||||
\let\confname\myconfname%
|
||||
\permission{Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for components of this work owned by others than the author(s) must be honored. Abstracting with credit is permitted. To copy otherwise, or republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. Request permissions from permissions@acm.org.}
|
||||
\conferenceinfo{SIGMOD'14,}{June 22--27, 2014, Snowbird, UT, USA. \\
|
||||
{\mycrnotice{Copyright is held by the owner/author(s). Publication rights licensed to ACM.}}}
|
||||
\copyrightetc{ACM \the\acmcopyr}
|
||||
\crdata{978-1-4503-2376-5/14/06\ ...\$15.00.\\
|
||||
\href{http://dx.doi.org/10.1145/2588555.2595631}{http://dx.doi.org/10.1145/2588555.2595631}}
|
||||
\clubpenalty=10000
|
||||
\widowpenalty = 10000
|
||||
|
||||
\usepackage{graphicx}
|
||||
\usepackage{balance}
|
||||
\usepackage{fontspec}
|
||||
|
@ -7,28 +21,47 @@
|
|||
\graphicspath{{figures/}}
|
||||
\usepackage{enumitem}
|
||||
|
||||
\hyphenation{metamarkets nelson}
|
||||
\hyphenation{metamarkets nelson cheddar}
|
||||
|
||||
\begin{document}
|
||||
|
||||
% ****************** TITLE ****************************************
|
||||
|
||||
\title{Druid: A Real-time Analytical Data Store}
|
||||
\title{Druid}
|
||||
\subtitle{A Real-time Analytical Data Store}
|
||||
|
||||
% ****************** AUTHORS **************************************
|
||||
|
||||
\numberofauthors{6}
|
||||
\author{
|
||||
\alignauthor Fangjin Yang, Eric Tschetter, Xavier Léauté, Nelson Ray, Gian Merlino, Deep Ganguli\\
|
||||
\email{\{fangjin, cheddar, xavier, nelson, gian, deep\}@metamarkets.com}
|
||||
\alignauthor Fangjin Yang\\
|
||||
\affaddr{Metamarkets Group, Inc.}\\
|
||||
% \affaddr{625 2nd St Suite 230}\\
|
||||
% \affaddr{San Francisco, CA 94107}\\
|
||||
\email{fangjin@metamarkets.com}
|
||||
\alignauthor Eric Tschetter\\
|
||||
\email{echeddar@gmail.com}
|
||||
\alignauthor Xavier Léauté\\
|
||||
\affaddr{Metamarkets Group, Inc.}\\
|
||||
\email{xavier@metamarkets.com}
|
||||
\and
|
||||
\alignauthor Nelson Ray\\
|
||||
\email{ncray86@gmail.com}
|
||||
\alignauthor Gian Merlino\\
|
||||
\affaddr{Metamarkets Group, Inc.}\\
|
||||
\email{gian@metamarkets.com}
|
||||
\alignauthor Deep Ganguli\\
|
||||
\affaddr{Metamarkets Group, Inc.}\\
|
||||
\email{deep@metamarkets.com}
|
||||
}
|
||||
\date{21 March 2013}
|
||||
|
||||
% ****************** AUTHORS **************************************
|
||||
|
||||
\maketitle
|
||||
|
||||
\begin{abstract}
|
||||
Druid is an open
|
||||
source\footnote{\href{https://github.com/metamx/druid}{https://github.com/metamx/druid}}
|
||||
Druid is an open source\footnote{\href{http://druid.io/}{http://druid.io/} \href{https://github.com/metamx/druid}{https://github.com/metamx/druid}}
|
||||
data store designed for real-time exploratory analytics on large data sets.
|
||||
The system combines a column-oriented storage layout, a distributed,
|
||||
shared-nothing architecture, and an advanced indexing structure to allow for
|
||||
|
@ -37,13 +70,19 @@ this paper, we describe Druid's architecture, and detail how it supports fast
|
|||
aggregations, flexible filters, and low latency data ingestion.
|
||||
\end{abstract}
|
||||
|
||||
% A category with the (minimum) three required fields
|
||||
\category{H.2.4}{Database Management}{Systems}[Distributed databases]
|
||||
% \category{D.2.8}{Software Engineering}{Metrics}[complexity measures, performance measures]
|
||||
\keywords{distributed; real-time; fault-tolerant; analytics; column-oriented; OLAP}
|
||||
|
||||
|
||||
\section{Introduction}
|
||||
In recent years, the proliferation of internet technology has
|
||||
created a surge in machine-generated events. Individually, these
|
||||
events contain minimal useful information and are of low value. Given the
|
||||
time and resources required to extract meaning from large collections of
|
||||
events, many companies were willing to discard this data instead. Although
|
||||
infrastructure has been built to handle event based data (e.g. IBM's
|
||||
infrastructure has been built to handle event-based data (e.g. IBM's
|
||||
Netezza\cite{singh2011introduction}, HP's Vertica\cite{bear2012vertica}, and EMC's
|
||||
Greenplum\cite{miner2012unified}), they are largely sold at high price points
|
||||
and are only targeted towards those companies who can afford the offering.
|
||||
|
@ -73,13 +112,12 @@ highly concurrent environment (1000+ users), Hadoop wasn't going to meet our
|
|||
needs. We explored different solutions in the space, and after
|
||||
trying both Relational Database Management Systems and NoSQL architectures, we
|
||||
came to the conclusion that there was nothing in the open source world that
|
||||
could be fully leveraged for our requirements.
|
||||
|
||||
We ended up creating Druid, an open-source, distributed, column-oriented,
|
||||
real-time analytical data store. In many ways, Druid shares similarities with
|
||||
other OLAP systems \cite{oehler2012ibm, schrader2009oracle, lachev2005applied},
|
||||
could be fully leveraged for our requirements. We ended up creating Druid, an
|
||||
open-source, distributed, column-oriented, real-time analytical data store. In
|
||||
many ways, Druid shares similarities with other OLAP systems
|
||||
\cite{oehler2012ibm, schrader2009oracle, lachev2005applied},
|
||||
interactive query systems \cite{melnik2010dremel}, main-memory databases
|
||||
\cite{farber2012sap}, and widely-known distributed data stores
|
||||
\cite{farber2012sap}, as well as widely known distributed data stores
|
||||
\cite{chang2008bigtable, decandia2007dynamo, lakshman2010cassandra}. The
|
||||
distribution and query model also borrow ideas from current generation search
|
||||
infrastructure \cite{linkedin2013senseidb, apache2013solr,
|
||||
|
@ -91,7 +129,6 @@ service, and attempts to help inform anyone who faces a similar problem about a
|
|||
potential method of solving it. Druid is deployed in production at several
|
||||
technology
|
||||
companies\footnote{\href{http://druid.io/druid.html}{http://druid.io/druid.html}}.
|
||||
|
||||
The structure of the paper is as follows: we first describe the problem in
|
||||
Section \ref{sec:problem-definition}. Next, we detail system architecture from
|
||||
the point of view of how data flows through the system in Section
|
||||
|
@ -105,6 +142,21 @@ in Section \ref{sec:related}.
|
|||
\section{Problem Definition}
|
||||
\label{sec:problem-definition}
|
||||
|
||||
\begin{table*}
|
||||
\centering
|
||||
\begin{tabular}{| l | l | l | l | l | l | l | l |}
|
||||
\hline
|
||||
\textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline
|
||||
2011-01-01T01:00:00Z & Justin Bieber & Boxer & Male & San Francisco & 1800 & 25 \\ \hline
|
||||
2011-01-01T01:00:00Z & Justin Bieber & Reach & Male & Waterloo & 2912 & 42 \\ \hline
|
||||
2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline
|
||||
2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline
|
||||
\end{tabular}
|
||||
\caption{Sample Druid data for edits that have occurred on Wikipedia.}
|
||||
\label{tab:sample_data}
|
||||
\end{table*}
|
||||
|
||||
|
||||
Druid was originally designed to solve problems around ingesting and exploring
|
||||
large quantities of transactional events (log data). This form of timeseries
|
||||
data is commonly found in OLAP workflows and the nature of the data tends to be
|
||||
|
@ -120,20 +172,6 @@ there are a set of metric columns that contain values (usually numeric) that
|
|||
can be aggregated, such as the number of characters added or removed in an
|
||||
edit.
|
||||
|
||||
\begin{table*}
|
||||
\centering
|
||||
\begin{tabular}{| l | l | l | l | l | l | l | l |}
|
||||
\hline
|
||||
\textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline
|
||||
2011-01-01T01:00:00Z & Justin Bieber & Boxer & Male & San Francisco & 1800 & 25 \\ \hline
|
||||
2011-01-01T01:00:00Z & Justin Bieber & Reach & Male & Waterloo & 2912 & 42 \\ \hline
|
||||
2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline
|
||||
2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline
|
||||
\end{tabular}
|
||||
\caption{Sample Druid data for edits that have occurred on Wikipedia.}
|
||||
\label{tab:sample_data}
|
||||
\end{table*}
|
||||
|
||||
Our goal is to rapidly compute drill-downs and aggregates over this data. We
|
||||
want to answer questions like “How many edits were made on the page Justin
|
||||
Bieber from males in San Francisco?” and “What is the average number of
|
||||
|
@ -146,7 +184,7 @@ Relational Database Management Systems (RDBMS) and NoSQL key/value stores were
|
|||
unable to provide a low latency data ingestion and query platform for
|
||||
interactive applications \cite{tschetter2011druid}. In the early days of
|
||||
Metamarkets, we were focused on building a hosted dashboard that would allow
|
||||
users to arbitrary explore and visualize event streams. The data store
|
||||
users to arbitrarily explore and visualize event streams. The data store
|
||||
powering the dashboard needed to return queries fast enough that the data
|
||||
visualizations built on top of it could provide users with an interactive
|
||||
experience.
|
||||
|
@ -178,12 +216,13 @@ designed to perform a specific set of things. We believe this design separates
|
|||
concerns and simplifies the complexity of the system. The different node types
|
||||
operate fairly independent of each other and there is minimal interaction
|
||||
among them. Hence, intra-cluster communication failures have minimal impact
|
||||
on data availability. To solve complex data analysis problems, the different
|
||||
node types come together to form a fully working system. The name Druid comes
|
||||
from the Druid class in many role-playing games: it is a shape-shifter, capable
|
||||
of taking on many different forms to fulfill various different roles in a
|
||||
group. The composition of and flow of data in a Druid cluster are shown in
|
||||
Figure~\ref{fig:cluster}.
|
||||
on data availability.
|
||||
|
||||
To solve complex data analysis problems, the different
|
||||
node types come together to form a fully working system. The composition of and
|
||||
flow of data in a Druid cluster are shown in Figure~\ref{fig:cluster}. The name Druid comes from the Druid class in many role-playing games: it is a
|
||||
shape-shifter, capable of taking on many different forms to fulfill various
|
||||
different roles in a group.
|
||||
|
||||
\begin{figure*}
|
||||
\centering
|
||||
|
@ -192,13 +231,12 @@ Figure~\ref{fig:cluster}.
|
|||
\label{fig:cluster}
|
||||
\end{figure*}
|
||||
|
||||
\newpage
|
||||
\subsection{Real-time Nodes}
|
||||
\label{sec:realtime}
|
||||
Real-time nodes encapsulate the functionality to ingest and query event
|
||||
streams. Events indexed via these nodes are immediately available for querying.
|
||||
The nodes are only concerned with events for some small time range and
|
||||
periodically hand off immutable batches of events they've collected over this
|
||||
periodically hand off immutable batches of events they have collected over this
|
||||
small time range to other nodes in the Druid cluster that are specialized in
|
||||
dealing with batches of immutable events. Real-time nodes leverage Zookeeper
|
||||
\cite{hunt2010zookeeper} for coordination with the rest of the Druid cluster.
|
||||
|
@ -220,10 +258,11 @@ in \cite{o1996log} and is illustrated in Figure~\ref{fig:realtime_flow}.
|
|||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.6in]{realtime_flow}
|
||||
\caption{Real-time nodes first buffer events in memory. On a periodic basis,
|
||||
the in-memory index is persisted to disk. On another periodic basis, all
|
||||
persisted indexes are merged together and handed off. Queries will hit the
|
||||
in-memory index and the persisted indexes.}
|
||||
\caption{Real-time nodes buffer events to an in-memory index, which is
|
||||
regularly persisted to disk. On a periodic basis, persisted indexes are then merged
|
||||
together before getting handed off.
|
||||
Queries will hit both the in-memory and persisted indexes.
|
||||
}
|
||||
\label{fig:realtime_flow}
|
||||
\end{figure}
|
||||
|
||||
|
@ -388,9 +427,7 @@ caching the results would be unreliable.
|
|||
\begin{figure*}
|
||||
\centering
|
||||
\includegraphics[width = 4.5in]{caching}
|
||||
\caption{Broker nodes cache per segment results. Every Druid query is mapped to
|
||||
a set of segments. Queries often combine cached segment results with those that
|
||||
need to be computed on historical and real-time nodes.}
|
||||
\caption{Results are cached per segment. Queries combine cached results with results computed on historical and real-time nodes.}
|
||||
\label{fig:caching}
|
||||
\end{figure*}
|
||||
|
||||
|
@ -762,7 +799,7 @@ involving all columns are very rare.
|
|||
|
||||
\begin{table}
|
||||
\centering
|
||||
\begin{tabular}{| l | l | l |}
|
||||
\scriptsize\begin{tabular}{| l | l | l |}
|
||||
\hline
|
||||
\textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline
|
||||
\texttt{a} & 25 & 21 \\ \hline
|
||||
|
@ -774,6 +811,7 @@ involving all columns are very rare.
|
|||
\texttt{g} & 26 & 18 \\ \hline
|
||||
\texttt{h} & 78 & 14 \\ \hline
|
||||
\end{tabular}
|
||||
\normalsize
|
||||
\caption{Characteristics of production data sources.}
|
||||
\label{tab:datasources}
|
||||
\end{table}
|
||||
|
@ -789,7 +827,7 @@ approximately 10TB of segments loaded. Collectively,
|
|||
there are about 50 billion Druid rows in this tier. Results for
|
||||
every data source are not shown.
|
||||
|
||||
\item The hot tier uses Xeon E5-2670 processors and consists of 1302 processing
|
||||
\item The hot tier uses Intel Xeon E5-2670 processors and consists of 1302 processing
|
||||
threads and 672 total cores (hyperthreaded).
|
||||
|
||||
\item A memory-mapped storage engine was used (the machine was configured to
|
||||
|
@ -828,7 +866,7 @@ comparison, we also provide the results of the same queries using MySQL using th
|
|||
MyISAM engine (InnoDB was slower in our experiments).
|
||||
|
||||
We selected MySQL to benchmark
|
||||
against because of its universal popularity. We choose not to select another
|
||||
against because of its universal popularity. We chose not to select another
|
||||
open source column store because we were not confident we could correctly tune
|
||||
it for optimal performance.
|
||||
|
||||
|
@ -871,6 +909,7 @@ well.
|
|||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.3in]{tpch_scaling}
|
||||
\includegraphics[width = 2.3in]{tpch_scaling_factor}
|
||||
\caption{Druid scaling benchmarks -- 100GB TPC-H data.}
|
||||
\label{fig:tpch_scaling}
|
||||
\end{figure}
|
||||
|
@ -933,9 +972,9 @@ running an Amazon \texttt{cc2.8xlarge} instance.
|
|||
\label{fig:ingestion_rate}
|
||||
\end{figure}
|
||||
|
||||
The latency measurements we presented are sufficient to address the our stated
|
||||
The latency measurements we presented are sufficient to address the stated
|
||||
problems of interactivity. We would prefer the variability in the latencies to
|
||||
be less. It is still very possible to possible to decrease latencies by adding
|
||||
be less. It is still very possible to decrease latencies by adding
|
||||
additional hardware, but we have not chosen to do so because infrastructure
|
||||
costs are still a consideration to us.
|
||||
|
||||
|
@ -1017,7 +1056,7 @@ data centers as well. The tier configuration in Druid coordinator nodes allow
|
|||
for segments to be replicated across multiple tiers. Hence, segments can be
|
||||
exactly replicated across historical nodes in multiple data centers.
|
||||
Similarily, query preference can be assigned to different tiers. It is possible
|
||||
to have nodes in one data center act as a primary cluster (and recieve all
|
||||
to have nodes in one data center act as a primary cluster (and receive all
|
||||
queries) and have a redundant cluster in another data center. Such a setup may
|
||||
be desired if one data center is situated much closer to users.
|
||||
|
||||
|
|
Before Width: | Height: | Size: 35 KiB |
Before Width: | Height: | Size: 53 KiB |
Before Width: | Height: | Size: 28 KiB |
Before Width: | Height: | Size: 51 KiB |
Before Width: | Height: | Size: 35 KiB |
Before Width: | Height: | Size: 36 KiB |
Before Width: | Height: | Size: 43 KiB |
|
@ -0,0 +1 @@
|
|||
druid.bib
|
|
@ -0,0 +1 @@
|
|||
druid.tex
|
|
@ -9,7 +9,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<version>0.6.75-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.MapCache;
|
||||
import io.druid.query.CacheStrategy;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryToolChest;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
|
||||
private final String segmentIdentifier;
|
||||
private final SegmentDescriptor segmentDescriptor;
|
||||
private final QueryRunner<T> base;
|
||||
private final QueryToolChest toolChest;
|
||||
private final Cache cache;
|
||||
private final ObjectMapper mapper;
|
||||
private final CacheConfig cacheConfig;
|
||||
|
||||
public CachePopulatingQueryRunner(
|
||||
String segmentIdentifier,
|
||||
SegmentDescriptor segmentDescriptor, ObjectMapper mapper,
|
||||
Cache cache,
|
||||
QueryToolChest toolchest,
|
||||
QueryRunner<T> base,
|
||||
CacheConfig cacheConfig
|
||||
)
|
||||
{
|
||||
this.base = base;
|
||||
this.segmentIdentifier = segmentIdentifier;
|
||||
this.segmentDescriptor = segmentDescriptor;
|
||||
this.toolChest = toolchest;
|
||||
this.cache = cache;
|
||||
this.mapper = mapper;
|
||||
this.cacheConfig = cacheConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(Query<T> query)
|
||||
{
|
||||
|
||||
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
|
||||
|
||||
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
|
||||
&& strategy != null
|
||||
&& cacheConfig.isPopulateCache()
|
||||
// historical only populates distributed cache since the cache lookups are done at broker.
|
||||
&& !(cache instanceof MapCache) ;
|
||||
Sequence<T> results = base.run(query);
|
||||
if (populateCache) {
|
||||
Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
|
||||
segmentIdentifier,
|
||||
segmentDescriptor,
|
||||
strategy.computeCacheKey(query)
|
||||
);
|
||||
CacheUtil.populate(
|
||||
cache,
|
||||
mapper,
|
||||
key,
|
||||
Sequences.toList(Sequences.map(results, strategy.prepareForCache()), new ArrayList())
|
||||
);
|
||||
}
|
||||
return results;
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
public class CacheUtil
|
||||
{
|
||||
public static Cache.NamedKey computeSegmentCacheKey(
|
||||
String segmentIdentifier,
|
||||
SegmentDescriptor descriptor,
|
||||
byte[] queryCacheKey
|
||||
)
|
||||
{
|
||||
final Interval segmentQueryInterval = descriptor.getInterval();
|
||||
final byte[] versionBytes = descriptor.getVersion().getBytes();
|
||||
|
||||
return new Cache.NamedKey(
|
||||
segmentIdentifier, ByteBuffer
|
||||
.allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
|
||||
.putLong(segmentQueryInterval.getStartMillis())
|
||||
.putLong(segmentQueryInterval.getEndMillis())
|
||||
.put(versionBytes)
|
||||
.putInt(descriptor.getPartitionNumber())
|
||||
.put(queryCacheKey).array()
|
||||
);
|
||||
}
|
||||
|
||||
public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key, Iterable<Object> results)
|
||||
{
|
||||
try {
|
||||
List<byte[]> bytes = Lists.newArrayList();
|
||||
int size = 0;
|
||||
for (Object result : results) {
|
||||
final byte[] array = mapper.writeValueAsBytes(result);
|
||||
size += array.length;
|
||||
bytes.add(array);
|
||||
}
|
||||
|
||||
byte[] valueBytes = new byte[size];
|
||||
int offset = 0;
|
||||
for (byte[] array : bytes) {
|
||||
System.arraycopy(array, 0, valueBytes, offset, array.length);
|
||||
offset += array.length;
|
||||
}
|
||||
|
||||
cache.put(key, valueBytes);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -40,6 +40,7 @@ import com.metamx.common.guava.Sequence;
|
|||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.selector.QueryableDruidServer;
|
||||
import io.druid.client.selector.ServerSelector;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
|
@ -61,8 +62,8 @@ import io.druid.timeline.partition.PartitionChunk;
|
|||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
|
@ -76,24 +77,26 @@ import java.util.concurrent.Executors;
|
|||
public class CachingClusteredClient<T> implements QueryRunner<T>
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class);
|
||||
|
||||
private final QueryToolChestWarehouse warehouse;
|
||||
private final TimelineServerView serverView;
|
||||
private final Cache cache;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final CacheConfig cacheConfig;
|
||||
|
||||
@Inject
|
||||
public CachingClusteredClient(
|
||||
QueryToolChestWarehouse warehouse,
|
||||
TimelineServerView serverView,
|
||||
Cache cache,
|
||||
@Smile ObjectMapper objectMapper
|
||||
@Smile ObjectMapper objectMapper,
|
||||
CacheConfig cacheConfig
|
||||
)
|
||||
{
|
||||
this.warehouse = warehouse;
|
||||
this.serverView = serverView;
|
||||
this.cache = cache;
|
||||
this.objectMapper = objectMapper;
|
||||
this.cacheConfig = cacheConfig;
|
||||
|
||||
serverView.registerSegmentCallback(
|
||||
Executors.newFixedThreadPool(
|
||||
|
@ -122,18 +125,21 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
|
||||
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
|
||||
|
||||
final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null;
|
||||
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
|
||||
&& strategy != null;
|
||||
final boolean useCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.USE_CACHE, "true"))
|
||||
&& strategy != null
|
||||
&& cacheConfig.isUseCache();
|
||||
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
|
||||
&& strategy != null && cacheConfig.isPopulateCache();
|
||||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
||||
|
||||
|
||||
ImmutableMap.Builder<String, String> contextBuilder = new ImmutableMap.Builder<>();
|
||||
ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
|
||||
|
||||
final String priority = query.getContextValue("priority", "0");
|
||||
contextBuilder.put("priority", priority);
|
||||
|
||||
if (populateCache) {
|
||||
contextBuilder.put(CacheConfig.POPULATE_CACHE, "false");
|
||||
contextBuilder.put("bySegment", "true");
|
||||
}
|
||||
contextBuilder.put("intermediate", "true");
|
||||
|
@ -180,7 +186,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
if (queryCacheKey != null) {
|
||||
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
|
||||
for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
|
||||
final Cache.NamedKey segmentCacheKey = computeSegmentCacheKey(
|
||||
final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
|
||||
segment.lhs.getSegment().getIdentifier(),
|
||||
segment.rhs,
|
||||
queryCacheKey
|
||||
|
@ -286,7 +292,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
objectMapper.getFactory().createParser(cachedResult),
|
||||
cacheObjectClazz
|
||||
);
|
||||
} catch (IOException e) {
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
@ -372,26 +379,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
);
|
||||
}
|
||||
|
||||
private Cache.NamedKey computeSegmentCacheKey(
|
||||
String segmentIdentifier,
|
||||
SegmentDescriptor descriptor,
|
||||
byte[] queryCacheKey
|
||||
)
|
||||
{
|
||||
final Interval segmentQueryInterval = descriptor.getInterval();
|
||||
final byte[] versionBytes = descriptor.getVersion().getBytes();
|
||||
|
||||
return new Cache.NamedKey(
|
||||
segmentIdentifier, ByteBuffer
|
||||
.allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
|
||||
.putLong(segmentQueryInterval.getStartMillis())
|
||||
.putLong(segmentQueryInterval.getEndMillis())
|
||||
.put(versionBytes)
|
||||
.putInt(descriptor.getPartitionNumber())
|
||||
.put(queryCacheKey).array()
|
||||
);
|
||||
}
|
||||
|
||||
private static class CachePopulator
|
||||
{
|
||||
private final Cache cache;
|
||||
|
@ -407,26 +394,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
|
||||
public void populate(Iterable<Object> results)
|
||||
{
|
||||
try {
|
||||
List<byte[]> bytes = Lists.newArrayList();
|
||||
int size = 0;
|
||||
for (Object result : results) {
|
||||
final byte[] array = mapper.writeValueAsBytes(result);
|
||||
size += array.length;
|
||||
bytes.add(array);
|
||||
}
|
||||
|
||||
byte[] valueBytes = new byte[size];
|
||||
int offset = 0;
|
||||
for (byte[] array : bytes) {
|
||||
System.arraycopy(array, 0, valueBytes, offset, array.length);
|
||||
offset += array.length;
|
||||
}
|
||||
|
||||
cache.put(key, valueBytes);
|
||||
} catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
CacheUtil.populate(cache, mapper, key, results);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client.cache;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class CacheConfig
|
||||
{
|
||||
public static String USE_CACHE = "useCache";
|
||||
public static String POPULATE_CACHE = "populateCache";
|
||||
@JsonProperty
|
||||
private boolean useCache = true;
|
||||
@JsonProperty
|
||||
private boolean populateCache = true;
|
||||
|
||||
public boolean isPopulateCache()
|
||||
{
|
||||
return populateCache;
|
||||
}
|
||||
|
||||
public boolean isUseCache()
|
||||
{
|
||||
return useCache;
|
||||
}
|
||||
}
|
|
@ -39,7 +39,6 @@ public class LocalCacheProvider implements CacheProvider
|
|||
@Min(0)
|
||||
private int logEvictionCount = 0;
|
||||
|
||||
|
||||
@Override
|
||||
public Cache get()
|
||||
{
|
||||
|
|
|
@ -54,7 +54,6 @@ public class MapCache implements Cache
|
|||
)
|
||||
{
|
||||
this.byteCountingLRUMap = byteCountingLRUMap;
|
||||
|
||||
this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);
|
||||
|
||||
namespaceId = Maps.newHashMap();
|
||||
|
|
|
@ -83,9 +83,7 @@ public class MemcachedCache implements Cache
|
|||
.build(),
|
||||
AddrUtil.getAddresses(config.getHosts())
|
||||
),
|
||||
config.getMemcachedPrefix(),
|
||||
config.getTimeout(),
|
||||
config.getExpiration()
|
||||
config
|
||||
);
|
||||
} catch(IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
@ -103,15 +101,16 @@ public class MemcachedCache implements Cache
|
|||
private final AtomicLong timeoutCount = new AtomicLong(0);
|
||||
private final AtomicLong errorCount = new AtomicLong(0);
|
||||
|
||||
MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) {
|
||||
Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
|
||||
|
||||
MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config) {
|
||||
Preconditions.checkArgument(config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
|
||||
"memcachedPrefix length [%d] exceeds maximum length [%d]",
|
||||
memcachedPrefix.length(),
|
||||
config.getMemcachedPrefix().length(),
|
||||
MAX_PREFIX_LENGTH);
|
||||
this.timeout = timeout;
|
||||
this.expiration = expiration;
|
||||
this.timeout = config.getTimeout();
|
||||
this.expiration = config.getExpiration();
|
||||
this.client = client;
|
||||
this.memcachedPrefix = memcachedPrefix;
|
||||
this.memcachedPrefix = config.getMemcachedPrefix();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -159,7 +159,7 @@ public class DatabaseRuleManager
|
|||
|
||||
this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d");
|
||||
|
||||
createDefaultRule(dbi, getRulesTable(), config.get().getDefaultTier(), jsonMapper);
|
||||
createDefaultRule(dbi, getRulesTable(), config.get().getDefaultRule(), jsonMapper);
|
||||
ScheduledExecutors.scheduleWithFixedDelay(
|
||||
exec,
|
||||
new Duration(0),
|
||||
|
@ -274,8 +274,8 @@ public class DatabaseRuleManager
|
|||
if (theRules.get(dataSource) != null) {
|
||||
retVal.addAll(theRules.get(dataSource));
|
||||
}
|
||||
if (theRules.get(config.get().getDefaultTier()) != null) {
|
||||
retVal.addAll(theRules.get(config.get().getDefaultTier()));
|
||||
if (theRules.get(config.get().getDefaultRule()) != null) {
|
||||
retVal.addAll(theRules.get(config.get().getDefaultRule()));
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
|
|
|
@ -27,14 +27,14 @@ import org.joda.time.Period;
|
|||
public class DatabaseRuleManagerConfig
|
||||
{
|
||||
@JsonProperty
|
||||
private String defaultTier = "_default";
|
||||
private String defaultRule = "_default";
|
||||
|
||||
@JsonProperty
|
||||
private Period pollDuration = new Period("PT1M");
|
||||
|
||||
public String getDefaultTier()
|
||||
public String getDefaultRule()
|
||||
{
|
||||
return defaultTier;
|
||||
return defaultRule;
|
||||
}
|
||||
|
||||
public Period getPollDuration()
|
||||
|
|
|
@ -64,7 +64,7 @@ public class DatabaseRuleManagerProvider implements Provider<DatabaseRuleManager
|
|||
{
|
||||
dbConnector.createRulesTable();
|
||||
DatabaseRuleManager.createDefaultRule(
|
||||
dbConnector.getDBI(), dbTables.get().getRulesTable(), config.get().getDefaultTier(), jsonMapper
|
||||
dbConnector.getDBI(), dbTables.get().getRulesTable(), config.get().getDefaultRule(), jsonMapper
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -36,7 +36,6 @@ import io.druid.curator.CuratorModule;
|
|||
import io.druid.curator.discovery.DiscoveryModule;
|
||||
import io.druid.guice.AWSModule;
|
||||
import io.druid.guice.AnnouncerModule;
|
||||
import io.druid.guice.LocalDataStorageDruidModule;
|
||||
import io.druid.guice.DbConnectorModule;
|
||||
import io.druid.guice.DruidGuiceExtensions;
|
||||
import io.druid.guice.DruidProcessingModule;
|
||||
|
@ -47,6 +46,7 @@ import io.druid.guice.IndexingServiceDiscoveryModule;
|
|||
import io.druid.guice.JacksonConfigManagerModule;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LifecycleModule;
|
||||
import io.druid.guice.LocalDataStorageDruidModule;
|
||||
import io.druid.guice.QueryRunnerFactoryModule;
|
||||
import io.druid.guice.QueryableModule;
|
||||
import io.druid.guice.ServerModule;
|
||||
|
@ -107,9 +107,10 @@ public class Initialization
|
|||
/**
|
||||
* @param clazz Module class
|
||||
* @param <T>
|
||||
*
|
||||
* @return Returns the set of modules loaded.
|
||||
*/
|
||||
public static<T> Set<T> getLoadedModules(Class<T> clazz)
|
||||
public static <T> Set<T> getLoadedModules(Class<T> clazz)
|
||||
{
|
||||
Set<T> retVal = extensionsMap.get(clazz);
|
||||
if (retVal == null) {
|
||||
|
@ -190,22 +191,29 @@ public class Initialization
|
|||
)
|
||||
);
|
||||
|
||||
final List<Artifact> artifacts = aether.resolveArtifacts(dependencyRequest);
|
||||
List<URL> urls = Lists.newArrayListWithExpectedSize(artifacts.size());
|
||||
for (Artifact artifact : artifacts) {
|
||||
if (!exclusions.contains(artifact.getGroupId())) {
|
||||
urls.add(artifact.getFile().toURI().toURL());
|
||||
} else {
|
||||
log.debug("Skipped Artifact[%s]", artifact);
|
||||
try {
|
||||
final List<Artifact> artifacts = aether.resolveArtifacts(dependencyRequest);
|
||||
|
||||
List<URL> urls = Lists.newArrayListWithExpectedSize(artifacts.size());
|
||||
for (Artifact artifact : artifacts) {
|
||||
if (!exclusions.contains(artifact.getGroupId())) {
|
||||
urls.add(artifact.getFile().toURI().toURL());
|
||||
} else {
|
||||
log.debug("Skipped Artifact[%s]", artifact);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (URL url : urls) {
|
||||
log.info("Added URL[%s]", url);
|
||||
}
|
||||
for (URL url : urls) {
|
||||
log.info("Added URL[%s]", url);
|
||||
}
|
||||
|
||||
loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader());
|
||||
loadersMap.put(coordinate, loader);
|
||||
loader = new URLClassLoader(urls.toArray(new URL[urls.size()]), Initialization.class.getClassLoader());
|
||||
loadersMap.put(coordinate, loader);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Unable to resolve artifacts for [%s].", dependencyRequest);
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
return loader;
|
||||
}
|
||||
|
@ -232,9 +240,9 @@ public class Initialization
|
|||
URI u = new URI(uri);
|
||||
Repository r = new Repository(uri);
|
||||
|
||||
if(u.getUserInfo() != null) {
|
||||
if (u.getUserInfo() != null) {
|
||||
String[] auth = u.getUserInfo().split(":", 2);
|
||||
if(auth.length == 2) {
|
||||
if (auth.length == 2) {
|
||||
r.setUsername(auth[0]);
|
||||
r.setPassword(auth[1]);
|
||||
} else {
|
||||
|
@ -247,7 +255,7 @@ public class Initialization
|
|||
}
|
||||
remoteRepositories.add(r);
|
||||
}
|
||||
catch(URISyntaxException e) {
|
||||
catch (URISyntaxException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
@ -261,28 +269,30 @@ public class Initialization
|
|||
|
||||
PrintStream oldOut = System.out;
|
||||
try {
|
||||
System.setOut(new PrintStream(
|
||||
new OutputStream()
|
||||
{
|
||||
@Override
|
||||
public void write(int b) throws IOException
|
||||
{
|
||||
System.setOut(
|
||||
new PrintStream(
|
||||
new OutputStream()
|
||||
{
|
||||
@Override
|
||||
public void write(int b) throws IOException
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b) throws IOException
|
||||
{
|
||||
@Override
|
||||
public void write(byte[] b) throws IOException
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b, int off, int len) throws IOException
|
||||
{
|
||||
@Override
|
||||
public void write(byte[] b, int off, int len) throws IOException
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
));
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
return new DefaultTeslaAether(
|
||||
config.getLocalRepository(),
|
||||
remoteRepositories.toArray(new Repository[remoteRepositories.size()])
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package io.druid.server.coordination;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.Ordering;
|
||||
|
@ -28,8 +29,12 @@ import com.metamx.common.guava.FunctionalIterable;
|
|||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import io.druid.client.CachePopulatingQueryRunner;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.collections.CountingMap;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.query.BySegmentQueryRunner;
|
||||
import io.druid.query.DataSource;
|
||||
import io.druid.query.FinalizeResultsQueryRunner;
|
||||
|
@ -44,7 +49,6 @@ import io.druid.query.QueryToolChest;
|
|||
import io.druid.query.ReferenceCountingSegmentQueryRunner;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.query.spec.QuerySegmentSpec;
|
||||
import io.druid.query.spec.SpecificSegmentQueryRunner;
|
||||
import io.druid.query.spec.SpecificSegmentSpec;
|
||||
import io.druid.segment.ReferenceCountingSegment;
|
||||
|
@ -70,24 +74,27 @@ import java.util.concurrent.ExecutorService;
|
|||
public class ServerManager implements QuerySegmentWalker
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(ServerManager.class);
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private final SegmentLoader segmentLoader;
|
||||
private final QueryRunnerFactoryConglomerate conglomerate;
|
||||
private final ServiceEmitter emitter;
|
||||
private final ExecutorService exec;
|
||||
|
||||
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources;
|
||||
private final CountingMap<String> dataSourceSizes = new CountingMap<String>();
|
||||
private final CountingMap<String> dataSourceCounts = new CountingMap<String>();
|
||||
private final Cache cache;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final CacheConfig cacheConfig;
|
||||
|
||||
@Inject
|
||||
public ServerManager(
|
||||
SegmentLoader segmentLoader,
|
||||
QueryRunnerFactoryConglomerate conglomerate,
|
||||
ServiceEmitter emitter,
|
||||
@Processing ExecutorService exec
|
||||
@Processing ExecutorService exec,
|
||||
@Smile ObjectMapper objectMapper,
|
||||
Cache cache,
|
||||
CacheConfig cacheConfig
|
||||
)
|
||||
{
|
||||
this.segmentLoader = segmentLoader;
|
||||
|
@ -95,8 +102,11 @@ public class ServerManager implements QuerySegmentWalker
|
|||
this.emitter = emitter;
|
||||
|
||||
this.exec = exec;
|
||||
this.cache = cache;
|
||||
this.objectMapper = objectMapper;
|
||||
|
||||
this.dataSources = new HashMap<>();
|
||||
this.cacheConfig = cacheConfig;
|
||||
}
|
||||
|
||||
public Map<String, Long> getDataSourceSizes()
|
||||
|
@ -122,7 +132,9 @@ public class ServerManager implements QuerySegmentWalker
|
|||
* Load a single segment.
|
||||
*
|
||||
* @param segment segment to load
|
||||
*
|
||||
* @return true if the segment was newly loaded, false if it was already loaded
|
||||
*
|
||||
* @throws SegmentLoadingException if the segment cannot be loaded
|
||||
*/
|
||||
public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
|
||||
|
@ -130,10 +142,12 @@ public class ServerManager implements QuerySegmentWalker
|
|||
final Segment adapter;
|
||||
try {
|
||||
adapter = segmentLoader.getSegment(segment);
|
||||
} catch (SegmentLoadingException e) {
|
||||
}
|
||||
catch (SegmentLoadingException e) {
|
||||
try {
|
||||
segmentLoader.cleanup(segment);
|
||||
} catch (SegmentLoadingException e1) {
|
||||
}
|
||||
catch (SegmentLoadingException e1) {
|
||||
// ignore
|
||||
}
|
||||
throw e;
|
||||
|
@ -205,11 +219,12 @@ public class ServerManager implements QuerySegmentWalker
|
|||
try {
|
||||
log.info("Attempting to close segment %s", segment.getIdentifier());
|
||||
oldQueryable.close();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.makeAlert(e, "Exception closing segment")
|
||||
.addData("dataSource", dataSource)
|
||||
.addData("segmentId", segment.getIdentifier())
|
||||
.emit();
|
||||
.addData("dataSource", dataSource)
|
||||
.addData("segmentId", segment.getIdentifier())
|
||||
.emit();
|
||||
}
|
||||
} else {
|
||||
log.info(
|
||||
|
@ -241,7 +256,8 @@ public class ServerManager implements QuerySegmentWalker
|
|||
String dataSourceName;
|
||||
try {
|
||||
dataSourceName = ((TableDataSource) query.getDataSource()).getName();
|
||||
} catch (ClassCastException e) {
|
||||
}
|
||||
catch (ClassCastException e) {
|
||||
throw new UnsupportedOperationException("Subqueries are only supported in the broker");
|
||||
}
|
||||
|
||||
|
@ -287,13 +303,12 @@ public class ServerManager implements QuerySegmentWalker
|
|||
factory,
|
||||
toolChest,
|
||||
input.getObject(),
|
||||
new SpecificSegmentSpec(
|
||||
new SegmentDescriptor(
|
||||
holder.getInterval(),
|
||||
holder.getVersion(),
|
||||
input.getChunkNumber()
|
||||
)
|
||||
new SegmentDescriptor(
|
||||
holder.getInterval(),
|
||||
holder.getVersion(),
|
||||
input.getChunkNumber()
|
||||
)
|
||||
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -316,8 +331,8 @@ public class ServerManager implements QuerySegmentWalker
|
|||
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
|
||||
if (factory == null) {
|
||||
log.makeAlert("Unknown query type, [%s]", query.getClass())
|
||||
.addData("dataSource", query.getDataSource())
|
||||
.emit();
|
||||
.addData("dataSource", query.getDataSource())
|
||||
.emit();
|
||||
return new NoopQueryRunner<T>();
|
||||
}
|
||||
|
||||
|
@ -326,7 +341,8 @@ public class ServerManager implements QuerySegmentWalker
|
|||
String dataSourceName;
|
||||
try {
|
||||
dataSourceName = ((TableDataSource) query.getDataSource()).getName();
|
||||
} catch (ClassCastException e) {
|
||||
}
|
||||
catch (ClassCastException e) {
|
||||
throw new UnsupportedOperationException("Subqueries are only supported in the broker");
|
||||
}
|
||||
|
||||
|
@ -360,7 +376,7 @@ public class ServerManager implements QuerySegmentWalker
|
|||
|
||||
final ReferenceCountingSegment adapter = chunk.getObject();
|
||||
return Arrays.asList(
|
||||
buildAndDecorateQueryRunner(factory, toolChest, adapter, new SpecificSegmentSpec(input))
|
||||
buildAndDecorateQueryRunner(factory, toolChest, adapter, input)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -376,9 +392,10 @@ public class ServerManager implements QuerySegmentWalker
|
|||
final QueryRunnerFactory<T, Query<T>> factory,
|
||||
final QueryToolChest<T, Query<T>> toolChest,
|
||||
final ReferenceCountingSegment adapter,
|
||||
final QuerySegmentSpec segmentSpec
|
||||
final SegmentDescriptor segmentDescriptor
|
||||
)
|
||||
{
|
||||
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
|
||||
return new SpecificSegmentQueryRunner<T>(
|
||||
new MetricsEmittingQueryRunner<T>(
|
||||
emitter,
|
||||
|
@ -393,7 +410,15 @@ public class ServerManager implements QuerySegmentWalker
|
|||
new BySegmentQueryRunner<T>(
|
||||
adapter.getIdentifier(),
|
||||
adapter.getDataInterval().getStart(),
|
||||
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter)
|
||||
new CachePopulatingQueryRunner<T>(
|
||||
adapter.getIdentifier(),
|
||||
segmentDescriptor,
|
||||
objectMapper,
|
||||
cache,
|
||||
toolChest,
|
||||
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
|
||||
cacheConfig
|
||||
)
|
||||
)
|
||||
).withWaitMeasuredFromNow(),
|
||||
segmentSpec
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.server.coordinator.rules;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -39,4 +40,10 @@ public class ForeverDropRule extends DropRule
|
|||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -66,4 +67,10 @@ public class ForeverLoadRule extends LoadRule
|
|||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,12 @@ public class IntervalDropRule extends DropRule
|
|||
@Override
|
||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||
{
|
||||
return interval.contains(segment.getInterval());
|
||||
return appliesTo(segment.getInterval(), referenceTimestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
|
||||
{
|
||||
return interval.contains(theInterval);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,7 +85,13 @@ public class IntervalLoadRule extends LoadRule
|
|||
@Override
|
||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||
{
|
||||
return interval.contains(segment.getInterval());
|
||||
return appliesTo(segment.getInterval(), referenceTimestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
|
||||
{
|
||||
return interval.contains(theInterval);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -55,8 +55,14 @@ public class PeriodDropRule extends DropRule
|
|||
|
||||
@Override
|
||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||
{
|
||||
return appliesTo(segment.getInterval(), referenceTimestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
|
||||
{
|
||||
final Interval currInterval = new Interval(period, referenceTimestamp);
|
||||
return currInterval.contains(segment.getInterval());
|
||||
return currInterval.contains(theInterval);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,8 +86,14 @@ public class PeriodLoadRule extends LoadRule
|
|||
|
||||
@Override
|
||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
|
||||
{
|
||||
return appliesTo(segment.getInterval(), referenceTimestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
|
||||
{
|
||||
final Interval currInterval = new Interval(period, referenceTimestamp);
|
||||
return currInterval.overlaps(segment.getInterval());
|
||||
return currInterval.overlaps(interval);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import io.druid.server.coordinator.DruidCoordinator;
|
|||
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -37,13 +38,15 @@ import org.joda.time.DateTime;
|
|||
@JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
|
||||
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
|
||||
@JsonSubTypes.Type(name = "dropForever", value = ForeverDropRule.class)
|
||||
|
||||
})
|
||||
|
||||
public interface Rule
|
||||
{
|
||||
public String getType();
|
||||
|
||||
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp);
|
||||
|
||||
public boolean appliesTo(Interval interval, DateTime referenceTimestamp);
|
||||
|
||||
public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,178 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server.router;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.server.coordinator.rules.LoadRule;
|
||||
import io.druid.server.coordinator.rules.Rule;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class BrokerSelector<T>
|
||||
{
|
||||
private static EmittingLogger log = new EmittingLogger(BrokerSelector.class);
|
||||
|
||||
private final CoordinatorRuleManager ruleManager;
|
||||
private final TierConfig tierConfig;
|
||||
private final ServerDiscoveryFactory serverDiscoveryFactory;
|
||||
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<String, ServerDiscoverySelector>();
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private volatile boolean started = false;
|
||||
|
||||
@Inject
|
||||
public BrokerSelector(
|
||||
CoordinatorRuleManager ruleManager,
|
||||
TierConfig tierConfig,
|
||||
ServerDiscoveryFactory serverDiscoveryFactory
|
||||
)
|
||||
{
|
||||
this.ruleManager = ruleManager;
|
||||
this.tierConfig = tierConfig;
|
||||
this.serverDiscoveryFactory = serverDiscoveryFactory;
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
|
||||
ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue());
|
||||
selector.start();
|
||||
selectorMap.put(entry.getValue(), selector);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
for (ServerDiscoverySelector selector : selectorMap.values()) {
|
||||
selector.stop();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
started = false;
|
||||
}
|
||||
}
|
||||
|
||||
public Pair<String, ServerDiscoverySelector> select(final Query<T> query)
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!ruleManager.isStarted() || !started) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
|
||||
|
||||
// find the rule that can apply to the entire set of intervals
|
||||
DateTime now = new DateTime();
|
||||
int lastRulePosition = -1;
|
||||
LoadRule baseRule = null;
|
||||
|
||||
for (Interval interval : query.getIntervals()) {
|
||||
int currRulePosition = 0;
|
||||
for (Rule rule : rules) {
|
||||
if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) {
|
||||
lastRulePosition = currRulePosition;
|
||||
baseRule = (LoadRule) rule;
|
||||
break;
|
||||
}
|
||||
currRulePosition++;
|
||||
}
|
||||
}
|
||||
|
||||
if (baseRule == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// in the baseRule, find the broker of highest priority
|
||||
String brokerServiceName = null;
|
||||
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
|
||||
if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
|
||||
brokerServiceName = entry.getValue();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (brokerServiceName == null) {
|
||||
log.makeAlert(
|
||||
"WTF?! No brokerServiceName found for datasource[%s], intervals[%s]. Using default[%s].",
|
||||
query.getDataSource(),
|
||||
query.getIntervals(),
|
||||
tierConfig.getDefaultBrokerServiceName()
|
||||
).emit();
|
||||
brokerServiceName = tierConfig.getDefaultBrokerServiceName();
|
||||
}
|
||||
|
||||
ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName);
|
||||
|
||||
if (retVal == null) {
|
||||
log.makeAlert(
|
||||
"WTF?! No selector found for brokerServiceName[%s]. Using default selector for[%s]",
|
||||
brokerServiceName,
|
||||
tierConfig.getDefaultBrokerServiceName()
|
||||
).emit();
|
||||
retVal = selectorMap.get(tierConfig.getDefaultBrokerServiceName());
|
||||
}
|
||||
|
||||
return new Pair<>(brokerServiceName, retVal);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,193 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server.router;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.util.Charsets;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.StatusResponseHandler;
|
||||
import com.metamx.http.client.response.StatusResponseHolder;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.server.coordinator.rules.Rule;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*/
|
||||
@ManageLifecycle
|
||||
public class CoordinatorRuleManager
|
||||
{
|
||||
private static final Logger log = new Logger(CoordinatorRuleManager.class);
|
||||
|
||||
private final HttpClient httpClient;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final Supplier<TierConfig> config;
|
||||
private final ServerDiscoverySelector selector;
|
||||
|
||||
private final StatusResponseHandler responseHandler;
|
||||
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
|
||||
|
||||
private volatile ScheduledExecutorService exec;
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private volatile boolean started = false;
|
||||
|
||||
@Inject
|
||||
public CoordinatorRuleManager(
|
||||
@Global HttpClient httpClient,
|
||||
@Json ObjectMapper jsonMapper,
|
||||
Supplier<TierConfig> config,
|
||||
ServerDiscoverySelector selector
|
||||
)
|
||||
{
|
||||
this.httpClient = httpClient;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.config = config;
|
||||
this.selector = selector;
|
||||
|
||||
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
|
||||
this.rules = new AtomicReference<>(
|
||||
new ConcurrentHashMap<String, List<Rule>>()
|
||||
);
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.exec = Execs.scheduledSingleThreaded("CoordinatorRuleManager-Exec--%d");
|
||||
|
||||
ScheduledExecutors.scheduleWithFixedDelay(
|
||||
exec,
|
||||
new Duration(0),
|
||||
config.get().getPollPeriod().toStandardDuration(),
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
poll();
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
|
||||
rules.set(new ConcurrentHashMap<String, List<Rule>>());
|
||||
|
||||
started = false;
|
||||
exec.shutdownNow();
|
||||
exec = null;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isStarted()
|
||||
{
|
||||
return started;
|
||||
}
|
||||
|
||||
public void poll()
|
||||
{
|
||||
try {
|
||||
String url = getRuleURL();
|
||||
if (url == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
StatusResponseHolder response = httpClient.get(new URL(url))
|
||||
.go(responseHandler)
|
||||
.get();
|
||||
|
||||
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<String, List<Rule>>(
|
||||
(Map<String, List<Rule>>) jsonMapper.readValue(
|
||||
response.getContent(), new TypeReference<Map<String, List<Rule>>>()
|
||||
{
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
log.info("Got [%,d] rules", newRules.keySet().size());
|
||||
|
||||
rules.set(newRules);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Exception while polling for rules");
|
||||
}
|
||||
}
|
||||
|
||||
public List<Rule> getRulesWithDefault(final String dataSource)
|
||||
{
|
||||
List<Rule> retVal = Lists.newArrayList();
|
||||
Map<String, List<Rule>> theRules = rules.get();
|
||||
if (theRules.get(dataSource) != null) {
|
||||
retVal.addAll(theRules.get(dataSource));
|
||||
}
|
||||
if (theRules.get(config.get().getDefaultRule()) != null) {
|
||||
retVal.addAll(theRules.get(config.get().getDefaultRule()));
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
|
||||
private String getRuleURL()
|
||||
{
|
||||
Server server = selector.pick();
|
||||
|
||||
if (server == null) {
|
||||
log.error("No instances found for [%s]!", config.get().getCoordinatorServiceName());
|
||||
return null;
|
||||
}
|
||||
|
||||
return String.format("http://%s%s", server.getHost(), config.get().getRulesEndpoint());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server.router;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QuerySegmentWalker;
|
||||
import io.druid.query.QueryToolChestWarehouse;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class RouterQuerySegmentWalker implements QuerySegmentWalker
|
||||
{
|
||||
private final QueryToolChestWarehouse warehouse;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final HttpClient httpClient;
|
||||
private final BrokerSelector brokerSelector;
|
||||
private final TierConfig tierConfig;
|
||||
|
||||
@Inject
|
||||
public RouterQuerySegmentWalker(
|
||||
QueryToolChestWarehouse warehouse,
|
||||
ObjectMapper objectMapper,
|
||||
@Global HttpClient httpClient,
|
||||
BrokerSelector brokerSelector,
|
||||
TierConfig tierConfig
|
||||
)
|
||||
{
|
||||
this.warehouse = warehouse;
|
||||
this.objectMapper = objectMapper;
|
||||
this.httpClient = httpClient;
|
||||
this.brokerSelector = brokerSelector;
|
||||
this.tierConfig = tierConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
|
||||
{
|
||||
return makeRunner();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
|
||||
{
|
||||
return makeRunner();
|
||||
}
|
||||
|
||||
private <T> QueryRunner<T> makeRunner()
|
||||
{
|
||||
return new TierAwareQueryRunner<T>(
|
||||
warehouse,
|
||||
objectMapper,
|
||||
httpClient,
|
||||
brokerSelector,
|
||||
tierConfig
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server.router;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.client.DirectDruidClient;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryToolChestWarehouse;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TierAwareQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
private static EmittingLogger log = new EmittingLogger(TierAwareQueryRunner.class);
|
||||
|
||||
private final QueryToolChestWarehouse warehouse;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final HttpClient httpClient;
|
||||
private final BrokerSelector<T> brokerSelector;
|
||||
private final TierConfig tierConfig;
|
||||
|
||||
private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<String, Server>();
|
||||
|
||||
public TierAwareQueryRunner(
|
||||
QueryToolChestWarehouse warehouse,
|
||||
ObjectMapper objectMapper,
|
||||
HttpClient httpClient,
|
||||
BrokerSelector<T> brokerSelector,
|
||||
TierConfig tierConfig
|
||||
)
|
||||
{
|
||||
this.warehouse = warehouse;
|
||||
this.objectMapper = objectMapper;
|
||||
this.httpClient = httpClient;
|
||||
this.brokerSelector = brokerSelector;
|
||||
this.tierConfig = tierConfig;
|
||||
}
|
||||
|
||||
public Server findServer(Query<T> query)
|
||||
{
|
||||
final Pair<String, ServerDiscoverySelector> selected = brokerSelector.select(query);
|
||||
final String brokerServiceName = selected.lhs;
|
||||
final ServerDiscoverySelector selector = selected.rhs;
|
||||
|
||||
Server server = selector.pick();
|
||||
if (server == null) {
|
||||
log.error(
|
||||
"WTF?! No server found for brokerServiceName[%s]. Using backup",
|
||||
brokerServiceName
|
||||
);
|
||||
|
||||
server = serverBackup.get(brokerServiceName);
|
||||
|
||||
if (server == null) {
|
||||
log.makeAlert(
|
||||
"WTF?! No backup found for brokerServiceName[%s]. Using default[%s]",
|
||||
brokerServiceName,
|
||||
tierConfig.getDefaultBrokerServiceName()
|
||||
).emit();
|
||||
|
||||
server = serverBackup.get(tierConfig.getDefaultBrokerServiceName());
|
||||
}
|
||||
} else {
|
||||
serverBackup.put(brokerServiceName, server);
|
||||
}
|
||||
|
||||
return server;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(Query<T> query)
|
||||
{
|
||||
Server server = findServer(query);
|
||||
|
||||
if (server == null) {
|
||||
log.makeAlert(
|
||||
"Catastrophic failure! No servers found at all! Failing request!"
|
||||
).emit();
|
||||
return Sequences.empty();
|
||||
}
|
||||
|
||||
QueryRunner<T> client = new DirectDruidClient<T>(
|
||||
warehouse,
|
||||
objectMapper,
|
||||
httpClient,
|
||||
server.getHost()
|
||||
);
|
||||
|
||||
return client.run(query);
|
||||
}
|
||||
}
|