mirror of https://github.com/apache/druid.git

commit 2da7154fdf

Merge branch 'fix-test' of github.com:metamx/druid into fix-test

Conflicts:
	histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java
	processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.159-SNAPSHOT</version>
+        <version>0.6.160-SNAPSHOT</version>
     </parent>
 
     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.159-SNAPSHOT</version>
+        <version>0.6.160-SNAPSHOT</version>
     </parent>
 
     <dependencies>
@@ -0,0 +1,23 @@
+---
+layout: doc_page
+---
+
+Best Practices
+==============
+
+# Use UTC Timezone
+
+We recommend using the UTC timezone for all your events and across all your nodes, not just for Druid but for all of your data infrastructure. This can greatly mitigate potential query problems caused by inconsistent timezones.
+
+# Use Lowercase Strings for Column Names
+
+Druid is not perfect in how it handles mixed-case dimension and metric names. This will hopefully change very soon, but for the time being, lowercasing your column names is recommended.
+
+# SSDs
+
+SSDs are highly recommended for historical and real-time nodes if you are not running a cluster that is entirely in memory. SSDs can greatly reduce the time required to page data in and out of memory.
+
+# Provide Column Names in Lexicographic Order for Best Results
+
+Although Druid supports schemaless ingestion of dimensions, because of https://github.com/metamx/druid/issues/658 you may sometimes get bigger segments than necessary. To ensure segments are as compact as possible, providing dimension names in lexicographic order is recommended (a short ordering sketch follows this diff). This may require some ETL processing of your data, however.
+
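A minimal sketch of the lexicographic-ordering advice above, using only the JDK; the dimension names are hypothetical and not taken from this commit:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DimensionOrdering
{
  public static void main(String[] args)
  {
    // Hypothetical dimension names as they might arrive from an event stream.
    List<String> dimensions = Arrays.asList("page", "city", "user", "country");

    // Sort lexicographically before handing the list to an ingestion spec,
    // so segments stay as compact as possible.
    Collections.sort(dimensions);

    System.out.println(dimensions); // [city, country, page, user]
  }
}
```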
@@ -19,13 +19,13 @@ Clone Druid and build it:
 git clone https://github.com/metamx/druid.git druid
 cd druid
 git fetch --tags
-git checkout druid-0.6.158
+git checkout druid-0.6.159
 ./build.sh
 ```
 
 ### Downloading the DSK (Druid Standalone Kit)
 
-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.158-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz) a stand-alone tarball and run it:
 
 ``` bash
 tar -xzf druid-services-0.X.X-bin.tar.gz
@@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
 
 - Update realtime node's configs for Kafka 8 extensions
 - e.g.
-- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.158",...]`
+- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.159",...]`
 - becomes
-- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.158",...]`
+- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.159",...]`
 - Update realtime task config for changed keys
 - `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
 
@@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
 druid.port=8080
 druid.service=druid/prod/overlord
 
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.158"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159"]
 
 druid.zk.service.host=#{ZK_IPs}
 druid.zk.paths.base=/druid/prod

@@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
 druid.port=8080
 druid.service=druid/prod/middlemanager
 
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.158","io.druid.extensions:druid-kafka-seven:0.6.158"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159","io.druid.extensions:druid-kafka-seven:0.6.159"]
 
 druid.zk.service.host=#{ZK_IPs}
 druid.zk.paths.base=/druid/prod

@@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
 druid.port=8080
 druid.service=druid/prod/historical
 
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.158"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159"]
 
 druid.zk.service.host=#{ZK_IPs}
 druid.zk.paths.base=/druid/prod
@@ -27,7 +27,7 @@ druid.host=localhost
 druid.service=realtime
 druid.port=8083
 
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.158"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.159"]
 
 
 druid.zk.service.host=localhost

@@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
 druid.port=8080
 druid.service=druid/prod/realtime
 
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.158","io.druid.extensions:druid-kafka-seven:0.6.158"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159","io.druid.extensions:druid-kafka-seven:0.6.159"]
 
 druid.zk.service.host=#{ZK_IPs}
 druid.zk.paths.base=/druid/prod
@@ -51,7 +51,7 @@ druid.service=druid/prod/router
 
 druid.extensions.remoteRepositories=[]
 druid.extensions.localRepository=lib
-druid.extensions.coordinates=["io.druid.extensions:druid-histogram:0.6.158"]
+druid.extensions.coordinates=["io.druid.extensions:druid-histogram:0.6.159"]
 
 druid.zk.service.host=#{ZK_IPs}
 druid.zk.paths.base=/druid/prod
@@ -28,7 +28,7 @@ Configuration:
 
 -Ddruid.zk.service.host=localhost
 
--Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.158"]
+-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.159"]
 
 -Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
 -Ddruid.db.connector.user=druid
@@ -1,12 +0,0 @@
----
-layout: doc_page
----
-
-YourKit supports the Druid open source projects with its
-full-featured Java Profiler.
-YourKit, LLC is the creator of innovative and intelligent tools for profiling
-Java and .NET applications. Take a look at YourKit's software products:
-<a href="http://www.yourkit.com/java/profiler/index.jsp">YourKit Java
-Profiler</a> and
-<a href="http://www.yourkit.com/.net/profiler/index.jsp">YourKit .NET
-Profiler</a>.
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
 
 ### Download a Tarball
 
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.158-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz). Download this file to a directory of your choosing.
 
 You can extract the awesomeness within by issuing:
 

@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
 Not too lost so far right? That's great! If you cd into the directory:
 
 ```
-cd druid-services-0.6.158
+cd druid-services-0.6.159
 ```
 
 You should see a bunch of files:
@@ -91,7 +91,7 @@ druid.service=overlord
 
 druid.zk.service.host=localhost
 
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.158"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.159"]
 
 druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
 druid.db.connector.user=druid
@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
 
 If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
 
-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.158-bin.tar.gz)
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz)
 
 and untar the contents within by issuing:
 

@@ -149,7 +149,7 @@ druid.port=8081
 
 druid.zk.service.host=localhost
 
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.158"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159"]
 
 # Dummy read only AWS account (used to download example data)
 druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

@@ -240,7 +240,7 @@ druid.port=8083
 
 druid.zk.service.host=localhost
 
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.158","io.druid.extensions:druid-kafka-seven:0.6.158"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.159","io.druid.extensions:druid-kafka-seven:0.6.159"]
 
 # Change this config to db to hand off to the rest of the Druid cluster
 druid.publish.type=noop
@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
 
 h3. Download a Tarball
 
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.158-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz)
 Download this file to a directory of your choosing.
 You can extract the awesomeness within by issuing:
 

@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
 Not too lost so far right? That's great! If you cd into the directory:
 
 ```
-cd druid-services-0.6.158
+cd druid-services-0.6.159
 ```
 
 You should see a bunch of files:
@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
 
 # Download a Tarball
 
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.158-bin.tar.gz).
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz).
 Download this bad boy to a directory of your choosing.
 
 You can extract the awesomeness within by issuing:
@@ -19,6 +19,7 @@ h2. Booting a Druid Cluster
 * "Production Cluster Configuration":Production-Cluster-Configuration.html
 * "Production Hadoop Configuration":Hadoop-Configuration.html
 * "Rolling Cluster Updates":Rolling-Updates.html
+* "Best Practices":Best-Practices.html
 
 h2. Configuration
 * "Common Configuration":Configuration.html

@@ -93,4 +94,4 @@ h2. Development
 * "Libraries":./Libraries.html
 
 h2. Misc
-* "Thanks":./Thanks.html
+* "Thanks":/thanks.html
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.159-SNAPSHOT</version>
+        <version>0.6.160-SNAPSHOT</version>
     </parent>
 
     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.159-SNAPSHOT</version>
+        <version>0.6.160-SNAPSHOT</version>
     </parent>
 
     <dependencies>
@@ -27,7 +27,7 @@
     <parent>
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.159-SNAPSHOT</version>
+        <version>0.6.160-SNAPSHOT</version>
     </parent>
 
     <dependencies>
@@ -30,12 +30,9 @@ import io.druid.data.input.Row;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerTestHelper;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.MaxAggregatorFactory;
-import io.druid.query.aggregation.MinAggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.dimension.LegacyDimensionSpec;
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.groupby.GroupByQueryConfig;
 import io.druid.query.groupby.GroupByQueryEngine;

@@ -45,7 +42,6 @@ import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
 import io.druid.query.groupby.orderby.DefaultLimitSpec;
 import io.druid.query.groupby.orderby.OrderByColumnSpec;
 import io.druid.segment.TestHelper;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -158,34 +154,33 @@ public class ApproximateHistogramGroupByQueryTest
     GroupByQuery query = new GroupByQuery.Builder()
         .setDataSource(QueryRunnerTestHelper.dataSource)
         .setGranularity(QueryRunnerTestHelper.allGran)
-        .setDimensions(Arrays.<DimensionSpec>asList(new LegacyDimensionSpec(QueryRunnerTestHelper.marketDimension)))
+        .setDimensions(
+            Arrays.<DimensionSpec>asList(
+                new DefaultDimensionSpec(
+                    QueryRunnerTestHelper.marketDimension,
+                    "marKetAlias"
+                )
+            )
+        )
         .setInterval(QueryRunnerTestHelper.fullOnInterval)
         .setLimitSpec(
             new DefaultLimitSpec(
                 Lists.newArrayList(
                     new OrderByColumnSpec(
-                        QueryRunnerTestHelper.marketDimension,
-                        OrderByColumnSpec.Direction.ASCENDING
+                        "marKetAlias",
+                        OrderByColumnSpec.Direction.DESCENDING
                     )
                 ), 1
             )
         )
         .setAggregatorSpecs(
             Lists.newArrayList(
-                Iterables.concat(
-                    QueryRunnerTestHelper.commonAggregators,
-                    Lists.newArrayList(
-                        new MaxAggregatorFactory("maxIndex", "index"),
-                        new MinAggregatorFactory("minIndex", "index"),
-                        aggFactory
-                    )
-                )
+                QueryRunnerTestHelper.rowsCount,
+                aggFactory
             )
         )
         .setPostAggregatorSpecs(
-            Arrays.asList(
-                QueryRunnerTestHelper.addRowsIndexConstant,
-                QueryRunnerTestHelper.dependentPostAgg,
+            Arrays.<PostAggregator>asList(
                 new QuantilePostAggregator("quantile", "apphisto", 0.5f)
             )
         )
@@ -194,31 +189,21 @@ public class ApproximateHistogramGroupByQueryTest
     List<Row> expectedResults = Arrays.asList(
         GroupByQueryRunnerTestHelper.createExpectedRow(
             "1970-01-01T00:00:00.000Z",
-            "market", "spot",
-            "rows", 837L,
-            "addRowsIndexConstant", 96444.5703125,
-            "dependentPostAgg", 97282.5703125,
-            "index", 95606.5703125,
-            "maxIndex", 277.2735290527344,
-            "minIndex", 59.02102279663086,
-            "quantile", 101.78856f,
-            "uniques", QueryRunnerTestHelper.UNIQUES_9,
+            "marketalias", "upfront",
+            "rows", 186L,
+            "quantile", 880.9881f,
             "apphisto",
             new Histogram(
                 new float[]{
-                    4.457897186279297f,
-                    59.02102279663086f,
-                    113.58415222167969f,
-                    168.14727783203125f,
-                    222.7104034423828f,
-                    277.2735290527344f
+                    214.97299194335938f,
+                    545.9906005859375f,
+                    877.0081787109375f,
+                    1208.0257568359375f,
+                    1539.0433349609375f,
+                    1870.06103515625f
                 },
                 new double[]{
-                    0.0,
-                    462.4309997558594,
-                    357.5404968261719,
-                    15.022850036621094,
-                    2.0056631565093994
+                    0.0, 67.53287506103516, 72.22068786621094, 31.984678268432617, 14.261756896972656
                 }
             )
         )
@@ -28,7 +28,7 @@
     <parent>
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.159-SNAPSHOT</version>
+        <version>0.6.160-SNAPSHOT</version>
     </parent>
 
     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.159-SNAPSHOT</version>
+        <version>0.6.160-SNAPSHOT</version>
     </parent>
 
     <dependencies>
@@ -32,7 +32,7 @@ public class ImmutableZkWorker
 {
   private final Worker worker;
   private final int currCapacityUsed;
-  private final Set<String> availabilityGroups;
+  private final ImmutableSet<String> availabilityGroups;
 
   public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set<String> availabilityGroups)
   {
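The hunk above narrows the availability-groups field from Set to Guava's ImmutableSet. A minimal sketch of the defensive-copy idiom this type makes explicit, assuming only Guava on the classpath (the constructor body is not shown in this hunk, but would presumably copy via ImmutableSet.copyOf):

```java
import com.google.common.collect.ImmutableSet;

import java.util.HashSet;
import java.util.Set;

public class SnapshotSketch
{
  public static void main(String[] args)
  {
    Set<String> live = new HashSet<String>();
    live.add("group-1");

    // Copy once; the snapshot can never observe later mutations of "live".
    ImmutableSet<String> snapshot = ImmutableSet.copyOf(live);
    live.add("group-2");

    System.out.println(snapshot); // [group-1]
  }
}
```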
@@ -164,7 +164,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
         @Override
         public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
         {
-          Worker worker;
+          final Worker worker;
           switch (event.getType()) {
             case CHILD_ADDED:
               worker = jsonMapper.readValue(

@@ -198,6 +198,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
                 }
               );
               break;
+            case CHILD_UPDATED:
+              worker = jsonMapper.readValue(
+                  event.getData().getData(),
+                  Worker.class
+              );
+              updateWorker(worker);
+              break;
+
             case CHILD_REMOVED:
               worker = jsonMapper.readValue(
                   event.getData().getData(),
@@ -745,6 +753,24 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
     }
   }
 
+  /**
+   * We allow workers to change their own capacities and versions. They cannot change their own hosts or ips without
+   * dropping themselves and re-announcing.
+   */
+  private void updateWorker(final Worker worker)
+  {
+    final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
+    if (zkWorker != null) {
+      log.info("Worker[%s] updated its announcement from[%s] to[%s].", worker.getHost(), zkWorker.getWorker(), worker);
+      zkWorker.setWorker(worker);
+    } else {
+      log.warn(
+          "WTF, worker[%s] updated its announcement but we didn't have a ZkWorker for it. Ignoring.",
+          worker.getHost()
+      );
+    }
+  }
+
   /**
    * When a ephemeral worker node disappears from ZK, incomplete running tasks will be retried by
    * the logic in the status listener. We still have to make sure there are no tasks assigned
@@ -22,11 +22,11 @@ package io.druid.indexing.overlord;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.worker.TaskAnnouncement;
 import io.druid.indexing.worker.Worker;
 import org.apache.curator.framework.recipes.cache.ChildData;
@@ -46,15 +46,15 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class ZkWorker implements Closeable
 {
-  private final Worker worker;
   private final PathChildrenCache statusCache;
   private final Function<ChildData, TaskAnnouncement> cacheConverter;
 
+  private AtomicReference<Worker> worker;
   private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
 
   public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
   {
-    this.worker = worker;
+    this.worker = new AtomicReference<>(worker);
     this.statusCache = statusCache;
     this.cacheConverter = new Function<ChildData, TaskAnnouncement>()
     {
@@ -84,7 +84,7 @@ public class ZkWorker implements Closeable
   @JsonProperty("worker")
   public Worker getWorker()
   {
-    return worker;
+    return worker.get();
   }
 
   @JsonProperty("runningTasks")
@@ -137,30 +137,28 @@ public class ZkWorker implements Closeable
     return getRunningTasks().containsKey(taskId);
   }
 
-  public boolean isAtCapacity()
-  {
-    return getCurrCapacityUsed() >= worker.getCapacity();
-  }
-
   public boolean isValidVersion(String minVersion)
   {
-    return worker.getVersion().compareTo(minVersion) >= 0;
+    return worker.get().getVersion().compareTo(minVersion) >= 0;
   }
 
-  public boolean canRunTask(Task task)
+  public void setWorker(Worker newWorker)
   {
-    return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
-            && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
+    final Worker oldWorker = worker.get();
+    Preconditions.checkArgument(newWorker.getHost().equals(oldWorker.getHost()), "Cannot change Worker host");
+    Preconditions.checkArgument(newWorker.getIp().equals(oldWorker.getIp()), "Cannot change Worker ip");
+
+    worker.set(newWorker);
   }
 
   public void setLastCompletedTaskTime(DateTime completedTaskTime)
   {
-    lastCompletedTaskTime.getAndSet(completedTaskTime);
+    lastCompletedTaskTime.set(completedTaskTime);
   }
 
   public ImmutableZkWorker toImmutable()
   {
-    return new ImmutableZkWorker(worker, getCurrCapacityUsed(), getAvailabilityGroups());
+    return new ImmutableZkWorker(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups());
   }
 
   @Override
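The ZkWorker changes above replace a final Worker field with an AtomicReference<Worker>, so the Curator event thread can swap in a worker's updated announcement while readers always see a consistent snapshot. A minimal, self-contained sketch of that pattern; the Worker class here is a simplified stand-in for io.druid.indexing.worker.Worker, not the real one:

```java
import java.util.concurrent.atomic.AtomicReference;

public class AtomicSwapSketch
{
  // Simplified stand-in for illustration only.
  static final class Worker
  {
    final String host;
    final String version;

    Worker(String host, String version)
    {
      this.host = host;
      this.version = version;
    }
  }

  private final AtomicReference<Worker> worker;

  AtomicSwapSketch(Worker initial)
  {
    this.worker = new AtomicReference<Worker>(initial);
  }

  // Readers take a snapshot once; no locking needed.
  boolean isValidVersion(String minVersion)
  {
    return worker.get().version.compareTo(minVersion) >= 0;
  }

  // The event thread swaps in the updated announcement; identity fields may not change.
  void setWorker(Worker newWorker)
  {
    if (!newWorker.host.equals(worker.get().host)) {
      throw new IllegalArgumentException("Cannot change Worker host");
    }
    worker.set(newWorker);
  }

  public static void main(String[] args)
  {
    AtomicSwapSketch zkWorker = new AtomicSwapSketch(new Worker("10.0.0.1", "1"));

    // Blanking the version "disables" the worker, as the test below exploits.
    zkWorker.setWorker(new Worker("10.0.0.1", ""));
    System.out.println(zkWorker.isValidVersion("0")); // false
  }
}
```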
@@ -361,6 +361,29 @@ public class RemoteTaskRunnerTest
     Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
   }
 
+  @Test
+  public void testWorkerDisabled() throws Exception
+  {
+    doSetup();
+    final ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
+
+    Assert.assertTrue(taskAnnounced(task.getId()));
+    mockWorkerRunningTask(task);
+    Assert.assertTrue(workerRunningTask(task.getId()));
+
+    // Disable while task running
+    disableWorker();
+
+    // Continue test
+    mockWorkerCompleteSuccessfulTask(task);
+    Assert.assertTrue(workerCompletedTask(result));
+    Assert.assertEquals(task.getId(), result.get().getId());
+    Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
+
+    // Confirm RTR thinks the worker is disabled.
+    Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion());
+  }
+
   private void doSetup() throws Exception
   {
     makeWorker();
@@ -405,6 +428,14 @@ public class RemoteTaskRunnerTest
     );
   }
 
+  private void disableWorker() throws Exception
+  {
+    cf.setData().forPath(
+        announcementsPath,
+        jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""))
+    );
+  }
+
   private boolean taskAnnounced(final String taskId)
   {
     return pathExists(joiner.join(tasksPath, taskId));
@@ -28,7 +28,7 @@
     <parent>
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.159-SNAPSHOT</version>
+        <version>0.6.160-SNAPSHOT</version>
     </parent>
 
     <dependencies>

@@ -28,7 +28,7 @@
     <parent>
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.159-SNAPSHOT</version>
+        <version>0.6.160-SNAPSHOT</version>
     </parent>
 
     <dependencies>
pom.xml

@@ -23,7 +23,7 @@
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
     <packaging>pom</packaging>
-    <version>0.6.159-SNAPSHOT</version>
+    <version>0.6.160-SNAPSHOT</version>
     <name>druid</name>
     <description>druid</description>
     <scm>

@@ -89,7 +89,7 @@
         <dependency>
             <groupId>com.metamx</groupId>
             <artifactId>bytebuffer-collections</artifactId>
-            <version>0.0.2</version>
+            <version>0.0.4</version>
         </dependency>
         <dependency>
             <groupId>com.metamx</groupId>

@@ -194,7 +194,7 @@
         <dependency>
             <groupId>it.uniroma3.mat</groupId>
             <artifactId>extendedset</artifactId>
-            <version>1.3.4</version>
+            <version>1.3.7</version>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
@@ -28,7 +28,7 @@
     <parent>
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.159-SNAPSHOT</version>
+        <version>0.6.160-SNAPSHOT</version>
     </parent>
 
     <dependencies>
@@ -63,12 +63,12 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
   public CardinalityAggregatorFactory(
       @JsonProperty("name") String name,
       @JsonProperty("fieldNames") final List<String> fieldNames,
-      @JsonProperty("byRow") final Boolean byRow
+      @JsonProperty("byRow") final boolean byRow
   )
   {
     this.name = name;
     this.fieldNames = fieldNames;
-    this.byRow = byRow == null ? false : byRow;
+    this.byRow = byRow;
   }
 
   @Override

@@ -203,6 +203,12 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
     return fieldNames;
   }
 
+  @JsonProperty
+  public boolean isByRow()
+  {
+    return byRow;
+  }
+
   @Override
   public byte[] getCacheKey()
   {
@@ -173,7 +173,7 @@ public class DefaultLimitSpec implements LimitSpec
           public String apply(Row input)
           {
             // Multi-value dimensions have all been flattened at this point;
-            final List<String> dimList = input.getDimension(dimension);
+            final List<String> dimList = input.getDimension(dimension.toLowerCase());
             return dimList.isEmpty() ? null : dimList.get(0);
           }
         }
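The one-line fix above lowercases the dimension name before the row lookup. A minimal sketch of why that matters, assuming (as the mixed-casing tests elsewhere in this commit suggest) that rows key their output columns by lowercased name; the map below is hypothetical, not Druid's actual row implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class CaseInsensitiveLookup
{
  public static void main(String[] args)
  {
    // Hypothetical group-by output row, keyed by lowercased column name.
    Map<String, String> row = new HashMap<String, String>();
    row.put("marketalias", "spot");

    String orderByColumn = "marKetAlias"; // mixed-case, as written in a limit spec

    System.out.println(row.get(orderByColumn));               // null: case mismatch
    System.out.println(row.get(orderByColumn.toLowerCase())); // spot
  }
}
```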
@@ -82,7 +82,7 @@ public class QueryRunnerTestHelper
   );
   public static final QueryGranularity dayGran = QueryGranularity.DAY;
   public static final QueryGranularity allGran = QueryGranularity.ALL;
-  public static final String marketDimension = "market";
+  public static final String marketDimension = "marKet";
   public static final String qualityDimension = "quality";
   public static final String placementDimension = "placement";
   public static final String placementishDimension = "placementish";
@@ -19,16 +19,20 @@
 
 package io.druid.query.aggregation.cardinality;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.BufferAggregator;
 import io.druid.segment.DimensionSelector;
 import io.druid.segment.data.IndexedInts;
-import junit.framework.Assert;
+import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;

@@ -378,4 +382,15 @@ public class CardinalityAggregatorTest
         0.05
     );
   }
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    CardinalityAggregatorFactory factory = new CardinalityAggregatorFactory("billy", ImmutableList.of("b", "a", "c"), true);
+    ObjectMapper objectMapper = new DefaultObjectMapper();
+    Assert.assertEquals(
+        factory,
+        objectMapper.readValue(objectMapper.writeValueAsString(factory), AggregatorFactory.class)
+    );
+  }
 }
@@ -733,6 +733,48 @@ public class GroupByQueryRunnerTest
     );
   }
 
+  @Test
+  public void testGroupByWithMixedCasingOrdering()
+  {
+    GroupByQuery query = new GroupByQuery.Builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setGranularity(QueryRunnerTestHelper.allGran)
+        .setDimensions(
+            Arrays.<DimensionSpec>asList(
+                new DefaultDimensionSpec(
+                    QueryRunnerTestHelper.marketDimension,
+                    "MarketAlias"
+                )
+            )
+        )
+        .setInterval(QueryRunnerTestHelper.fullOnInterval)
+        .setLimitSpec(
+            new DefaultLimitSpec(
+                Lists.newArrayList(
+                    new OrderByColumnSpec(
+                        "marketALIAS",
+                        OrderByColumnSpec.Direction.DESCENDING
+                    )
+                ), 3
+            )
+        )
+        .setAggregatorSpecs(
+            Lists.<AggregatorFactory>newArrayList(
+                QueryRunnerTestHelper.rowsCount
+            )
+        )
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "marketalias", "upfront", "rows", 186L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "marketalias", "total_market", "rows", 186L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "marketalias", "spot", "rows", 837L)
+    );
+
+    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
+  }
+
   @Test
   public void testHavingSpec()
   {
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.159-SNAPSHOT</version>
|
||||
<version>0.6.160-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.159-SNAPSHOT</version>
|
||||
<version>0.6.160-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.159-SNAPSHOT</version>
|
||||
<version>0.6.160-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.159-SNAPSHOT</version>
|
||||
<version>0.6.160-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|