Merge branch 'master' into optimize-postAgg-calculation

Conflicts:
	server/src/test/java/io/druid/client/CachingClusteredClientTest.java
This commit is contained in:
nishantmonu51 2014-04-08 03:35:56 +05:30
commit 22c3296aa3
57 changed files with 257 additions and 93 deletions

View File

@ -30,4 +30,4 @@ echo "For examples, see: "
echo " " echo " "
ls -1 examples/*/*sh ls -1 examples/*/*sh
echo " " echo " "
echo "See also http://druid.io/docs/0.6.81" echo "See also http://druid.io/docs/0.6.84"

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid git clone https://github.com/metamx/druid.git druid
cd druid cd druid
git fetch --tags git fetch --tags
git checkout druid-0.6.81 git checkout druid-0.6.84
./build.sh ./build.sh
``` ```
### Downloading the DSK (Druid Standalone Kit) ### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.81-bin.tar.gz) a stand-alone tarball and run it: [Download](http://static.druid.io/artifacts/releases/druid-services-0.6.84-bin.tar.gz) a stand-alone tarball and run it:
``` bash ``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -66,7 +66,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/indexer druid.service=druid/prod/indexer
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.81"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.84"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod
@ -115,7 +115,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/worker druid.service=druid/prod/worker
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.81","io.druid.extensions:druid-kafka-seven:0.6.81"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.84","io.druid.extensions:druid-kafka-seven:0.6.84"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime druid.service=realtime
druid.port=8083 druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.81"] druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.84"]
druid.zk.service.host=localhost druid.zk.service.host=localhost
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080 druid.port=8080
druid.service=druid/prod/realtime druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.81","io.druid.extensions:druid-kafka-seven:0.6.81"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.84","io.druid.extensions:druid-kafka-seven:0.6.84"]
druid.zk.service.host=#{ZK_IPs} druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod druid.zk.paths.base=/druid/prod

View File

@ -51,12 +51,12 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
|--------|-----------|---------| |--------|-----------|---------|
|type|The task type, this should always be "index".|yes| |type|The task type, this should always be "index".|yes|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no| |id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no|
|granularitySpec|Specifies the segment chunks that the task will process. `type` is always "uniform"; `gran` sets the granularity of the chunks ("DAY" means all segments containing timestamps in the same day, while `intervals` sets the interval that the chunks will cover.|yes| |granularitySpec|Specifies the segment chunks that the task will process. `type` is always "uniform"; `gran` sets the granularity of the chunks ("DAY" means all segments containing timestamps in the same day), while `intervals` sets the interval that the chunks will cover.|yes|
|spatialDimensions|Dimensions to build spatial indexes over. See [Geographic Queries](GeographicQueries.html).|no| |spatialDimensions|Dimensions to build spatial indexes over. See [Geographic Queries](GeographicQueries.html).|no|
|aggregators|The metrics to aggregate in the data set. For more info, see [Aggregations](Aggregations.html)|yes| |aggregators|The metrics to aggregate in the data set. For more info, see [Aggregations](Aggregations.html).|yes|
|indexGranularity|The rollup granularity for timestamps. See [Realtime Ingestion](Realtime-ingestion.html) for more information. |no| |indexGranularity|The rollup granularity for timestamps. See [Realtime Ingestion](Realtime-ingestion.html) for more information. |no|
|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|no| |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|no|
|firehose|The input source of data. For more info, see [Firehose](Firehose.html)|yes| |firehose|The input source of data. For more info, see [Firehose](Firehose.html).|yes|
|rowFlushBoundary|Used in determining when intermediate persist should occur to disk.|no| |rowFlushBoundary|Used in determining when intermediate persist should occur to disk.|no|
### Index Hadoop Task ### Index Hadoop Task

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball ### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.81-bin.tar.gz). Download this file to a directory of your choosing. We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.84-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory: Not too lost so far right? That's great! If you cd into the directory:
``` ```
cd druid-services-0.6.81 cd druid-services-0.6.84
``` ```
You should see a bunch of files: You should see a bunch of files:

View File

@ -136,7 +136,7 @@ Indexing the Data
To index the data and build a Druid segment, we are going to need to submit a task to the indexing service. This task should already exist: To index the data and build a Druid segment, we are going to need to submit a task to the indexing service. This task should already exist:
``` ```
examples/indexing/index_task.json examples/indexing/wikipedia_index_task.json
``` ```
Open up the file to see the following: Open up the file to see the following:

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first. If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.81-bin.tar.gz) You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.84-bin.tar.gz)
and untar the contents within by issuing: and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.81"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.84"]
# Dummy read only AWS account (used to download example data) # Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.81","io.druid.extensions:druid-kafka-seven:0.6.81"] druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.84","io.druid.extensions:druid-kafka-seven:0.6.84"]
# Change this config to db to hand off to the rest of the Druid cluster # Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop druid.publish.type=noop

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.81-bin.tar.gz) We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.84-bin.tar.gz)
Download this file to a directory of your choosing. Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory: Not too lost so far right? That's great! If you cd into the directory:
``` ```
cd druid-services-0.6.81 cd druid-services-0.6.84
``` ```
You should see a bunch of files: You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.81-bin.tar.gz. We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.84-bin.tar.gz.
Download this bad boy to a directory of your choosing. Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:

View File

@ -22,12 +22,6 @@ h2. Configuration
* "Broker":Broker-Config.html * "Broker":Broker-Config.html
* "Indexing Service":Indexing-Service-Config.html * "Indexing Service":Indexing-Service-Config.html
h2. Operations
* "Extending Druid":./Modules.html
* "Cluster Setup":./Cluster-setup.html
* "Booting a Production Cluster":./Booting-a-production-cluster.html
* "Performance FAQ":./Performance-FAQ.html
h2. Data Ingestion h2. Data Ingestion
* "Realtime":./Realtime-ingestion.html * "Realtime":./Realtime-ingestion.html
* "Batch":./Batch-ingestion.html * "Batch":./Batch-ingestion.html
@ -36,6 +30,12 @@ h2. Data Ingestion
* "Data Formats":./Data_formats.html * "Data Formats":./Data_formats.html
* "Ingestion FAQ":./Ingestion-FAQ.html * "Ingestion FAQ":./Ingestion-FAQ.html
h2. Operations
* "Extending Druid":./Modules.html
* "Cluster Setup":./Cluster-setup.html
* "Booting a Production Cluster":./Booting-a-production-cluster.html
* "Performance FAQ":./Performance-FAQ.html
h2. Querying h2. Querying
* "Querying":./Querying.html * "Querying":./Querying.html
** "Filters":./Filters.html ** "Filters":./Filters.html

View File

@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.81"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.84"]
# Dummy read only AWS account (used to download example data) # Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.81","io.druid.extensions:druid-kafka-seven:0.6.81","io.druid.extensions:druid-rabbitmq:0.6.81"] druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.84","io.druid.extensions:druid-kafka-seven:0.6.84","io.druid.extensions:druid-rabbitmq:0.6.84"]
# Change this config to db to hand off to the rest of the Druid cluster # Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop druid.publish.type=noop

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -23,14 +23,14 @@
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
<name>druid</name> <name>druid</name>
<description>druid</description> <description>druid</description>
<scm> <scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection> <connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection> <developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url> <url>http://www.github.com/metamx/druid</url>
<tag>druid-0.6.81-SNAPSHOT</tag> <tag>druid-0.6.84-SNAPSHOT</tag>
</scm> </scm>
<prerequisites> <prerequisites>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import io.druid.query.spec.QuerySegmentSpec; import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Duration; import org.joda.time.Duration;
@ -120,6 +121,67 @@ public abstract class BaseQuery<T> implements Query<T>
return retVal == null ? defaultValue : retVal; return retVal == null ? defaultValue : retVal;
} }
@Override
public int getContextPriority(int defaultValue)
{
if (context == null) {
return defaultValue;
}
Object val = context.get("priority");
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Integer.parseInt((String) val);
} else if (val instanceof Integer) {
return (int) val;
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}
@Override
public boolean getContextBySegment(boolean defaultValue)
{
return parseBoolean("bySegment", defaultValue);
}
@Override
public boolean getContextPopulateCache(boolean defaultValue)
{
return parseBoolean("populateCache", defaultValue);
}
@Override
public boolean getContextUseCache(boolean defaultValue)
{
return parseBoolean("useCache", defaultValue);
}
@Override
public boolean getContextFinalize(boolean defaultValue)
{
return parseBoolean("finalize", defaultValue);
}
private boolean parseBoolean(String key, boolean defaultValue)
{
if (context == null) {
return defaultValue;
}
Object val = context.get(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Boolean.parseBoolean((String) val);
} else if (val instanceof Boolean) {
return (boolean) val;
} else {
throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass());
}
}
protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides) protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides)
{ {
Map<String, Object> overridden = Maps.newTreeMap(); Map<String, Object> overridden = Maps.newTreeMap();

View File

@ -53,7 +53,7 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query)
{ {
if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) { if (query.getContextBySegment(false)) {
final Sequence<T> baseSequence = base.run(query); final Sequence<T> baseSequence = base.run(query);
return new Sequence<T>() return new Sequence<T>()
{ {

View File

@ -37,7 +37,7 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> run(Query<T> query) public Sequence<T> run(Query<T> query)
{ {
if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) { if (query.getContextBySegment(false)) {
return baseRunner.run(query); return baseRunner.run(query);
} }

View File

@ -83,7 +83,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query)
{ {
final int priority = Integer.parseInt((String) query.getContextValue("priority", "0")); final int priority = query.getContextPriority(0);
return new BaseSequence<T, Iterator<T>>( return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>() new BaseSequence.IteratorMaker<T, Iterator<T>>()

View File

@ -48,8 +48,8 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
@Override @Override
public Sequence<T> run(final Query<T> query) public Sequence<T> run(final Query<T> query)
{ {
final boolean isBySegment = Boolean.parseBoolean(query.<String>getContextValue("bySegment")); final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = Boolean.parseBoolean(query.getContextValue("finalize", "true")); final boolean shouldFinalize = query.getContextFinalize(true);
if (shouldFinalize) { if (shouldFinalize) {
Function<T, T> finalizerFn; Function<T, T> finalizerFn;
if (isBySegment) { if (isBySegment) {
@ -84,8 +84,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
); );
} }
}; };
} } else {
else {
finalizerFn = toolChest.makeMetricManipulatorFn( finalizerFn = toolChest.makeMetricManipulatorFn(
query, query,
new MetricManipulationFn() new MetricManipulationFn()
@ -100,7 +99,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
} }
return Sequences.map( return Sequences.map(
baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", "false"))), baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", false))),
finalizerFn finalizerFn
); );
} }

View File

@ -83,7 +83,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
query, query,
configSupplier.get() configSupplier.get()
); );
final int priority = Integer.parseInt((String) query.getContextValue("priority", "0")); final int priority = query.getContextPriority(0);
if (Iterables.isEmpty(queryables)) { if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found."); log.warn("No queryables found.");

View File

@ -74,6 +74,13 @@ public interface Query<T>
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue); public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);
// For backwards compatibility
@Deprecated public int getContextPriority(int defaultValue);
@Deprecated public boolean getContextBySegment(boolean defaultValue);
@Deprecated public boolean getContextPopulateCache(boolean defaultValue);
@Deprecated public boolean getContextUseCache(boolean defaultValue);
@Deprecated public boolean getContextFinalize(boolean defaultValue);
public Query<T> withOverriddenContext(Map<String, Object> contextOverride); public Query<T> withOverriddenContext(Map<String, Object> contextOverride);
public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec); public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);

View File

@ -182,7 +182,7 @@ public class GroupByQueryEngine
final DimensionSelector dimSelector = dims.get(0); final DimensionSelector dimSelector = dims.get(0);
final IndexedInts row = dimSelector.getRow(); final IndexedInts row = dimSelector.getRow();
if (row.size() == 0) { if (row == null || row.size() == 0) {
ByteBuffer newKey = key.duplicate(); ByteBuffer newKey = key.duplicate();
newKey.putInt(dimSelector.getValueCardinality()); newKey.putInt(dimSelector.getValueCardinality());
unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size())); unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size()));

View File

@ -294,7 +294,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
return runner.run(query); return runner.run(query);
} }
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false")); final boolean isBySegment = query.getContextBySegment(false);
return Sequences.map( return Sequences.map(
runner.run(query.withLimit(config.getMaxSearchLimit())), runner.run(query.withLimit(config.getMaxSearchLimit())),

View File

@ -347,7 +347,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return runner.run(query); return runner.run(query);
} }
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false")); final boolean isBySegment = query.getContextBySegment(false);
return Sequences.map( return Sequences.map(
runner.run(query.withThreshold(minTopNThreshold)), runner.run(query.withThreshold(minTopNThreshold)),

View File

@ -22,6 +22,7 @@
package io.druid.query.timeboundary; package io.druid.query.timeboundary;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids; import io.druid.query.Druids;
import io.druid.query.Query; import io.druid.query.Query;
@ -47,4 +48,79 @@ public class TimeBoundaryQueryTest
Assert.assertEquals(query, serdeQuery); Assert.assertEquals(query, serdeQuery);
} }
@Test
public void testContextSerde() throws Exception
{
final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
.dataSource("foo")
.intervals("2013/2014")
.context(
ImmutableMap.<String, Object>of(
"priority",
1,
"useCache",
true,
"populateCache",
true,
"finalize",
true
)
).build();
final ObjectMapper mapper = new DefaultObjectMapper();
final TimeBoundaryQuery serdeQuery = mapper.readValue(
mapper.writeValueAsBytes(
mapper.readValue(
mapper.writeValueAsString(
query
), TimeBoundaryQuery.class
)
), TimeBoundaryQuery.class
);
Assert.assertEquals(1, serdeQuery.getContextValue("priority"));
Assert.assertEquals(true, serdeQuery.getContextValue("useCache"));
Assert.assertEquals(true, serdeQuery.getContextValue("populateCache"));
Assert.assertEquals(true, serdeQuery.getContextValue("finalize"));
}
@Test
public void testContextSerde2() throws Exception
{
final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
.dataSource("foo")
.intervals("2013/2014")
.context(
ImmutableMap.<String, Object>of(
"priority",
"1",
"useCache",
"true",
"populateCache",
"true",
"finalize",
"true"
)
).build();
final ObjectMapper mapper = new DefaultObjectMapper();
final TimeBoundaryQuery serdeQuery = mapper.readValue(
mapper.writeValueAsBytes(
mapper.readValue(
mapper.writeValueAsString(
query
), TimeBoundaryQuery.class
)
), TimeBoundaryQuery.class
);
Assert.assertEquals("1", serdeQuery.getContextValue("priority"));
Assert.assertEquals("true", serdeQuery.getContextValue("useCache"));
Assert.assertEquals("true", serdeQuery.getContextValue("populateCache"));
Assert.assertEquals("true", serdeQuery.getContextValue("finalize"));
}
} }

View File

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -39,7 +39,7 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSeg
S3DataSegmentPusherConfig restoreConfig S3DataSegmentPusherConfig restoreConfig
) )
{ {
super(s3Client); super(s3Client, restoreConfig);
this.archiveConfig = archiveConfig; this.archiveConfig = archiveConfig;
this.restoreConfig = restoreConfig; this.restoreConfig = restoreConfig;
} }

View File

@ -30,6 +30,7 @@ import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException; import org.jets3t.service.ServiceException;
import org.jets3t.service.acl.gs.GSAccessControlList;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object; import org.jets3t.service.model.S3Object;
@ -41,13 +42,16 @@ public class S3DataSegmentMover implements DataSegmentMover
private static final Logger log = new Logger(S3DataSegmentMover.class); private static final Logger log = new Logger(S3DataSegmentMover.class);
private final RestS3Service s3Client; private final RestS3Service s3Client;
private final S3DataSegmentPusherConfig config;
@Inject @Inject
public S3DataSegmentMover( public S3DataSegmentMover(
RestS3Service s3Client RestS3Service s3Client,
S3DataSegmentPusherConfig config
) )
{ {
this.s3Client = s3Client; this.s3Client = s3Client;
this.config = config;
} }
@Override @Override
@ -124,7 +128,11 @@ public class S3DataSegmentMover implements DataSegmentMover
targetS3Bucket, targetS3Bucket,
targetS3Path targetS3Path
); );
s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, new S3Object(targetS3Path), false); final S3Object target = new S3Object(targetS3Path);
if(!config.getDisableAcl()) {
target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
}
s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, target, false);
} }
} else { } else {
// ensure object exists in target location // ensure object exists in target location

View File

@ -89,7 +89,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
toPush.setBucketName(outputBucket); toPush.setBucketName(outputBucket);
toPush.setKey(s3Path); toPush.setKey(s3Path);
if (!config.getDisableAcl()) { if (!config.getDisableAcl()) {
toPush.setAcl(AccessControlList.REST_CANNED_AUTHENTICATED_READ); toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
} }
log.info("Pushing %s.", toPush); log.info("Pushing %s.", toPush);

View File

@ -62,7 +62,7 @@ public class S3DataSegmentMoverTest
public void testMove() throws Exception public void testMove() throws Exception
{ {
MockStorageService mockS3Client = new MockStorageService(); MockStorageService mockS3Client = new MockStorageService();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client); S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig());
mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip")); mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"));
mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json")); mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"));
@ -82,7 +82,7 @@ public class S3DataSegmentMoverTest
public void testMoveNoop() throws Exception public void testMoveNoop() throws Exception
{ {
MockStorageService mockS3Client = new MockStorageService(); MockStorageService mockS3Client = new MockStorageService();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client); S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig());
mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip")); mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"));
mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json")); mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"));
@ -103,7 +103,7 @@ public class S3DataSegmentMoverTest
public void testMoveException() throws Exception public void testMoveException() throws Exception
{ {
MockStorageService mockS3Client = new MockStorageService(); MockStorageService mockS3Client = new MockStorageService();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client); S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig());
mover.move( mover.move(
sourceSegment, sourceSegment,

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -70,7 +70,7 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
final CacheStrategy strategy = toolChest.getCacheStrategy(query); final CacheStrategy strategy = toolChest.getCacheStrategy(query);
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true")) final boolean populateCache = query.getContextPopulateCache(true)
&& strategy != null && strategy != null
&& cacheConfig.isPopulateCache() && cacheConfig.isPopulateCache()
// historical only populates distributed cache since the cache lookups are done at broker. // historical only populates distributed cache since the cache lookups are done at broker.

View File

@ -62,7 +62,6 @@ import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -125,24 +124,24 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList(); final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap(); final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
final boolean useCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.USE_CACHE, "true")) final boolean useCache = query.getContextUseCache(true)
&& strategy != null && strategy != null
&& cacheConfig.isUseCache(); && cacheConfig.isUseCache();
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true")) final boolean populateCache = query.getContextPopulateCache(true)
&& strategy != null && cacheConfig.isPopulateCache(); && strategy != null && cacheConfig.isPopulateCache();
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); final boolean isBySegment = query.getContextBySegment(false);
ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>(); ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
final String priority = query.getContextValue("priority", "0"); final int priority = query.getContextPriority(0);
contextBuilder.put("priority", priority); contextBuilder.put("priority", priority);
if (populateCache) { if (populateCache) {
contextBuilder.put(CacheConfig.POPULATE_CACHE, "false"); contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
contextBuilder.put("bySegment", "true"); contextBuilder.put("bySegment", true);
} }
contextBuilder.put("intermediate", "true"); contextBuilder.put("intermediate", true);
final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build()); final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());

View File

@ -106,7 +106,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public Sequence<T> run(Query<T> query) public Sequence<T> run(Query<T> query)
{ {
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query); QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); boolean isBySegment = query.getContextBySegment(false);
Pair<JavaType, JavaType> types = typesMap.get(query.getClass()); Pair<JavaType, JavaType> types = typesMap.get(query.getClass());
if (types == null) { if (types == null) {

View File

@ -68,13 +68,12 @@ public class RoutingDruidClient<IntermediateType, FinalType>
} }
public ListenableFuture<FinalType> run( public ListenableFuture<FinalType> run(
String host, String url,
Query query, Query query,
HttpResponseHandler<IntermediateType, FinalType> responseHandler HttpResponseHandler<IntermediateType, FinalType> responseHandler
) )
{ {
final ListenableFuture<FinalType> future; final ListenableFuture<FinalType> future;
final String url = String.format("http://%s/druid/v2/", host);
try { try {
log.debug("Querying url[%s]", url); log.debug("Querying url[%s]", url);

View File

@ -117,6 +117,12 @@ public class RealtimePlumberSchool implements PlumberSchool
this.rejectionPolicyFactory = factory; this.rejectionPolicyFactory = factory;
} }
@JsonProperty("maxPendingPersists")
public void setDefaultMaxPendingPersists(int maxPendingPersists)
{
this.maxPendingPersists = maxPendingPersists;
}
public void setEmitter(ServiceEmitter emitter) public void setEmitter(ServiceEmitter emitter)
{ {
this.emitter = emitter; this.emitter = emitter;
@ -152,11 +158,6 @@ public class RealtimePlumberSchool implements PlumberSchool
this.queryExecutorService = executorService; this.queryExecutorService = executorService;
} }
public void setDefaultMaxPendingPersists(int maxPendingPersists)
{
this.maxPendingPersists = maxPendingPersists;
}
@Override @Override
public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics) public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
{ {

View File

@ -106,8 +106,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
} }
req.setAttribute(DISPATCHED, true); req.setAttribute(DISPATCHED, true);
resp.setStatus(200);
resp.setContentType("application/x-javascript");
query = objectMapper.readValue(req.getInputStream(), Query.class); query = objectMapper.readValue(req.getInputStream(), Query.class);
queryId = query.getId(); queryId = query.getId();
@ -132,6 +130,9 @@ public class AsyncQueryForwardingServlet extends HttpServlet
@Override @Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response) public ClientResponse<OutputStream> handleResponse(HttpResponse response)
{ {
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/x-javascript");
byte[] bytes = getContentBytes(response.getContent()); byte[] bytes = getContentBytes(response.getContent());
if (bytes.length > 0) { if (bytes.length > 0) {
try { try {
@ -209,7 +210,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
@Override @Override
public void run() public void run()
{ {
routingDruidClient.run(host, theQuery, responseHandler); routingDruidClient.run(makeUrl(host, req), theQuery, responseHandler);
} }
} }
); );
@ -235,4 +236,14 @@ public class AsyncQueryForwardingServlet extends HttpServlet
.emit(); .emit();
} }
} }
private String makeUrl(String host, HttpServletRequest req)
{
String queryString = req.getQueryString();
if (queryString == null) {
return String.format("http://%s%s", host, req.getRequestURI());
}
return String.format("http://%s%s?%s", host, req.getRequestURI(), req.getQueryString());
}
} }

View File

@ -44,7 +44,7 @@ public class QueryIDProvider
return String.format( return String.format(
"%s_%s_%s_%s_%s", "%s_%s_%s_%s_%s",
query.getDataSource(), query.getDataSource(),
query.getDuration(), query.getIntervals(),
host, host,
new DateTime(), new DateTime(),
id.incrementAndGet() id.incrementAndGet()

View File

@ -217,6 +217,7 @@ public class CachingClusteredClientTest
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
testQueryCaching( testQueryCaching(
runner, runner,
builder.build(), builder.build(),
@ -280,6 +281,7 @@ public class CachingClusteredClientTest
.aggregators(AGGS) .aggregators(AGGS)
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
testQueryCaching( testQueryCaching(
@ -786,11 +788,11 @@ public class CachingClusteredClientTest
for (Capture queryCapture : queryCaptures) { for (Capture queryCapture : queryCaptures) {
Query capturedQuery = (Query) queryCapture.getValue(); Query capturedQuery = (Query) queryCapture.getValue();
if (expectBySegment) { if (expectBySegment) {
Assert.assertEquals("true", capturedQuery.getContextValue("bySegment")); Assert.assertEquals(true, capturedQuery.getContextValue("bySegment"));
} else { } else {
Assert.assertTrue( Assert.assertTrue(
capturedQuery.getContextValue("bySegment") == null || capturedQuery.getContextValue("bySegment") == null ||
capturedQuery.getContextValue("bySegment").equals("false") capturedQuery.getContextValue("bySegment").equals(false)
); );
} }
} }

View File

@ -27,7 +27,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.83-SNAPSHOT</version> <version>0.6.85-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -55,7 +55,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "broker", name = "broker",
description = "Runs a broker node, see http://druid.io/docs/0.6.81/Broker.html for a description" description = "Runs a broker node, see http://druid.io/docs/0.6.84/Broker.html for a description"
) )
public class CliBroker extends ServerRunnable public class CliBroker extends ServerRunnable
{ {

View File

@ -66,7 +66,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "coordinator", name = "coordinator",
description = "Runs the Coordinator, see http://druid.io/docs/0.6.81/Coordinator.html for a description." description = "Runs the Coordinator, see http://druid.io/docs/0.6.84/Coordinator.html for a description."
) )
public class CliCoordinator extends ServerRunnable public class CliCoordinator extends ServerRunnable
{ {

View File

@ -41,7 +41,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "hadoop", name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.81/Batch-ingestion.html for a description." description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.84/Batch-ingestion.html for a description."
) )
public class CliHadoopIndexer implements Runnable public class CliHadoopIndexer implements Runnable
{ {

View File

@ -46,7 +46,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "historical", name = "historical",
description = "Runs a Historical node, see http://druid.io/docs/0.6.81/Historical.html for a description" description = "Runs a Historical node, see http://druid.io/docs/0.6.84/Historical.html for a description"
) )
public class CliHistorical extends ServerRunnable public class CliHistorical extends ServerRunnable
{ {

View File

@ -93,7 +93,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "overlord", name = "overlord",
description = "Runs an Overlord node, see http://druid.io/docs/0.6.81/Indexing-Service.html for a description" description = "Runs an Overlord node, see http://druid.io/docs/0.6.84/Indexing-Service.html for a description"
) )
public class CliOverlord extends ServerRunnable public class CliOverlord extends ServerRunnable
{ {

View File

@ -30,7 +30,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "realtime", name = "realtime",
description = "Runs a realtime node, see http://druid.io/docs/0.6.81/Realtime.html for a description" description = "Runs a realtime node, see http://druid.io/docs/0.6.84/Realtime.html for a description"
) )
public class CliRealtime extends ServerRunnable public class CliRealtime extends ServerRunnable
{ {