merge changes from master

nishantmonu51 2014-04-21 21:43:44 +05:30
commit f20d3b2895
30 changed files with 165 additions and 117 deletions

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.93
git checkout druid-0.6.95
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.93-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.95-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@@ -66,7 +66,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/indexer
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.93"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.95"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -115,7 +115,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/worker
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.93","io.druid.extensions:druid-kafka-seven:0.6.93"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.95","io.druid.extensions:druid-kafka-seven:0.6.95"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.93"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.95"]
druid.zk.service.host=localhost
@@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.93","io.druid.extensions:druid-kafka-seven:0.6.93"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.95","io.druid.extensions:druid-kafka-seven:0.6.95"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.93-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.95-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.93
cd druid-services-0.6.95
```
You should see a bunch of files:

View File

@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.93-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.95-bin.tar.gz)
and untar the contents within by issuing:
@@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.93"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.95"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.93","io.druid.extensions:druid-kafka-seven:0.6.93"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.95","io.druid.extensions:druid-kafka-seven:0.6.95"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.93-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.95-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.93
cd druid-services-0.6.95
```
You should see a bunch of files:

View File

@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.93-bin.tar.gz.
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.95-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:

View File

@@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.93"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.95"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.93","io.druid.extensions:druid-kafka-seven:0.6.93","io.druid.extensions:druid-rabbitmq:0.6.93"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.95","io.druid.extensions:druid-kafka-seven:0.6.95","io.druid.extensions:druid-rabbitmq:0.6.95"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -23,14 +23,14 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
<tag>druid-0.6.93-SNAPSHOT</tag>
<tag>druid-0.6.95-SNAPSHOT</tag>
</scm>
<prerequisites>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -24,7 +24,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.FinalizeMetricManipulationFn;
import io.druid.query.aggregation.IdentityMetricManipulationFn;
import io.druid.query.aggregation.MetricManipulationFn;
import javax.annotation.Nullable;
@@ -50,24 +51,25 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
{
final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = query.getContextFinalize(true);
Function<T, T> finalizerFn;
final Query<T> queryToRun;
final Function<T, T> finalizerFn;
final MetricManipulationFn metricManipulationFn;
if (shouldFinalize) {
queryToRun = query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", false));
metricManipulationFn = new FinalizeMetricManipulationFn();
} else {
queryToRun = query;
metricManipulationFn = new IdentityMetricManipulationFn();
}
if (isBySegment) {
finalizerFn = new Function<T, T>()
{
final Function<T, T> baseFinalizer = toolChest.makePostComputeManipulatorFn(
query,
new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
if (shouldFinalize) {
return factory.finalizeComputation(factory.deserialize(object));
} else {
return object;
}
}
}
metricManipulationFn
);
@Override
@@ -88,25 +90,12 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
}
};
} else {
finalizerFn = toolChest.makePostComputeManipulatorFn(
query,
new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
if (shouldFinalize) {
return factory.finalizeComputation(factory.deserialize(object));
} else {
return object;
}
}
}
);
finalizerFn = toolChest.makePostComputeManipulatorFn(query, metricManipulationFn);
}
return Sequences.map(
baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", false))),
baseRunner.run(queryToRun),
finalizerFn
);
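
Both branches above now hand a `MetricManipulationFn` to the tool chest instead of building an anonymous one per call. The interface itself is not part of this diff; here is a minimal sketch of its shape, inferred from the two implementations added in the next two files:

```java
package io.druid.query.aggregation;

// Inferred sketch of the MetricManipulationFn contract. The interface is not
// shown in this commit; its signature is reconstructed from the @Override
// methods of the implementations below.
public interface MetricManipulationFn
{
  // Transforms a single metric value, given the aggregator that produced it.
  Object manipulate(AggregatorFactory factory, Object object);
}
```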

View File

@@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
/**
*/
public class FinalizeMetricManipulationFn implements MetricManipulationFn
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return factory.finalizeComputation(object);
}
}

View File

@@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
/**
*/
public class IdentityMetricManipulationFn implements MetricManipulationFn
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return object;
}
}
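
Taken together, the two new classes let a caller pick the manipulation strategy once, up front, rather than re-checking `shouldFinalize` inside an anonymous class for every row. A minimal usage sketch, assuming only the classes added in this commit (the class and method names here are illustrative, not part of the codebase):

```java
import io.druid.query.aggregation.FinalizeMetricManipulationFn;
import io.druid.query.aggregation.IdentityMetricManipulationFn;
import io.druid.query.aggregation.MetricManipulationFn;

// Illustrative only: mirrors the branch FinalizeResultsQueryRunner now takes.
public class ManipulationFnExample
{
  static MetricManipulationFn select(boolean shouldFinalize)
  {
    return shouldFinalize
        ? new FinalizeMetricManipulationFn()   // factory.finalizeComputation(object)
        : new IdentityMetricManipulationFn();  // returns object unchanged
  }
}
```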

View File

@@ -130,42 +130,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
.setUser9(Minutes.minutes(numMinutes).toString());
}
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePreComputeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn
)
{
return makeComputeManipulatorFn(query, fn, false);
}
private Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makeComputeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn, final boolean calculatePostAggs
)
{
return new Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>>()
{
@Override
public Result<TimeseriesResultValue> apply(Result<TimeseriesResultValue> result)
{
final Map<String, Object> values = Maps.newHashMap();
final TimeseriesResultValue holder = result.getValue();
if (calculatePostAggs) {
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject()));
}
}
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
}
return new Result<TimeseriesResultValue>(
result.getTimestamp(),
new TimeseriesResultValue(values)
);
}
};
}
@Override
public TypeReference<Result<TimeseriesResultValue>> getResultTypeReference()
{
@@ -273,6 +237,14 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
return Ordering.natural();
}
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePreComputeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn
)
{
return makeComputeManipulatorFn(query, fn, false);
}
@Override
public Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makePostComputeManipulatorFn(
TimeseriesQuery query, MetricManipulationFn fn
@@ -281,5 +253,31 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
return makeComputeManipulatorFn(query, fn, true);
}
private Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> makeComputeManipulatorFn(
final TimeseriesQuery query, final MetricManipulationFn fn, final boolean calculatePostAggs
)
{
return new Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>>()
{
@Override
public Result<TimeseriesResultValue> apply(Result<TimeseriesResultValue> result)
{
final Map<String, Object> values = Maps.newHashMap();
final TimeseriesResultValue holder = result.getValue();
if (calculatePostAggs) {
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject()));
}
}
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
}
return new Result<TimeseriesResultValue>(
result.getTimestamp(),
new TimeseriesResultValue(values)
);
}
};
}
}

View File

@@ -149,7 +149,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
private String dimension = query.getDimensionSpec().getOutputName();
@Override
public Result<TopNResultValue> apply(@Nullable Result<TopNResultValue> result)
public Result<TopNResultValue> apply(Result<TopNResultValue> result)
{
List<Map<String, Object>> serializedValues = Lists.newArrayList(
Iterables.transform(
@@ -157,7 +157,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
new Function<DimensionAndMetricValueExtractor, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(@Nullable DimensionAndMetricValueExtractor input)
public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
{
final Map<String, Object> values = Maps.newHashMap();
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
@@ -197,7 +197,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
private String dimension = query.getDimensionSpec().getOutputName();
@Override
public Result<TopNResultValue> apply(@Nullable Result<TopNResultValue> result)
public Result<TopNResultValue> apply(Result<TopNResultValue> result)
{
List<Map<String, Object>> serializedValues = Lists.newArrayList(
Iterables.transform(
@@ -205,7 +205,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
new Function<DimensionAndMetricValueExtractor, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(@Nullable DimensionAndMetricValueExtractor input)
public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
{
final Map<String, Object> values = Maps.newHashMap();
// compute all post aggs
@@ -249,7 +249,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>()
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<PostAggregator> postAggs = query.getPostAggregatorSpecs();
@Override
public byte[] computeCacheKey(TopNQuery query)
@@ -289,7 +288,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new Function<Result<TopNResultValue>, Object>()
{
@Override
public Object apply(@Nullable final Result<TopNResultValue> input)
public Object apply(final Result<TopNResultValue> input)
{
List<DimensionAndMetricValueExtractor> results = Lists.newArrayList(input.getValue());
final List<Object> retVal = Lists.newArrayListWithCapacity(results.size() + 1);
@@ -317,7 +316,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
private final QueryGranularity granularity = query.getGranularity();
@Override
public Result<TopNResultValue> apply(@Nullable Object input)
public Result<TopNResultValue> apply(Object input)
{
List<Object> results = (List<Object>) input;
List<Map<String, Object>> retVal = Lists.newArrayListWithCapacity(results.size());
@@ -418,7 +417,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
@Override
public Result<TopNResultValue> apply(@Nullable Result<TopNResultValue> input)
public Result<TopNResultValue> apply(Result<TopNResultValue> input)
{
return new Result<TopNResultValue>(
input.getTimestamp(),

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -122,8 +122,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
final JavaType typeRef;
if (isBySegment) {
typeRef = types.rhs;
}
else {
} else {
typeRef = types.lhs;
}
@@ -219,14 +218,15 @@ public class DirectDruidClient<T> implements QueryRunner<T>
retVal = Sequences.map(
retVal,
toolChest.makePreComputeManipulatorFn(
query, new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return factory.deserialize(object);
}
}
query,
new MetricManipulationFn()
{
@Override
public Object manipulate(AggregatorFactory factory, Object object)
{
return factory.deserialize(object);
}
}
)
);
}
@@ -313,7 +313,7 @@
@Override
public void close() throws IOException
{
if(jp != null) {
if (jp != null) {
jp.close();
}
}
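
Note that DirectDruidClient keeps its deserializing manipulator inline; this commit only reindents it. A hypothetical companion class, not part of this commit, would express that inline function in the same named-class style introduced above:

```java
package io.druid.query.aggregation;

// Hypothetical, NOT added by this commit: DirectDruidClient's inline
// pre-compute manipulator, rewritten in the named-class style of
// FinalizeMetricManipulationFn and IdentityMetricManipulationFn.
public class DeserializingMetricManipulationFn implements MetricManipulationFn
{
  @Override
  public Object manipulate(AggregatorFactory factory, Object object)
  {
    return factory.deserialize(object);
  }
}
```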

View File

@@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.95-SNAPSHOT</version>
<version>0.6.96-SNAPSHOT</version>
</parent>
<dependencies>