mirror of https://github.com/apache/druid.git
Merge branch 'master' into new-init
Conflicts: examples/config/historical/runtime.properties examples/config/overlord/runtime.properties examples/config/realtime/runtime.properties
This commit is contained in:
commit
a3e86f7094
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -12,18 +12,18 @@ You can provision individual servers, loading Druid onto each machine (or buildi
|
|||
|
||||
[Apache Whirr](http://whirr.apache.org/) is a set of libraries for launching cloud services. For Druid, Whirr serves as an easy way to launch a cluster in Amazon AWS by using simple commands and configuration files (called *recipes*).
|
||||
|
||||
**NOTE:** Whirr will install Druid 0.5.x. At this time Whirr can launch Druid 0.5.x only, but in the near future will support Druid 0.6.x. You can download and install your own copy of Druid 0.5.x [here](http://static.druid.io/artifacts/releases/druid-services-0.5.7-bin.tar.gz).
|
||||
**NOTE:** Whirr will install Druid 0.6.121. Also, it doesn't work with JDK1.7.0_55. JDK1.7.0_45 recommended.
|
||||
|
||||
You'll need an AWS account, and an EC2 key pair from that account so that Whirr can connect to the cloud via the EC2 API. If you haven't generated a key pair, see the [AWS documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) or see this [Whirr FAQ](http://whirr.apache.org/faq.html#how-do-i-find-my-cloud-credentials).
|
||||
You'll need an AWS account, S3 Bucket and an EC2 key pair from that account so that Whirr can connect to the cloud via the EC2 API. If you haven't generated a key pair, see the [AWS documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) or see this [Whirr FAQ](http://whirr.apache.org/faq.html#how-do-i-find-my-cloud-credentials).
|
||||
|
||||
|
||||
### Installing Whirr
|
||||
You must use a version of Whirr that includes and supports a Druid recipe. You can do it so in one of two ways:
|
||||
|
||||
#### Build the Following Version of Whirr
|
||||
Clone the code from [https://github.com/rjurney/whirr/tree/trunk](https://github.com/rjurney/whirr/tree/trunk) and build Whirr:
|
||||
Clone the code from [https://github.com/druid-io/whirr](https://github.com/druid-io/whirr) and build Whirr:
|
||||
|
||||
git clone git@github.com:rjurney/whirr.git
|
||||
git clone git@github.com:druid-io/whirr.git
|
||||
cd whirr
|
||||
git checkout trunk
|
||||
mvn clean install -Dmaven.test.failure.ignore=true
|
||||
|
@ -39,10 +39,12 @@ Then run `mvn install` from the root directory.
|
|||
The Whirr recipe for Druid is the configuration file `$WHIRR_HOME/recipies/druid.properties`. You can edit this file to suit your needs -- it is annotated and self-explanatory. Here are some hints about that file:
|
||||
|
||||
* Set `whirr.location-id` to a specific AWS region (e.g., us-east-1) if desired, else one will be chosen for you.
|
||||
* You can choose the hardware used with `whirr.hardware-id` to a specific instance type (e.g., m1.large). If you don't choose an image via `whirr.image-id` (image must be compatible with hardware), you'll get plain vanilla Linux.
|
||||
* You can choose the hardware used with `whirr.hardware-id` to a specific instance type (e.g., m1.large). By default druid.properties, m3.2xlarge (broker, historical, middle manager), m1.xlarge (coordinator, overlord), and m1.small (zookeeper, mysql) are used.
|
||||
* If you don't choose an image via `whirr.image-id` (image must be compatible with hardware), you'll get plain vanilla Linux. Default druid.properties uses ami-018c9568 (Ubuntu 12.04).
|
||||
* SSH keys (not password protected) must exist for the local user. If they are in the default locations, `${sys:user.home}/.ssh/id_rsa` and `${sys:user.home}/.ssh/id_rsa.pub`, Whirr will find them. Otherwise, you'll have to specify them with `whirr.private-key-file` and `whirr.public-key-file`.
|
||||
* Be sure to specify the absolute path of the Druid realtime spec file `realtime.spec` in `whirr.druid.realtime.spec.path`.
|
||||
* Two Druid cluster templates (see `whirr.instance-templates`) are provided: a small cluster running on a single EC2 instance, and a larger cluster running on multiple instances. The first is a good test case to start with.
|
||||
* Also make sure to specify the correct S3 bucket. Otherwise the cluster won't be able to process tasks.
|
||||
* Two Druid cluster templates (see `whirr.instance-templates`) are provided: a small cluster running on a single EC2 instance, and a larger cluster running on multiple instances.
|
||||
|
||||
The following AWS information must be set in `druid.properties`, as environment variables, or in the file `$WHIRR_HOME/conf/credentials`:
|
||||
|
||||
|
@ -52,6 +54,8 @@ The following AWS information must be set in `druid.properties`, as environment
|
|||
|
||||
How to get the IDENTITY and CREDENTIAL keys is discussed above.
|
||||
|
||||
In order to configure each node, you can edit `services/druid/src/main/resources/functions/start_druid.sh` for JVM configuration and `services/druid/src/main/resources/functions/configure_[NODE_NAME].sh` for specific node configuration. For more information on configuration, read the Druid documentations about it (http://druid.io/docs/0.6.116/Configuration.html).
|
||||
|
||||
### Start a Test Cluster With Whirr
|
||||
Run the following command:
|
||||
|
||||
|
@ -77,4 +81,16 @@ Note that Whirr will return an exception if any of the nodes fail to launch, and
|
|||
% $WHIRR_HOME/bin/whirr destroy-cluster --config $WHIRR_HOME/recipes/druid.properties
|
||||
```
|
||||
|
||||
### Testing the Cluster
|
||||
Now you can run an indexing task and a simple query to see if all the nodes have launched correctly. We are going to use a Wikipedia example again. For a realtime indexing task, run the following command:
|
||||
|
||||
```bash
|
||||
curl -X 'POST' -H 'Content-Type:application/json' -d @#{YOUR_DRUID_DIRECTORY}/examples/indexing/wikipedia_realtime_task.json #{OVERLORD_PUBLIC_IP_ADDR}:#{PORT}/druid/indexer/v1/task
|
||||
```
|
||||
Issuing the request should return a task ID.
|
||||
|
||||
To check the state of the overlord, open up your browser and go to `#{OVERLORD_PUBLIC_IP_ADDR}:#{PORT}/console.html`.
|
||||
|
||||
Next, go to `#{COORDINATOR_PUBLIC_IP_ADDR}:#{PORT}`. Click "View Information about the Cluster"->"Full Cluster View." You should now see the information about servers and segments. If the cluster runs correctly, Segment dimensions and Segment binaryVersion fields should be filled up. Allow few minutes for the segments to be processed.
|
||||
|
||||
Now you should be able to query the data using broker's public IP address.
|
|
@ -19,13 +19,13 @@ Clone Druid and build it:
|
|||
git clone https://github.com/metamx/druid.git druid
|
||||
cd druid
|
||||
git fetch --tags
|
||||
git checkout druid-0.6.116
|
||||
git checkout druid-0.6.121
|
||||
./build.sh
|
||||
```
|
||||
|
||||
### Downloading the DSK (Druid Standalone Kit)
|
||||
|
||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.116-bin.tar.gz) a stand-alone tarball and run it:
|
||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz) a stand-alone tarball and run it:
|
||||
|
||||
``` bash
|
||||
tar -xzf druid-services-0.X.X-bin.tar.gz
|
||||
|
|
|
@ -5,9 +5,9 @@ layout: doc_page
|
|||
## What should I set my JVM heap?
|
||||
The size of the JVM heap really depends on the type of Druid node you are running. Below are a few considerations.
|
||||
|
||||
[Broker nodes](Broker.html) can use the JVM heap as a query cache and thus, the size of the heap will affect on the number of results that can be cached. Broker nodes do not require off-heap memory and generally, heap sizes can be set to be close to the maximum memory on the machine (leaving some room for JVM overhead). The heap is used to merge results from different real-time and historical nodes, along with other computational processing.
|
||||
[Broker nodes](Broker.html) uses the JVM heap mainly to merge results from historicals and real-times. Brokers also use off-heap memory and processing threads for groupBy queries.
|
||||
|
||||
[Historical nodes](Historical.html) use off-heap memory to store intermediate results, and by default, all segments are memory mapped before they can be queried. The more off-heap memory is available, the more segments can be served without the possibility of data being paged onto disk. On historicals, the JVM heap is used for [GroupBy queries](GroupByQuery.html), some data structures used for intermediate computation, and general processing.
|
||||
[Historical nodes](Historical.html) use off-heap memory to store intermediate results, and by default, all segments are memory mapped before they can be queried. Typically, the more memory is available on a historical node, the more segments can be served without the possibility of data being paged on to disk. On historicals, the JVM heap is used for [GroupBy queries](GroupByQuery.html), some data structures used for intermediate computation, and general processing. One way to calculate how much space there is for segments is: memory_for_segments = total_memory - heap - direct_memory - jvm_overhead.
|
||||
|
||||
[Coordinator nodes](Coordinator nodes) do not require off-heap memory and the heap is used for loading information about all segments to determine what segments need to be loaded, dropped, moved, or replicated.
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/overlord
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
@ -137,7 +137,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/middlemanager
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116","io.druid.extensions:druid-kafka-seven:0.6.116"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
@ -285,7 +285,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/historical
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
|
|
@ -27,7 +27,7 @@ druid.host=localhost
|
|||
druid.service=realtime
|
||||
druid.port=8083
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.116"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
|
||||
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
|
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/realtime
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116","io.druid.extensions:druid-kafka-seven:0.6.116"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
|
|
@ -28,7 +28,7 @@ Configuration:
|
|||
|
||||
-Ddruid.zk.service.host=localhost
|
||||
|
||||
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.116"]
|
||||
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
|
||||
|
||||
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
|
||||
-Ddruid.db.connector.user=druid
|
||||
|
|
|
@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
|||
|
||||
### Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.116-bin.tar.gz). Download this file to a directory of your choosing.
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz). Download this file to a directory of your choosing.
|
||||
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
||||
|
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
|
|||
Not too lost so far right? That's great! If you cd into the directory:
|
||||
|
||||
```
|
||||
cd druid-services-0.6.116
|
||||
cd druid-services-0.6.121
|
||||
```
|
||||
|
||||
You should see a bunch of files:
|
||||
|
|
|
@ -96,7 +96,7 @@ The configurations for the overlord node are as follows:
|
|||
|
||||
-Ddruid.zk.service.host=localhost
|
||||
|
||||
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.116"]
|
||||
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.121"]
|
||||
|
||||
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
|
||||
-Ddruid.db.connector.user=druid
|
||||
|
|
|
@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
|
|||
|
||||
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
|
||||
|
||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.116-bin.tar.gz)
|
||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz)
|
||||
|
||||
and untar the contents within by issuing:
|
||||
|
||||
|
@ -149,7 +149,7 @@ druid.port=8081
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
|
||||
|
||||
# Dummy read only AWS account (used to download example data)
|
||||
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||
|
@ -240,7 +240,7 @@ druid.port=8083
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.116","io.druid.extensions:druid-kafka-seven:0.6.116"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121"]
|
||||
|
||||
# Change this config to db to hand off to the rest of the Druid cluster
|
||||
druid.publish.type=noop
|
||||
|
|
|
@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
|||
|
||||
h3. Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.116-bin.tar.gz)
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz)
|
||||
Download this file to a directory of your choosing.
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
||||
|
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
|
|||
Not too lost so far right? That's great! If you cd into the directory:
|
||||
|
||||
```
|
||||
cd druid-services-0.6.116
|
||||
cd druid-services-0.6.121
|
||||
```
|
||||
|
||||
You should see a bunch of files:
|
||||
|
|
|
@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
|
|||
|
||||
# Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.116-bin.tar.gz).
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.121-bin.tar.gz).
|
||||
Download this bad boy to a directory of your choosing.
|
||||
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# Extensions
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.116","io.druid.extensions:druid-kafka-seven:0.6.116","io.druid.extensions:druid-rabbitmq:0.6.116", "io.druid.extensions:druid-s3-extensions:0.6.116"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.121","io.druid.extensions:druid-kafka-seven:0.6.121","io.druid.extensions:druid-rabbitmq:0.6.121", "io.druid.extensions:druid-s3-extensions:0.6.121"]
|
||||
|
||||
# Zookeeper
|
||||
druid.zk.service.host=localhost
|
||||
|
@ -16,7 +16,6 @@ druid.storage.storage.storageDirectory=/tmp/druid/localStorage
|
|||
# Indexing service discovery
|
||||
druid.selectors.indexing.serviceName=overlord
|
||||
|
||||
|
||||
# Monitoring (disabled for examples)
|
||||
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ druid.host=localhost
|
|||
druid.service=historical
|
||||
druid.port=8081
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.116"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.121"]
|
||||
|
||||
# Dummy read only AWS account (used to download example data)
|
||||
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
druid.host=localhost
|
||||
druid.port=8083
|
||||
druid.port=8080
|
||||
druid.service=overlord
|
||||
|
||||
druid.indexer.queue.startDelay=PT0M
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -33,8 +33,8 @@ import com.metamx.common.logger.Logger;
|
|||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.Rows;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
|
@ -48,8 +48,8 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Partitioner;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -92,7 +92,11 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
);
|
||||
|
||||
JobHelper.injectSystemProperties(groupByJob);
|
||||
groupByJob.setInputFormatClass(CombineTextInputFormat.class);
|
||||
if (config.isCombineText()) {
|
||||
groupByJob.setInputFormatClass(CombineTextInputFormat.class);
|
||||
} else {
|
||||
groupByJob.setInputFormatClass(TextInputFormat.class);
|
||||
}
|
||||
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
|
||||
groupByJob.setMapOutputKeyClass(LongWritable.class);
|
||||
groupByJob.setMapOutputValueClass(BytesWritable.class);
|
||||
|
|
|
@ -37,7 +37,6 @@ import com.google.inject.Module;
|
|||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.common.parsers.TimestampParser;
|
||||
import io.druid.common.utils.JodaUtils;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.StringInputRowParser;
|
||||
|
@ -111,7 +110,17 @@ public class HadoopDruidIndexerConfig
|
|||
|
||||
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
|
||||
{
|
||||
return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
|
||||
//backwards compatibility
|
||||
if (argSpec.containsKey("schema")) {
|
||||
return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
|
||||
} else {
|
||||
return new HadoopDruidIndexerConfig(
|
||||
HadoopDruidIndexerConfig.jsonMapper.convertValue(
|
||||
argSpec,
|
||||
HadoopIngestionSpec.class
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -246,6 +255,11 @@ public class HadoopDruidIndexerConfig
|
|||
return (schema.getIOConfig().getMetadataUpdateSpec() != null);
|
||||
}
|
||||
|
||||
public boolean isCombineText()
|
||||
{
|
||||
return schema.getTuningConfig().isCombineText();
|
||||
}
|
||||
|
||||
public StringInputRowParser getParser()
|
||||
{
|
||||
return (StringInputRowParser) schema.getDataSchema().getParser();
|
||||
|
|
|
@ -73,6 +73,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
|
|||
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
|
||||
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
|
||||
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
|
||||
final @JsonProperty("combineText") boolean combineText,
|
||||
// These fields are deprecated and will be removed in the future
|
||||
final @JsonProperty("timestampColumn") String timestampColumn,
|
||||
final @JsonProperty("timestampFormat") String timestampFormat,
|
||||
|
@ -138,7 +139,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
|
|||
this.dataSchema = new DataSchema(
|
||||
dataSource,
|
||||
new StringInputRowParser(
|
||||
dataSpec == null ? null : dataSpec.toParseSpec(timestampSpec, dimensionExclusions),
|
||||
dataSpec == null ? null : dataSpec.toParseSpec(theTimestampSpec, dimensionExclusions),
|
||||
null, null, null, null
|
||||
),
|
||||
rollupSpec == null
|
||||
|
@ -163,7 +164,8 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
|
|||
cleanupOnFailure,
|
||||
overwriteFiles,
|
||||
ignoreInvalidRows,
|
||||
jobProperties
|
||||
jobProperties,
|
||||
combineText
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -212,6 +214,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
|
|||
null,
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@ -244,6 +247,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
|
|||
null,
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@ -276,6 +280,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
|
|||
null,
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -52,7 +52,8 @@ public class HadoopTuningConfig implements TuningConfig
|
|||
true,
|
||||
false,
|
||||
false,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -66,6 +67,7 @@ public class HadoopTuningConfig implements TuningConfig
|
|||
private final boolean overwriteFiles;
|
||||
private final boolean ignoreInvalidRows;
|
||||
private final Map<String, String> jobProperties;
|
||||
private final boolean combineText;
|
||||
|
||||
@JsonCreator
|
||||
public HadoopTuningConfig(
|
||||
|
@ -78,7 +80,8 @@ public class HadoopTuningConfig implements TuningConfig
|
|||
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
|
||||
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
|
||||
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
|
||||
final @JsonProperty("jobProperties") Map<String, String> jobProperties
|
||||
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
|
||||
final @JsonProperty("combineText") boolean combineText
|
||||
)
|
||||
{
|
||||
this.workingPath = workingPath == null ? null : workingPath;
|
||||
|
@ -93,6 +96,7 @@ public class HadoopTuningConfig implements TuningConfig
|
|||
this.jobProperties = (jobProperties == null
|
||||
? ImmutableMap.<String, String>of()
|
||||
: ImmutableMap.copyOf(jobProperties));
|
||||
this.combineText = combineText;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -155,6 +159,12 @@ public class HadoopTuningConfig implements TuningConfig
|
|||
return jobProperties;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isCombineText()
|
||||
{
|
||||
return combineText;
|
||||
}
|
||||
|
||||
public HadoopTuningConfig withWorkingPath(String path)
|
||||
{
|
||||
return new HadoopTuningConfig(
|
||||
|
@ -167,7 +177,8 @@ public class HadoopTuningConfig implements TuningConfig
|
|||
cleanupOnFailure,
|
||||
overwriteFiles,
|
||||
ignoreInvalidRows,
|
||||
jobProperties
|
||||
jobProperties,
|
||||
combineText
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -183,7 +194,8 @@ public class HadoopTuningConfig implements TuningConfig
|
|||
cleanupOnFailure,
|
||||
overwriteFiles,
|
||||
ignoreInvalidRows,
|
||||
jobProperties
|
||||
jobProperties,
|
||||
combineText
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -199,7 +211,8 @@ public class HadoopTuningConfig implements TuningConfig
|
|||
cleanupOnFailure,
|
||||
overwriteFiles,
|
||||
ignoreInvalidRows,
|
||||
jobProperties
|
||||
jobProperties,
|
||||
combineText
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.mapreduce.JobContext;
|
|||
import org.apache.hadoop.mapreduce.Partitioner;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -145,7 +146,11 @@ public class IndexGeneratorJob implements Jobby
|
|||
|
||||
JobHelper.injectSystemProperties(job);
|
||||
|
||||
job.setInputFormatClass(CombineTextInputFormat.class);
|
||||
if (config.isCombineText()) {
|
||||
job.setInputFormatClass(CombineTextInputFormat.class);
|
||||
} else {
|
||||
job.setInputFormatClass(TextInputFormat.class);
|
||||
}
|
||||
|
||||
job.setMapperClass(IndexGeneratorMapper.class);
|
||||
job.setMapOutputValueClass(Text.class);
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -422,6 +422,7 @@ public class TaskSerdeTest
|
|||
null,
|
||||
false,
|
||||
ImmutableMap.of("foo", "bar"),
|
||||
false,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -23,7 +23,7 @@
|
|||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
<name>druid</name>
|
||||
<description>druid</description>
|
||||
<scm>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -29,6 +29,8 @@ import io.druid.query.filter.NoopDimFilter;
|
|||
import io.druid.query.filter.NotDimFilter;
|
||||
import io.druid.query.filter.OrDimFilter;
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
import io.druid.query.metadata.metadata.ColumnIncluderator;
|
||||
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
|
||||
import io.druid.query.search.SearchResultValue;
|
||||
import io.druid.query.search.search.InsensitiveContainsSearchQuerySpec;
|
||||
import io.druid.query.search.search.SearchQuery;
|
||||
|
@ -823,4 +825,112 @@ public class Druids
|
|||
{
|
||||
return new ResultBuilder<TimeBoundaryResultValue>();
|
||||
}
|
||||
|
||||
/**
|
||||
* A Builder for SegmentMetadataQuery.
|
||||
* <p/>
|
||||
* Required: dataSource(), intervals() must be called before build()
|
||||
* <p/>
|
||||
* Usage example:
|
||||
* <pre><code>
|
||||
* SegmentMetadataQuery query = new SegmentMetadataQueryBuilder()
|
||||
* .dataSource("Example")
|
||||
* .interval("2010/2013")
|
||||
* .build();
|
||||
* </code></pre>
|
||||
*
|
||||
* @see io.druid.query.metadata.metadata.SegmentMetadataQuery
|
||||
*/
|
||||
public static class SegmentMetadataQueryBuilder
|
||||
{
|
||||
private DataSource dataSource;
|
||||
private QuerySegmentSpec querySegmentSpec;
|
||||
private ColumnIncluderator toInclude;
|
||||
private Boolean merge;
|
||||
private Map<String, Object> context;
|
||||
|
||||
public SegmentMetadataQueryBuilder()
|
||||
{
|
||||
dataSource = null;
|
||||
querySegmentSpec = null;
|
||||
toInclude = null;
|
||||
merge = null;
|
||||
context = null;
|
||||
}
|
||||
|
||||
public SegmentMetadataQuery build()
|
||||
{
|
||||
return new SegmentMetadataQuery(
|
||||
dataSource,
|
||||
querySegmentSpec,
|
||||
toInclude,
|
||||
merge,
|
||||
context
|
||||
);
|
||||
}
|
||||
|
||||
public SegmentMetadataQueryBuilder copy(SegmentMetadataQueryBuilder builder)
|
||||
{
|
||||
return new SegmentMetadataQueryBuilder()
|
||||
.dataSource(builder.dataSource)
|
||||
.intervals(builder.querySegmentSpec)
|
||||
.toInclude(toInclude)
|
||||
.merge(merge)
|
||||
.context(builder.context);
|
||||
}
|
||||
|
||||
public SegmentMetadataQueryBuilder dataSource(String ds)
|
||||
{
|
||||
dataSource = new TableDataSource(ds);
|
||||
return this;
|
||||
}
|
||||
|
||||
public SegmentMetadataQueryBuilder dataSource(DataSource ds)
|
||||
{
|
||||
dataSource = ds;
|
||||
return this;
|
||||
}
|
||||
|
||||
public SegmentMetadataQueryBuilder intervals(QuerySegmentSpec q)
|
||||
{
|
||||
querySegmentSpec = q;
|
||||
return this;
|
||||
}
|
||||
|
||||
public SegmentMetadataQueryBuilder intervals(String s)
|
||||
{
|
||||
querySegmentSpec = new LegacySegmentSpec(s);
|
||||
return this;
|
||||
}
|
||||
|
||||
public SegmentMetadataQueryBuilder intervals(List<Interval> l)
|
||||
{
|
||||
querySegmentSpec = new LegacySegmentSpec(l);
|
||||
return this;
|
||||
}
|
||||
|
||||
public SegmentMetadataQueryBuilder toInclude(ColumnIncluderator toInclude)
|
||||
{
|
||||
this.toInclude = toInclude;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public SegmentMetadataQueryBuilder merge(boolean merge)
|
||||
{
|
||||
this.merge = merge;
|
||||
return this;
|
||||
}
|
||||
|
||||
public SegmentMetadataQueryBuilder context(Map<String, Object> c)
|
||||
{
|
||||
context = c;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
public static SegmentMetadataQueryBuilder newSegmentMetadataQueryBuilder()
|
||||
{
|
||||
return new SegmentMetadataQueryBuilder();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.google.common.collect.Maps;
|
|||
import com.metamx.common.guava.ExecutorExecutingSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.query.AbstractPrioritizedCallable;
|
||||
import io.druid.query.ConcatQueryRunner;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
|
@ -116,17 +117,14 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
|
|||
@Override
|
||||
public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
|
||||
{
|
||||
|
||||
final int priority = query.getContextPriority(0);
|
||||
Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
|
||||
new Callable<Sequence<SegmentAnalysis>>()
|
||||
new AbstractPrioritizedCallable<Sequence<SegmentAnalysis>>(priority)
|
||||
{
|
||||
@Override
|
||||
public Sequence<SegmentAnalysis> call() throws Exception
|
||||
{
|
||||
return new ExecutorExecutingSequence<SegmentAnalysis>(
|
||||
input.run(query),
|
||||
queryExecutor
|
||||
);
|
||||
return input.run(query);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -21,17 +21,76 @@ package io.druid.query.metadata;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.metadata.metadata.ColumnAnalysis;
|
||||
import io.druid.query.metadata.metadata.ListColumnIncluderator;
|
||||
import io.druid.query.metadata.metadata.SegmentAnalysis;
|
||||
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
|
||||
import io.druid.segment.QueryableIndexSegment;
|
||||
import io.druid.segment.TestIndex;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public class SegmentMetadataQueryTest
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
private final QueryRunner runner = makeQueryRunner(
|
||||
new SegmentMetadataQueryRunnerFactory()
|
||||
);
|
||||
private ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static QueryRunner makeQueryRunner(
|
||||
QueryRunnerFactory factory
|
||||
)
|
||||
{
|
||||
return QueryRunnerTestHelper.makeQueryRunner(
|
||||
factory,
|
||||
new QueryableIndexSegment(QueryRunnerTestHelper.segmentId, TestIndex.getMMappedTestIndex())
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSegmentMetadataQuery()
|
||||
{
|
||||
SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
|
||||
.dataSource("testing")
|
||||
.intervals("2013/2014")
|
||||
.toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
|
||||
.merge(true)
|
||||
.build();
|
||||
|
||||
Iterable<SegmentAnalysis> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
Lists.<SegmentAnalysis>newArrayList()
|
||||
);
|
||||
SegmentAnalysis val = results.iterator().next();
|
||||
Assert.assertEquals("testSegment", val.getId());
|
||||
Assert.assertEquals(69843, val.getSize());
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(new Interval("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")),
|
||||
val.getIntervals()
|
||||
);
|
||||
Assert.assertEquals(1, val.getColumns().size());
|
||||
final ColumnAnalysis columnAnalysis = val.getColumns().get("placement");
|
||||
Assert.assertEquals("STRING", columnAnalysis.getType());
|
||||
Assert.assertEquals(10881, columnAnalysis.getSize());
|
||||
Assert.assertEquals(new Integer(1), columnAnalysis.getCardinality());
|
||||
Assert.assertNull(columnAnalysis.getErrorMessage());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.inject.Inject;
|
||||
|
@ -79,7 +80,8 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
)
|
||||
{
|
||||
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
|
||||
Set<DataSegment> filteredInventory = Sets.filter(inventory, predicate);
|
||||
// make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory
|
||||
Set<DataSegment> filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate));
|
||||
|
||||
zNodes.put(inventoryKey, filteredInventory);
|
||||
for (DataSegment segment : filteredInventory) {
|
||||
|
@ -94,7 +96,8 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
)
|
||||
{
|
||||
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
|
||||
Set<DataSegment> filteredInventory = Sets.filter(inventory, predicate);
|
||||
// make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory
|
||||
Set<DataSegment> filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate));
|
||||
|
||||
Set<DataSegment> existing = zNodes.get(inventoryKey);
|
||||
if (existing == null) {
|
||||
|
|
|
@ -77,6 +77,7 @@ import org.joda.time.Duration;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -91,11 +92,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
public class DruidCoordinator
|
||||
{
|
||||
public static final String COORDINATOR_OWNER_NODE = "_COORDINATOR";
|
||||
|
||||
private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private final DruidCoordinatorConfig config;
|
||||
private final ZkPathsConfig zkPaths;
|
||||
private final JacksonConfigManager configManager;
|
||||
|
@ -111,7 +109,6 @@ public class DruidCoordinator
|
|||
private final AtomicReference<LeaderLatch> leaderLatch;
|
||||
private final ServiceAnnouncer serviceAnnouncer;
|
||||
private final DruidNode self;
|
||||
|
||||
private volatile boolean started = false;
|
||||
private volatile int leaderCounter = 0;
|
||||
private volatile boolean leader = false;
|
||||
|
@ -234,7 +231,6 @@ public class DruidCoordinator
|
|||
return retVal;
|
||||
}
|
||||
|
||||
|
||||
public CountingMap<String> getSegmentAvailability()
|
||||
{
|
||||
final CountingMap<String> retVal = new CountingMap<>();
|
||||
|
@ -253,43 +249,22 @@ public class DruidCoordinator
|
|||
|
||||
public Map<String, Double> getLoadStatus()
|
||||
{
|
||||
// find available segments
|
||||
Map<String, Set<DataSegment>> availableSegments = Maps.newHashMap();
|
||||
for (DataSegment dataSegment : getAvailableDataSegments()) {
|
||||
Set<DataSegment> segments = availableSegments.get(dataSegment.getDataSource());
|
||||
if (segments == null) {
|
||||
segments = Sets.newHashSet();
|
||||
availableSegments.put(dataSegment.getDataSource(), segments);
|
||||
}
|
||||
segments.add(dataSegment);
|
||||
}
|
||||
|
||||
// find segments currently loaded
|
||||
Map<String, Set<DataSegment>> segmentsInCluster = Maps.newHashMap();
|
||||
for (DruidServer druidServer : serverInventoryView.getInventory()) {
|
||||
for (DruidDataSource druidDataSource : druidServer.getDataSources()) {
|
||||
Set<DataSegment> segments = segmentsInCluster.get(druidDataSource.getName());
|
||||
if (segments == null) {
|
||||
segments = Sets.newHashSet();
|
||||
segmentsInCluster.put(druidDataSource.getName(), segments);
|
||||
}
|
||||
segments.addAll(druidDataSource.getSegments());
|
||||
}
|
||||
}
|
||||
|
||||
// compare available segments with currently loaded
|
||||
Map<String, Double> loadStatus = Maps.newHashMap();
|
||||
for (Map.Entry<String, Set<DataSegment>> entry : availableSegments.entrySet()) {
|
||||
String dataSource = entry.getKey();
|
||||
Set<DataSegment> segmentsAvailable = entry.getValue();
|
||||
Set<DataSegment> loadedSegments = segmentsInCluster.get(dataSource);
|
||||
if (loadedSegments == null) {
|
||||
loadedSegments = Sets.newHashSet();
|
||||
for (DruidDataSource dataSource : databaseSegmentManager.getInventory()) {
|
||||
final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments());
|
||||
final int availableSegmentSize = segments.size();
|
||||
|
||||
// remove loaded segments
|
||||
for (DruidServer druidServer : serverInventoryView.getInventory()) {
|
||||
final DruidDataSource loadedView = druidServer.getDataSource(dataSource.getName());
|
||||
if (loadedView != null) {
|
||||
segments.removeAll(loadedView.getSegments());
|
||||
}
|
||||
}
|
||||
Set<DataSegment> unloadedSegments = Sets.difference(segmentsAvailable, loadedSegments);
|
||||
final int unloadedSegmentSize = segments.size();
|
||||
loadStatus.put(
|
||||
dataSource,
|
||||
100 * ((double) (segmentsAvailable.size() - unloadedSegments.size()) / (double) segmentsAvailable.size())
|
||||
dataSource.getName(),
|
||||
100 * ((double) (availableSegmentSize - unloadedSegmentSize) / (double) availableSegmentSize)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -422,11 +397,27 @@ public class DruidCoordinator
|
|||
}
|
||||
}
|
||||
|
||||
public Set<DataSegment> getAvailableDataSegments()
|
||||
public Set<DataSegment> getOrderedAvailableDataSegments()
|
||||
{
|
||||
Set<DataSegment> availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
|
||||
|
||||
Iterable<DataSegment> dataSegments = Iterables.concat(
|
||||
Iterable<DataSegment> dataSegments = getAvailableDataSegments();
|
||||
|
||||
for (DataSegment dataSegment : dataSegments) {
|
||||
if (dataSegment.getSize() < 0) {
|
||||
log.makeAlert("No size on Segment, wtf?")
|
||||
.addData("segment", dataSegment)
|
||||
.emit();
|
||||
}
|
||||
availableSegments.add(dataSegment);
|
||||
}
|
||||
|
||||
return availableSegments;
|
||||
}
|
||||
|
||||
public Iterable<DataSegment> getAvailableDataSegments()
|
||||
{
|
||||
return Iterables.concat(
|
||||
Iterables.transform(
|
||||
databaseSegmentManager.getInventory(),
|
||||
new Function<DruidDataSource, Iterable<DataSegment>>()
|
||||
|
@ -439,17 +430,6 @@ public class DruidCoordinator
|
|||
}
|
||||
)
|
||||
);
|
||||
|
||||
for (DataSegment dataSegment : dataSegments) {
|
||||
if (dataSegment.getSize() < 0) {
|
||||
log.makeAlert("No size on Segment, wtf?")
|
||||
.addData("segment", dataSegment)
|
||||
.emit();
|
||||
}
|
||||
availableSegments.add(dataSegment);
|
||||
}
|
||||
|
||||
return availableSegments;
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
|
|
|
@ -41,7 +41,7 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
|
|||
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
|
||||
{
|
||||
// Display info about all available segments
|
||||
final Set<DataSegment> availableSegments = coordinator.getAvailableDataSegments();
|
||||
final Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Available DataSegments");
|
||||
for (DataSegment dataSegment : availableSegments) {
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.117-SNAPSHOT</version>
|
||||
<version>0.6.122-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -27,18 +27,18 @@ import io.airlift.command.Command;
|
|||
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
|
||||
import io.druid.indexer.HadoopDruidIndexerConfig;
|
||||
import io.druid.indexer.HadoopDruidIndexerJob;
|
||||
import io.druid.indexer.HadoopIngestionSpec;
|
||||
import io.druid.indexer.JobHelper;
|
||||
import io.druid.indexer.Jobby;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
@Command(
|
||||
name = "hadoop-indexer",
|
||||
description = "Runs the batch Hadoop Druid Indexer, see https://github.com/metamx/druid/wiki/Batch-ingestion for a description."
|
||||
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/latest/Batch-ingestion.html for a description."
|
||||
)
|
||||
public class CliInternalHadoopIndexer implements Runnable
|
||||
{
|
||||
|
@ -65,6 +65,7 @@ public class CliInternalHadoopIndexer implements Runnable
|
|||
public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig()
|
||||
{
|
||||
try {
|
||||
HadoopIngestionSpec spec;
|
||||
if (argumentSpec.startsWith("{")) {
|
||||
return HadoopDruidIndexerConfig.fromString(argumentSpec);
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue