mirror of https://github.com/apache/druid.git
second attempt to fix merge-conflicts
This commit is contained in:
commit
a5d02b9030
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -155,8 +155,10 @@ Druid storage nodes maintain information about segments they have already downlo
|
||||||
|--------|-----------|-------|
|
|--------|-----------|-------|
|
||||||
|`druid.segmentCache.locations`|Segments assigned to a Historical node are first stored on the local file system (in a disk cache) and then served by the Historical node. These locations define where that local cache resides. | none (no caching) |
|
|`druid.segmentCache.locations`|Segments assigned to a Historical node are first stored on the local file system (in a disk cache) and then served by the Historical node. These locations define where that local cache resides. | none (no caching) |
|
||||||
|`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a node is no longer serving a segment.|true|
|
|`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a node is no longer serving a segment.|true|
|
||||||
|`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|5 minutes|
|
|`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|30000 (30 seconds)|
|
||||||
|`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
|
|`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
|
||||||
|
|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
|
||||||
|
|`druid.segmentCache.numLoadingThreads`|How many segments to load concurrently from from deep storage.|1|
|
||||||
|
|
||||||
### Jetty Server Module
|
### Jetty Server Module
|
||||||
|
|
||||||
|
|
|
@ -19,13 +19,13 @@ Clone Druid and build it:
|
||||||
git clone https://github.com/metamx/druid.git druid
|
git clone https://github.com/metamx/druid.git druid
|
||||||
cd druid
|
cd druid
|
||||||
git fetch --tags
|
git fetch --tags
|
||||||
git checkout druid-0.6.154
|
git checkout druid-0.6.156
|
||||||
./build.sh
|
./build.sh
|
||||||
```
|
```
|
||||||
|
|
||||||
### Downloading the DSK (Druid Standalone Kit)
|
### Downloading the DSK (Druid Standalone Kit)
|
||||||
|
|
||||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.154-bin.tar.gz) a stand-alone tarball and run it:
|
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.156-bin.tar.gz) a stand-alone tarball and run it:
|
||||||
|
|
||||||
``` bash
|
``` bash
|
||||||
tar -xzf druid-services-0.X.X-bin.tar.gz
|
tar -xzf druid-services-0.X.X-bin.tar.gz
|
||||||
|
|
|
@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
|
||||||
|
|
||||||
- Update realtime node's configs for Kafka 8 extensions
|
- Update realtime node's configs for Kafka 8 extensions
|
||||||
- e.g.
|
- e.g.
|
||||||
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.154",...]`
|
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.156",...]`
|
||||||
- becomes
|
- becomes
|
||||||
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.154",...]`
|
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.156",...]`
|
||||||
- Update realtime task config for changed keys
|
- Update realtime task config for changed keys
|
||||||
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
|
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
|
||||||
druid.port=8080
|
druid.port=8080
|
||||||
druid.service=druid/prod/overlord
|
druid.service=druid/prod/overlord
|
||||||
|
|
||||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.154"]
|
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.156"]
|
||||||
|
|
||||||
druid.zk.service.host=#{ZK_IPs}
|
druid.zk.service.host=#{ZK_IPs}
|
||||||
druid.zk.paths.base=/druid/prod
|
druid.zk.paths.base=/druid/prod
|
||||||
|
@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
|
||||||
druid.port=8080
|
druid.port=8080
|
||||||
druid.service=druid/prod/middlemanager
|
druid.service=druid/prod/middlemanager
|
||||||
|
|
||||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.154","io.druid.extensions:druid-kafka-seven:0.6.154"]
|
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.156","io.druid.extensions:druid-kafka-seven:0.6.156"]
|
||||||
|
|
||||||
druid.zk.service.host=#{ZK_IPs}
|
druid.zk.service.host=#{ZK_IPs}
|
||||||
druid.zk.paths.base=/druid/prod
|
druid.zk.paths.base=/druid/prod
|
||||||
|
@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
|
||||||
druid.port=8080
|
druid.port=8080
|
||||||
druid.service=druid/prod/historical
|
druid.service=druid/prod/historical
|
||||||
|
|
||||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.154"]
|
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.156"]
|
||||||
|
|
||||||
druid.zk.service.host=#{ZK_IPs}
|
druid.zk.service.host=#{ZK_IPs}
|
||||||
druid.zk.paths.base=/druid/prod
|
druid.zk.paths.base=/druid/prod
|
||||||
|
|
|
@ -27,7 +27,7 @@ druid.host=localhost
|
||||||
druid.service=realtime
|
druid.service=realtime
|
||||||
druid.port=8083
|
druid.port=8083
|
||||||
|
|
||||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.154"]
|
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.156"]
|
||||||
|
|
||||||
|
|
||||||
druid.zk.service.host=localhost
|
druid.zk.service.host=localhost
|
||||||
|
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
|
||||||
druid.port=8080
|
druid.port=8080
|
||||||
druid.service=druid/prod/realtime
|
druid.service=druid/prod/realtime
|
||||||
|
|
||||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.154","io.druid.extensions:druid-kafka-seven:0.6.154"]
|
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.156","io.druid.extensions:druid-kafka-seven:0.6.156"]
|
||||||
|
|
||||||
druid.zk.service.host=#{ZK_IPs}
|
druid.zk.service.host=#{ZK_IPs}
|
||||||
druid.zk.paths.base=/druid/prod
|
druid.zk.paths.base=/druid/prod
|
||||||
|
|
|
@ -25,6 +25,7 @@ There are several main parts to a select query:
|
||||||
|queryType|This String should always be "select"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|
|queryType|This String should always be "select"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|
||||||
|dataSource|A String defining the data source to query, very similar to a table in a relational database|yes|
|
|dataSource|A String defining the data source to query, very similar to a table in a relational database|yes|
|
||||||
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|
||||||
|
|filter|See [Filters](Filters.html)|no|
|
||||||
|dimensions|The list of dimensions to select. If left empty, all dimensions are returned.|no|
|
|dimensions|The list of dimensions to select. If left empty, all dimensions are returned.|no|
|
||||||
|metrics|The list of metrics to select. If left empty, all metrics are returned.|no|
|
|metrics|The list of metrics to select. If left empty, all metrics are returned.|no|
|
||||||
|pagingSpec|A JSON object indicating offsets into different scanned segments. Select query results will return a pagingSpec that can be reused for pagination.|yes|
|
|pagingSpec|A JSON object indicating offsets into different scanned segments. Select query results will return a pagingSpec that can be reused for pagination.|yes|
|
||||||
|
|
|
@ -28,7 +28,7 @@ Configuration:
|
||||||
|
|
||||||
-Ddruid.zk.service.host=localhost
|
-Ddruid.zk.service.host=localhost
|
||||||
|
|
||||||
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.154"]
|
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.156"]
|
||||||
|
|
||||||
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
|
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
|
||||||
-Ddruid.db.connector.user=druid
|
-Ddruid.db.connector.user=druid
|
||||||
|
|
|
@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
||||||
|
|
||||||
### Download a Tarball
|
### Download a Tarball
|
||||||
|
|
||||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.154-bin.tar.gz). Download this file to a directory of your choosing.
|
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.156-bin.tar.gz). Download this file to a directory of your choosing.
|
||||||
|
|
||||||
You can extract the awesomeness within by issuing:
|
You can extract the awesomeness within by issuing:
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
|
||||||
Not too lost so far right? That's great! If you cd into the directory:
|
Not too lost so far right? That's great! If you cd into the directory:
|
||||||
|
|
||||||
```
|
```
|
||||||
cd druid-services-0.6.154
|
cd druid-services-0.6.156
|
||||||
```
|
```
|
||||||
|
|
||||||
You should see a bunch of files:
|
You should see a bunch of files:
|
||||||
|
|
|
@ -91,7 +91,7 @@ druid.service=overlord
|
||||||
|
|
||||||
druid.zk.service.host=localhost
|
druid.zk.service.host=localhost
|
||||||
|
|
||||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.154"]
|
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.156"]
|
||||||
|
|
||||||
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
|
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
|
||||||
druid.db.connector.user=druid
|
druid.db.connector.user=druid
|
||||||
|
|
|
@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
|
||||||
|
|
||||||
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
|
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
|
||||||
|
|
||||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.154-bin.tar.gz)
|
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.156-bin.tar.gz)
|
||||||
|
|
||||||
and untar the contents within by issuing:
|
and untar the contents within by issuing:
|
||||||
|
|
||||||
|
@ -149,7 +149,7 @@ druid.port=8081
|
||||||
|
|
||||||
druid.zk.service.host=localhost
|
druid.zk.service.host=localhost
|
||||||
|
|
||||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.154"]
|
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.156"]
|
||||||
|
|
||||||
# Dummy read only AWS account (used to download example data)
|
# Dummy read only AWS account (used to download example data)
|
||||||
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||||
|
@ -240,7 +240,7 @@ druid.port=8083
|
||||||
|
|
||||||
druid.zk.service.host=localhost
|
druid.zk.service.host=localhost
|
||||||
|
|
||||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.154","io.druid.extensions:druid-kafka-seven:0.6.154"]
|
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.156","io.druid.extensions:druid-kafka-seven:0.6.156"]
|
||||||
|
|
||||||
# Change this config to db to hand off to the rest of the Druid cluster
|
# Change this config to db to hand off to the rest of the Druid cluster
|
||||||
druid.publish.type=noop
|
druid.publish.type=noop
|
||||||
|
|
|
@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
||||||
|
|
||||||
h3. Download a Tarball
|
h3. Download a Tarball
|
||||||
|
|
||||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.154-bin.tar.gz)
|
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.156-bin.tar.gz)
|
||||||
Download this file to a directory of your choosing.
|
Download this file to a directory of your choosing.
|
||||||
You can extract the awesomeness within by issuing:
|
You can extract the awesomeness within by issuing:
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
|
||||||
Not too lost so far right? That's great! If you cd into the directory:
|
Not too lost so far right? That's great! If you cd into the directory:
|
||||||
|
|
||||||
```
|
```
|
||||||
cd druid-services-0.6.154
|
cd druid-services-0.6.156
|
||||||
```
|
```
|
||||||
|
|
||||||
You should see a bunch of files:
|
You should see a bunch of files:
|
||||||
|
|
|
@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
|
||||||
|
|
||||||
# Download a Tarball
|
# Download a Tarball
|
||||||
|
|
||||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.154-bin.tar.gz).
|
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.156-bin.tar.gz).
|
||||||
Download this bad boy to a directory of your choosing.
|
Download this bad boy to a directory of your choosing.
|
||||||
|
|
||||||
You can extract the awesomeness within by issuing:
|
You can extract the awesomeness within by issuing:
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -27,7 +27,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
4
pom.xml
4
pom.xml
|
@ -23,7 +23,7 @@
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<packaging>pom</packaging>
|
<packaging>pom</packaging>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
<name>druid</name>
|
<name>druid</name>
|
||||||
<description>druid</description>
|
<description>druid</description>
|
||||||
<scm>
|
<scm>
|
||||||
|
@ -39,7 +39,7 @@
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
<metamx.java-util.version>0.26.6</metamx.java-util.version>
|
<metamx.java-util.version>0.26.7</metamx.java-util.version>
|
||||||
<apache.curator.version>2.6.0</apache.curator.version>
|
<apache.curator.version>2.6.0</apache.curator.version>
|
||||||
<druid.api.version>0.2.10</druid.api.version>
|
<druid.api.version>0.2.10</druid.api.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -673,6 +673,10 @@ public class IncrementalIndex implements Iterable<Row>
|
||||||
falseIdsReverse = biMap.inverse();
|
falseIdsReverse = biMap.inverse();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
|
||||||
|
* @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
|
||||||
|
*/
|
||||||
public String get(String value)
|
public String get(String value)
|
||||||
{
|
{
|
||||||
return value == null ? null : poorMansInterning.get(value);
|
return value == null ? null : poorMansInterning.get(value);
|
||||||
|
|
|
@ -417,7 +417,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
||||||
@Override
|
@Override
|
||||||
public Object get()
|
public Object get()
|
||||||
{
|
{
|
||||||
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
|
final String[][] dims = currEntry.getKey().getDims();
|
||||||
|
if(dimensionIndex >= dims.length) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
final String[] dimVals = dims[dimensionIndex];
|
||||||
if (dimVals.length == 1) {
|
if (dimVals.length == 1) {
|
||||||
return dimVals[0];
|
return dimVals[0];
|
||||||
}
|
}
|
||||||
|
@ -525,6 +529,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
||||||
}
|
}
|
||||||
|
|
||||||
for (String dimVal : dims[dimIndex]) {
|
for (String dimVal : dims[dimIndex]) {
|
||||||
|
/**
|
||||||
|
* using == here instead of .equals() to speed up lookups made possible by
|
||||||
|
* {@link io.druid.segment.incremental.IncrementalIndex.DimDim#poorMansInterning}
|
||||||
|
*/
|
||||||
if (id == dimVal) {
|
if (id == dimVal) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import io.druid.granularity.QueryGranularity;
|
||||||
import io.druid.query.Result;
|
import io.druid.query.Result;
|
||||||
import io.druid.query.aggregation.AggregatorFactory;
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
|
||||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||||
import io.druid.query.filter.DimFilters;
|
import io.druid.query.filter.DimFilters;
|
||||||
import io.druid.query.groupby.GroupByQuery;
|
import io.druid.query.groupby.GroupByQuery;
|
||||||
|
@ -52,7 +53,7 @@ import org.junit.Test;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.Arrays;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -80,26 +81,7 @@ public class IncrementalIndexStorageAdapterTest
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
GroupByQueryEngine engine = new GroupByQueryEngine(
|
GroupByQueryEngine engine = makeGroupByQueryEngine();
|
||||||
Suppliers.<GroupByQueryConfig>ofInstance(new GroupByQueryConfig()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public int getMaxIntermediateRows()
|
|
||||||
{
|
|
||||||
return 5;
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
new StupidPool(
|
|
||||||
new Supplier<ByteBuffer>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public ByteBuffer get()
|
|
||||||
{
|
|
||||||
return ByteBuffer.allocate(50000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
final Sequence<Row> rows = engine.process(
|
final Sequence<Row> rows = engine.process(
|
||||||
GroupByQuery.builder()
|
GroupByQuery.builder()
|
||||||
|
@ -124,6 +106,93 @@ public class IncrementalIndexStorageAdapterTest
|
||||||
Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent());
|
Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testObjectColumnSelectorOnVaryingColumnSchema() throws Exception
|
||||||
|
{
|
||||||
|
IncrementalIndex index = new IncrementalIndex(
|
||||||
|
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
|
||||||
|
);
|
||||||
|
|
||||||
|
index.add(
|
||||||
|
new MapBasedInputRow(
|
||||||
|
new DateTime("2014-09-01T00:00:00"),
|
||||||
|
Lists.newArrayList("billy"),
|
||||||
|
ImmutableMap.<String, Object>of("billy", "hi")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
index.add(
|
||||||
|
new MapBasedInputRow(
|
||||||
|
new DateTime("2014-09-01T01:00:00"),
|
||||||
|
Lists.newArrayList("billy", "sally"),
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"billy", "hip",
|
||||||
|
"sally", "hop"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
GroupByQueryEngine engine = makeGroupByQueryEngine();
|
||||||
|
|
||||||
|
final Sequence<Row> rows = engine.process(
|
||||||
|
GroupByQuery.builder()
|
||||||
|
.setDataSource("test")
|
||||||
|
.setGranularity(QueryGranularity.ALL)
|
||||||
|
.setInterval(new Interval(0, new DateTime().getMillis()))
|
||||||
|
.addDimension("billy")
|
||||||
|
.addDimension("sally")
|
||||||
|
.addAggregator(
|
||||||
|
new LongSumAggregatorFactory("cnt", "cnt")
|
||||||
|
)
|
||||||
|
.addAggregator(
|
||||||
|
new JavaScriptAggregatorFactory(
|
||||||
|
"fieldLength",
|
||||||
|
Arrays.asList("sally", "billy"),
|
||||||
|
"function(current, s, b) { return current + (s == null ? 0 : s.length) + (b == null ? 0 : b.length); }",
|
||||||
|
"function() { return 0; }",
|
||||||
|
"function(a,b) { return a + b; }"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.build(),
|
||||||
|
new IncrementalIndexStorageAdapter(index)
|
||||||
|
);
|
||||||
|
|
||||||
|
final ArrayList<Row> results = Sequences.toList(rows, Lists.<Row>newArrayList());
|
||||||
|
|
||||||
|
Assert.assertEquals(2, results.size());
|
||||||
|
|
||||||
|
MapBasedRow row = (MapBasedRow) results.get(0);
|
||||||
|
Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l, "fieldLength", 2.0), row.getEvent());
|
||||||
|
|
||||||
|
row = (MapBasedRow) results.get(1);
|
||||||
|
Assert.assertEquals(ImmutableMap.of("billy", "hip", "sally", "hop", "cnt", 1l, "fieldLength", 6.0), row.getEvent());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GroupByQueryEngine makeGroupByQueryEngine()
|
||||||
|
{
|
||||||
|
return new GroupByQueryEngine(
|
||||||
|
Suppliers.<GroupByQueryConfig>ofInstance(
|
||||||
|
new GroupByQueryConfig()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public int getMaxIntermediateRows()
|
||||||
|
{
|
||||||
|
return 5;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
),
|
||||||
|
new StupidPool(
|
||||||
|
new Supplier<ByteBuffer>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public ByteBuffer get()
|
||||||
|
{
|
||||||
|
return ByteBuffer.allocate(50000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testResetSanity() {
|
public void testResetSanity() {
|
||||||
IncrementalIndex index = new IncrementalIndex(
|
IncrementalIndex index = new IncrementalIndex(
|
||||||
|
@ -252,26 +321,7 @@ public class IncrementalIndexStorageAdapterTest
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
GroupByQueryEngine engine = new GroupByQueryEngine(
|
GroupByQueryEngine engine = makeGroupByQueryEngine();
|
||||||
Suppliers.<GroupByQueryConfig>ofInstance(new GroupByQueryConfig()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public int getMaxIntermediateRows()
|
|
||||||
{
|
|
||||||
return 5;
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
new StupidPool<ByteBuffer>(
|
|
||||||
new Supplier<ByteBuffer>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public ByteBuffer get()
|
|
||||||
{
|
|
||||||
return ByteBuffer.allocate(50000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
final Sequence<Row> rows = engine.process(
|
final Sequence<Row> rows = engine.process(
|
||||||
GroupByQuery.builder()
|
GroupByQuery.builder()
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
|
@ -40,6 +40,12 @@ public class SegmentLoaderConfig
|
||||||
@JsonProperty("dropSegmentDelayMillis")
|
@JsonProperty("dropSegmentDelayMillis")
|
||||||
private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds
|
private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds
|
||||||
|
|
||||||
|
@JsonProperty("announceIntervalMillis")
|
||||||
|
private int announceIntervalMillis = 0; // do not background announce
|
||||||
|
|
||||||
|
@JsonProperty("numLoadingThreads")
|
||||||
|
private int numLoadingThreads = 1;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private File infoDir = null;
|
private File infoDir = null;
|
||||||
|
|
||||||
|
@ -58,6 +64,16 @@ public class SegmentLoaderConfig
|
||||||
return dropSegmentDelayMillis;
|
return dropSegmentDelayMillis;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getAnnounceIntervalMillis()
|
||||||
|
{
|
||||||
|
return announceIntervalMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNumLoadingThreads()
|
||||||
|
{
|
||||||
|
return numLoadingThreads;
|
||||||
|
}
|
||||||
|
|
||||||
public File getInfoDir()
|
public File getInfoDir()
|
||||||
{
|
{
|
||||||
if (infoDir == null) {
|
if (infoDir == null) {
|
||||||
|
|
|
@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger;
|
||||||
import io.druid.client.ServerView;
|
import io.druid.client.ServerView;
|
||||||
import io.druid.concurrent.Execs;
|
import io.druid.concurrent.Execs;
|
||||||
import io.druid.db.MetadataSegmentManager;
|
import io.druid.db.MetadataSegmentManager;
|
||||||
|
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||||
import io.druid.segment.realtime.DbSegmentPublisher;
|
import io.druid.segment.realtime.DbSegmentPublisher;
|
||||||
import io.druid.server.coordination.BaseZkCoordinator;
|
import io.druid.server.coordination.BaseZkCoordinator;
|
||||||
import io.druid.server.coordination.DataSegmentChangeCallback;
|
import io.druid.server.coordination.DataSegmentChangeCallback;
|
||||||
|
@ -53,6 +54,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
|
||||||
public BridgeZkCoordinator(
|
public BridgeZkCoordinator(
|
||||||
ObjectMapper jsonMapper,
|
ObjectMapper jsonMapper,
|
||||||
ZkPathsConfig zkPaths,
|
ZkPathsConfig zkPaths,
|
||||||
|
SegmentLoaderConfig config,
|
||||||
DruidServerMetadata me,
|
DruidServerMetadata me,
|
||||||
@Bridge CuratorFramework curator,
|
@Bridge CuratorFramework curator,
|
||||||
DbSegmentPublisher dbSegmentPublisher,
|
DbSegmentPublisher dbSegmentPublisher,
|
||||||
|
@ -60,7 +62,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
|
||||||
ServerView serverView
|
ServerView serverView
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(jsonMapper, zkPaths, me, curator);
|
super(jsonMapper, zkPaths, config, me, curator);
|
||||||
|
|
||||||
this.dbSegmentPublisher = dbSegmentPublisher;
|
this.dbSegmentPublisher = dbSegmentPublisher;
|
||||||
this.metadataSegmentManager = metadataSegmentManager;
|
this.metadataSegmentManager = metadataSegmentManager;
|
||||||
|
|
|
@ -21,10 +21,13 @@ package io.druid.server.coordination;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import com.metamx.common.lifecycle.LifecycleStart;
|
import com.metamx.common.lifecycle.LifecycleStart;
|
||||||
import com.metamx.common.lifecycle.LifecycleStop;
|
import com.metamx.common.lifecycle.LifecycleStop;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
|
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||||
import io.druid.server.initialization.ZkPathsConfig;
|
import io.druid.server.initialization.ZkPathsConfig;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData;
|
import org.apache.curator.framework.recipes.cache.ChildData;
|
||||||
|
@ -34,6 +37,8 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
|
||||||
import org.apache.curator.utils.ZKPaths;
|
import org.apache.curator.utils.ZKPaths;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -45,23 +50,33 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
|
||||||
|
|
||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
private final ZkPathsConfig zkPaths;
|
private final ZkPathsConfig zkPaths;
|
||||||
|
private final SegmentLoaderConfig config;
|
||||||
private final DruidServerMetadata me;
|
private final DruidServerMetadata me;
|
||||||
private final CuratorFramework curator;
|
private final CuratorFramework curator;
|
||||||
|
|
||||||
private volatile PathChildrenCache loadQueueCache;
|
private volatile PathChildrenCache loadQueueCache;
|
||||||
private volatile boolean started;
|
private volatile boolean started;
|
||||||
|
private final ListeningExecutorService loadingExec;
|
||||||
|
|
||||||
public BaseZkCoordinator(
|
public BaseZkCoordinator(
|
||||||
ObjectMapper jsonMapper,
|
ObjectMapper jsonMapper,
|
||||||
ZkPathsConfig zkPaths,
|
ZkPathsConfig zkPaths,
|
||||||
|
SegmentLoaderConfig config,
|
||||||
DruidServerMetadata me,
|
DruidServerMetadata me,
|
||||||
CuratorFramework curator
|
CuratorFramework curator
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
this.zkPaths = zkPaths;
|
this.zkPaths = zkPaths;
|
||||||
|
this.config = config;
|
||||||
this.me = me;
|
this.me = me;
|
||||||
this.curator = curator;
|
this.curator = curator;
|
||||||
|
this.loadingExec = MoreExecutors.listeningDecorator(
|
||||||
|
Executors.newFixedThreadPool(
|
||||||
|
config.getNumLoadingThreads(),
|
||||||
|
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
|
||||||
|
)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@LifecycleStart
|
@LifecycleStart
|
||||||
|
@ -83,7 +98,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
|
||||||
loadQueueLocation,
|
loadQueueLocation,
|
||||||
true,
|
true,
|
||||||
true,
|
true,
|
||||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
|
loadingExec
|
||||||
);
|
);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -200,4 +215,9 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
|
||||||
public abstract void loadLocalCache();
|
public abstract void loadLocalCache();
|
||||||
|
|
||||||
public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
|
public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
|
||||||
|
|
||||||
|
public ListeningExecutorService getLoadingExecutor()
|
||||||
|
{
|
||||||
|
return loadingExec;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,11 @@ package io.druid.server.coordination;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Queues;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import com.metamx.common.ISE;
|
||||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import io.druid.segment.loading.SegmentLoaderConfig;
|
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||||
|
@ -33,7 +37,11 @@ import org.apache.curator.framework.CuratorFramework;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -60,7 +68,7 @@ public class ZkCoordinator extends BaseZkCoordinator
|
||||||
ScheduledExecutorFactory factory
|
ScheduledExecutorFactory factory
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(jsonMapper, zkPaths, me, curator);
|
super(jsonMapper, zkPaths, config, me, curator);
|
||||||
|
|
||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
@ -121,12 +129,8 @@ public class ZkCoordinator extends BaseZkCoordinator
|
||||||
return ZkCoordinator.this;
|
return ZkCoordinator.this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private boolean loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
|
||||||
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
|
|
||||||
{
|
{
|
||||||
try {
|
|
||||||
log.info("Loading segment %s", segment.getIdentifier());
|
|
||||||
|
|
||||||
final boolean loaded;
|
final boolean loaded;
|
||||||
try {
|
try {
|
||||||
loaded = serverManager.loadSegment(segment);
|
loaded = serverManager.loadSegment(segment);
|
||||||
|
@ -149,14 +153,23 @@ public class ZkCoordinator extends BaseZkCoordinator
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
return loaded;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
log.info("Loading segment %s", segment.getIdentifier());
|
||||||
|
if(loadSegment(segment, callback)) {
|
||||||
try {
|
try {
|
||||||
announcer.announceSegment(segment);
|
announcer.announceSegment(segment);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
|
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
catch (SegmentLoadingException e) {
|
catch (SegmentLoadingException e) {
|
||||||
log.makeAlert(e, "Failed to load segment for dataSource")
|
log.makeAlert(e, "Failed to load segment for dataSource")
|
||||||
|
@ -168,60 +181,64 @@ public class ZkCoordinator extends BaseZkCoordinator
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addSegments(Iterable<DataSegment> segments, DataSegmentChangeCallback callback)
|
public void addSegments(Iterable<DataSegment> segments, final DataSegmentChangeCallback callback)
|
||||||
|
{
|
||||||
|
try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
|
||||||
|
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
|
||||||
|
backgroundSegmentAnnouncer.startAnnouncing();
|
||||||
|
|
||||||
|
final List<ListenableFuture> segmentLoading = Lists.newArrayList();
|
||||||
|
|
||||||
|
for (final DataSegment segment : segments) {
|
||||||
|
segmentLoading.add(
|
||||||
|
getLoadingExecutor().submit(
|
||||||
|
new Callable<Void>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Void call() throws SegmentLoadingException
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
final List<String> segmentFailures = Lists.newArrayList();
|
|
||||||
final List<DataSegment> validSegments = Lists.newArrayList();
|
|
||||||
|
|
||||||
for (DataSegment segment : segments) {
|
|
||||||
log.info("Loading segment %s", segment.getIdentifier());
|
log.info("Loading segment %s", segment.getIdentifier());
|
||||||
|
final boolean loaded = loadSegment(segment, callback);
|
||||||
final boolean loaded;
|
|
||||||
try {
|
|
||||||
loaded = serverManager.loadSegment(segment);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
|
|
||||||
removeSegment(segment, callback);
|
|
||||||
segmentFailures.add(segment.getIdentifier());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (loaded) {
|
if (loaded) {
|
||||||
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
|
|
||||||
if (!segmentInfoCacheFile.exists()) {
|
|
||||||
try {
|
try {
|
||||||
jsonMapper.writeValue(segmentInfoCacheFile, segment);
|
backgroundSegmentAnnouncer.announceSegment(segment);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (InterruptedException e) {
|
||||||
log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
|
Thread.currentThread().interrupt();
|
||||||
removeSegment(segment, callback);
|
throw new SegmentLoadingException(e, "Loading Interrupted");
|
||||||
segmentFailures.add(segment.getIdentifier());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
validSegments.add(segment);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
|
} catch(SegmentLoadingException e) {
|
||||||
|
log.error(e, "[%s] failed to load", segment.getIdentifier());
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
int failed = 0;
|
||||||
|
for(ListenableFuture future : segmentLoading) {
|
||||||
try {
|
try {
|
||||||
announcer.announceSegments(validSegments);
|
future.get();
|
||||||
|
} catch(InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new SegmentLoadingException(e, "Loading Interrupted");
|
||||||
|
} catch(ExecutionException e) {
|
||||||
|
failed++;
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
}
|
||||||
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
|
if(failed > 0) {
|
||||||
|
throw new SegmentLoadingException("%,d errors seen while loading segments", failed);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!segmentFailures.isEmpty()) {
|
backgroundSegmentAnnouncer.finishAnnouncing();
|
||||||
for (String segmentFailure : segmentFailures) {
|
|
||||||
log.error("%s failed to load", segmentFailure);
|
|
||||||
}
|
|
||||||
throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch (SegmentLoadingException e) {
|
catch (SegmentLoadingException e) {
|
||||||
log.makeAlert(e, "Failed to load segments for dataSource")
|
log.makeAlert(e, "Failed to load segments")
|
||||||
.addData("segments", segments)
|
.addData("segments", segments)
|
||||||
.emit();
|
.emit();
|
||||||
}
|
}
|
||||||
|
@ -272,4 +289,134 @@ public class ZkCoordinator extends BaseZkCoordinator
|
||||||
callback.execute();
|
callback.execute();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class BackgroundSegmentAnnouncer implements AutoCloseable {
|
||||||
|
private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);
|
||||||
|
|
||||||
|
private final int intervalMillis;
|
||||||
|
private final DataSegmentAnnouncer announcer;
|
||||||
|
private final ScheduledExecutorService exec;
|
||||||
|
private final LinkedBlockingQueue<DataSegment> queue;
|
||||||
|
private final SettableFuture<Boolean> doneAnnouncing;
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
private volatile boolean finished = false;
|
||||||
|
private volatile ScheduledFuture startedAnnouncing = null;
|
||||||
|
private volatile ScheduledFuture nextAnnoucement = null;
|
||||||
|
|
||||||
|
public BackgroundSegmentAnnouncer(
|
||||||
|
DataSegmentAnnouncer announcer,
|
||||||
|
ScheduledExecutorService exec,
|
||||||
|
int intervalMillis
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.announcer = announcer;
|
||||||
|
this.exec = exec;
|
||||||
|
this.intervalMillis = intervalMillis;
|
||||||
|
this.queue = Queues.newLinkedBlockingQueue();
|
||||||
|
this.doneAnnouncing = SettableFuture.create();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void announceSegment(final DataSegment segment) throws InterruptedException
|
||||||
|
{
|
||||||
|
if (finished) {
|
||||||
|
throw new ISE("Announce segment called after finishAnnouncing");
|
||||||
|
}
|
||||||
|
queue.put(segment);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void startAnnouncing()
|
||||||
|
{
|
||||||
|
if (intervalMillis <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Starting background segment announcing task");
|
||||||
|
|
||||||
|
// schedule background announcing task
|
||||||
|
nextAnnoucement = startedAnnouncing = exec.schedule(
|
||||||
|
new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
try {
|
||||||
|
if (!(finished && queue.isEmpty())) {
|
||||||
|
final List<DataSegment> segments = Lists.newLinkedList();
|
||||||
|
queue.drainTo(segments);
|
||||||
|
try {
|
||||||
|
announcer.announceSegments(segments);
|
||||||
|
nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
doneAnnouncing.setException(
|
||||||
|
new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
doneAnnouncing.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
doneAnnouncing.setException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
intervalMillis,
|
||||||
|
TimeUnit.MILLISECONDS
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void finishAnnouncing() throws SegmentLoadingException
|
||||||
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
finished = true;
|
||||||
|
// announce any remaining segments
|
||||||
|
try {
|
||||||
|
final List<DataSegment> segments = Lists.newLinkedList();
|
||||||
|
queue.drainTo(segments);
|
||||||
|
announcer.announceSegments(segments);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
// get any exception that may have been thrown in background annoucing
|
||||||
|
try {
|
||||||
|
// check in case intervalMillis is <= 0
|
||||||
|
if (startedAnnouncing != null) {
|
||||||
|
startedAnnouncing.cancel(false);
|
||||||
|
}
|
||||||
|
// - if the task is waiting on the lock, then the queue will be empty by the time it runs
|
||||||
|
// - if the task just released it, then the lock ensures any exception is set in doneAnnouncing
|
||||||
|
if (doneAnnouncing.isDone()) {
|
||||||
|
doneAnnouncing.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new SegmentLoadingException(e, "Loading Interrupted");
|
||||||
|
}
|
||||||
|
catch (ExecutionException e) {
|
||||||
|
throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.info("Completed background segment announcing");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// stop background scheduling
|
||||||
|
synchronized (lock) {
|
||||||
|
finished = true;
|
||||||
|
if (nextAnnoucement != null) {
|
||||||
|
nextAnnoucement.cancel(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import io.druid.curator.PotentiallyGzippedCompressionProvider;
|
||||||
import io.druid.curator.announcement.Announcer;
|
import io.druid.curator.announcement.Announcer;
|
||||||
import io.druid.db.MetadataSegmentManager;
|
import io.druid.db.MetadataSegmentManager;
|
||||||
import io.druid.jackson.DefaultObjectMapper;
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
import io.druid.segment.loading.SegmentLoaderConfig;
|
||||||
import io.druid.segment.realtime.DbSegmentPublisher;
|
import io.druid.segment.realtime.DbSegmentPublisher;
|
||||||
import io.druid.server.DruidNode;
|
import io.druid.server.DruidNode;
|
||||||
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
|
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
|
||||||
|
@ -156,6 +157,7 @@ public class DruidClusterBridgeTest
|
||||||
BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator(
|
BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator(
|
||||||
jsonMapper,
|
jsonMapper,
|
||||||
zkPathsConfig,
|
zkPathsConfig,
|
||||||
|
new SegmentLoaderConfig(),
|
||||||
metadata,
|
metadata,
|
||||||
remoteCf,
|
remoteCf,
|
||||||
dbSegmentPublisher,
|
dbSegmentPublisher,
|
||||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.server.coordination;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||||
|
@ -50,18 +51,22 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class ZkCoordinatorTest extends CuratorTestBase
|
public class ZkCoordinatorTest extends CuratorTestBase
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(ZkCoordinatorTest.class);
|
private static final Logger log = new Logger(ZkCoordinatorTest.class);
|
||||||
|
public static final int COUNT = 50;
|
||||||
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||||
private ZkCoordinator zkCoordinator;
|
private ZkCoordinator zkCoordinator;
|
||||||
private ServerManager serverManager;
|
private ServerManager serverManager;
|
||||||
private DataSegmentAnnouncer announcer;
|
private DataSegmentAnnouncer announcer;
|
||||||
private File infoDir;
|
private File infoDir;
|
||||||
|
private AtomicInteger announceCount;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception
|
public void setUp() throws Exception
|
||||||
|
@ -101,10 +106,42 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
announcer = new SingleDataSegmentAnnouncer(
|
announceCount = new AtomicInteger(0);
|
||||||
|
announcer = new DataSegmentAnnouncer()
|
||||||
|
{
|
||||||
|
private final DataSegmentAnnouncer delegate = new SingleDataSegmentAnnouncer(
|
||||||
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
|
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void announceSegment(DataSegment segment) throws IOException
|
||||||
|
{
|
||||||
|
announceCount.incrementAndGet();
|
||||||
|
delegate.announceSegment(segment);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unannounceSegment(DataSegment segment) throws IOException
|
||||||
|
{
|
||||||
|
announceCount.decrementAndGet();
|
||||||
|
delegate.unannounceSegment(segment);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void announceSegments(Iterable<DataSegment> segments) throws IOException
|
||||||
|
{
|
||||||
|
announceCount.addAndGet(Iterables.size(segments));
|
||||||
|
delegate.announceSegments(segments);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
|
||||||
|
{
|
||||||
|
announceCount.addAndGet(-Iterables.size(segments));
|
||||||
|
delegate.unannounceSegments(segments);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
zkCoordinator = new ZkCoordinator(
|
zkCoordinator = new ZkCoordinator(
|
||||||
jsonMapper,
|
jsonMapper,
|
||||||
new SegmentLoaderConfig()
|
new SegmentLoaderConfig()
|
||||||
|
@ -114,6 +151,18 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
||||||
{
|
{
|
||||||
return infoDir;
|
return infoDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getNumLoadingThreads()
|
||||||
|
{
|
||||||
|
return 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getAnnounceIntervalMillis()
|
||||||
|
{
|
||||||
|
return 50;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
zkPaths,
|
zkPaths,
|
||||||
me,
|
me,
|
||||||
|
@ -133,21 +182,22 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
||||||
@Test
|
@Test
|
||||||
public void testLoadCache() throws Exception
|
public void testLoadCache() throws Exception
|
||||||
{
|
{
|
||||||
List<DataSegment> segments = Lists.newArrayList(
|
List<DataSegment> segments = Lists.newLinkedList();
|
||||||
makeSegment("test", "1", new Interval("P1d/2011-04-01")),
|
for(int i = 0; i < COUNT; ++i) {
|
||||||
makeSegment("test", "1", new Interval("P1d/2011-04-02")),
|
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-01")));
|
||||||
makeSegment("test", "2", new Interval("P1d/2011-04-02")),
|
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-02")));
|
||||||
makeSegment("test", "1", new Interval("P1d/2011-04-03")),
|
segments.add(makeSegment("test" + i, "2", new Interval("P1d/2011-04-02")));
|
||||||
makeSegment("test", "1", new Interval("P1d/2011-04-04")),
|
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-03")));
|
||||||
makeSegment("test", "1", new Interval("P1d/2011-04-05")),
|
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-04")));
|
||||||
makeSegment("test", "2", new Interval("PT1h/2011-04-04T01")),
|
segments.add(makeSegment("test" + i, "1", new Interval("P1d/2011-04-05")));
|
||||||
makeSegment("test", "2", new Interval("PT1h/2011-04-04T02")),
|
segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T01")));
|
||||||
makeSegment("test", "2", new Interval("PT1h/2011-04-04T03")),
|
segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T02")));
|
||||||
makeSegment("test", "2", new Interval("PT1h/2011-04-04T05")),
|
segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T03")));
|
||||||
makeSegment("test", "2", new Interval("PT1h/2011-04-04T06")),
|
segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T05")));
|
||||||
makeSegment("test2", "1", new Interval("P1d/2011-04-01")),
|
segments.add(makeSegment("test" + i, "2", new Interval("PT1h/2011-04-04T06")));
|
||||||
makeSegment("test2", "1", new Interval("P1d/2011-04-02"))
|
segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-01")));
|
||||||
);
|
segments.add(makeSegment("test_two" + i, "1", new Interval("P1d/2011-04-02")));
|
||||||
|
}
|
||||||
Collections.sort(segments);
|
Collections.sort(segments);
|
||||||
|
|
||||||
for (DataSegment segment : segments) {
|
for (DataSegment segment : segments) {
|
||||||
|
@ -158,6 +208,11 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
||||||
Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
|
Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
|
||||||
zkCoordinator.start();
|
zkCoordinator.start();
|
||||||
Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
|
Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
|
||||||
|
for(int i = 0; i < COUNT; ++i) {
|
||||||
|
Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue());
|
||||||
|
Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue());
|
||||||
|
}
|
||||||
|
Assert.assertEquals(13 * COUNT, announceCount.get());
|
||||||
zkCoordinator.stop();
|
zkCoordinator.stop();
|
||||||
|
|
||||||
for (DataSegment segment : segments) {
|
for (DataSegment segment : segments) {
|
||||||
|
|
|
@ -27,7 +27,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>io.druid</groupId>
|
<groupId>io.druid</groupId>
|
||||||
<artifactId>druid</artifactId>
|
<artifactId>druid</artifactId>
|
||||||
<version>0.6.155-SNAPSHOT</version>
|
<version>0.6.157-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
|
Loading…
Reference in New Issue