diff --git a/build.sh b/build.sh
index 9b6148b6c4c..50bcefbfe7b 100755
--- a/build.sh
+++ b/build.sh
@@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
-echo "See also http://druid.io/docs/0.6.26"
+echo "See also http://druid.io/docs/0.6.46"
diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index c821f9aa69b..c56617fc659 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.27-SNAPSHOT</version>
+        <version>0.6.48-SNAPSHOT</version>
@@ -53,4 +53,20 @@
             <scope>test</scope>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
diff --git a/common/pom.xml b/common/pom.xml
index 24c37411ba3..9b732f714ee 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.27-SNAPSHOT</version>
+        <version>0.6.48-SNAPSHOT</version>
@@ -59,14 +59,6 @@
             <groupId>org.skife.config</groupId>
             <artifactId>config-magic</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-x-discovery</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.hibernate</groupId>
             <artifactId>hibernate-validator</artifactId>
@@ -75,10 +67,6 @@
             <groupId>javax.validation</groupId>
             <artifactId>validation-api</artifactId>
         </dependency>
-        <dependency>
-            <groupId>it.uniroma3.mat</groupId>
-            <artifactId>extendedset</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
@@ -127,16 +115,6 @@
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
         </dependency>
-        <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-            <version>5.1.18</version>
-        </dependency>
-        <dependency>
-            <groupId>org.mozilla</groupId>
-            <artifactId>rhino</artifactId>
-            <version>1.7R4</version>
-        </dependency>
@@ -168,7 +146,15 @@
-
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
diff --git a/docs/content/Booting-a-production-cluster.md b/docs/content/Booting-a-production-cluster.md
index 61755b5733d..3f57ce13d1c 100644
--- a/docs/content/Booting-a-production-cluster.md
+++ b/docs/content/Booting-a-production-cluster.md
@@ -3,7 +3,7 @@ layout: doc_page
---
# Booting a Single Node Cluster #
-[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.26-bin.tar.gz).
+[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small Druid cluster on localhost. Here we will boot a small cluster on EC2. You can check out the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.46-bin.tar.gz).
The [ec2 run script](https://github.com/metamx/druid/blob/master/examples/bin/run_ec2.sh), run_ec2.sh, is located at 'examples/bin' if you have checked out the code, or at the root of the project if you've downloaded a tarball. The scripts rely on the [Amazon EC2 API Tools](http://aws.amazon.com/developertools/351), and you will need to set three environment variables:
diff --git a/docs/content/Cluster-setup.md b/docs/content/Cluster-setup.md
index aa142efc453..e4ba0e564f1 100644
--- a/docs/content/Cluster-setup.md
+++ b/docs/content/Cluster-setup.md
@@ -1,6 +1,9 @@
---
layout: doc_page
---
+
+# Setting Up a Druid Cluster
+
A Druid cluster consists of various node types that need to be set up depending on your use case. See our [Design](Design.html) docs for a description of the different node types.
Minimum Physical Layout: Absolute Minimum
@@ -74,7 +77,7 @@ Local disk ("ephemeral" on AWS EC2) for caching is recommended over network moun
Setup
-----
-Setting up a cluster is essentially just firing up all of the nodes you want with the proper [[configuration]]. One thing to be aware of is that there are a few properties in the configuration that potentially need to be set individually for each process:
+Setting up a cluster is essentially just firing up all of the nodes you want with the proper [configuration](Configuration.html). One thing to be aware of is that there are a few properties in the configuration that potentially need to be set individually for each process:
```
druid.server.type=historical|realtime
diff --git a/docs/content/Configuration.md b/docs/content/Configuration.md
index 51afb07ae65..f01b5d41a0b 100644
--- a/docs/content/Configuration.md
+++ b/docs/content/Configuration.md
@@ -1,25 +1,28 @@
---
layout: doc_page
---
+
+# Configuring Druid
+
This describes the basic server configuration that is loaded by all the server processes; the same file is loaded by all. See also the json "specFile" descriptions in [Realtime](Realtime.html) and [Batch-ingestion](Batch-ingestion.html).
-JVM Configuration Best Practices
-================================
+## JVM Configuration Best Practices
There are three JVM parameters that we set on all of our processes:
-1. `-Duser.timezone=UTC` This sets the default timezone of the JVM to UTC. We always set this and do not test with other default timezones, so local timezones might work, but they also might uncover weird and interesting bugs
-2. `-Dfile.encoding=UTF-8` This is similar to timezone, we test assuming UTF-8. Local encodings might work, but they also might result in weird and interesting bugs
-3. `-Djava.io.tmpdir=` Various parts of the system that interact with the file system do it via temporary files, these files can get somewhat large. Many production systems are setup to have small (but fast) `/tmp` directories, these can be problematic with Druid so we recommend pointing the JVM’s tmp directory to something with a little more meat.
+1. `-Duser.timezone=UTC` This sets the default timezone of the JVM to UTC. We always set this and do not test with other default timezones, so local timezones might work, but they also might uncover weird and interesting bugs.
+2. `-Dfile.encoding=UTF-8` This is similar to timezone, we test assuming UTF-8. Local encodings might work, but they also might result in weird and interesting bugs.
+3. `-Djava.io.tmpdir=` Various parts of the system that interact with the file system do it via temporary files, and these files can get somewhat large. Many production systems are set up to have small (but fast) `/tmp` directories, which can be problematic with Druid so we recommend pointing the JVM’s tmp directory to something with a little more meat.
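+
+As an illustration only, these flags might be combined on a node's command line like so (the classpath and tmp path below are placeholders, not part of this document):
+
+```
+java -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.io.tmpdir=/mnt/tmp \
+     -classpath lib/*:config/historical io.druid.cli.Main server historical
+```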
-Modules
-=======
+## Modules
-As of Druid v0.6, most core Druid functionality has been compartmentalized into modules. There are a set of default modules that may apply to any node type, and there are specific modules for the different node types. Default modules are __lazily instantiated__. Each module has its own set of configuration. This page will describe the configuration of the default modules.
+As of Druid v0.6, most core Druid functionality has been compartmentalized into modules. There are a set of default modules that may apply to any node type, and there are specific modules for the different node types. Default modules are __lazily instantiated__. Each module has its own set of configuration.
+
+This page describes the configuration of the default modules. Node-specific configuration is discussed on each node's respective page. In addition, you can add custom modules to [extend Druid](Modules.html).
Configuration of the various modules is done via Java properties. These can either be provided as `-D` system properties on the java command line or they can be passed in via a file called `runtime.properties` that exists on the classpath.
-Note: as a future item, we’d like to consolidate all of the various configuration into a yaml/JSON based configuration files.
+Note: as a future item, we’d like to consolidate all of the various configuration into a yaml/JSON based configuration file.
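+
+For example, the same setting can be supplied either way (the values here are placeholders for illustration):
+
+```
+# runtime.properties on the classpath
+druid.host=localhost
+druid.port=8081
+```
+
+or, equivalently, on the command line: `-Ddruid.host=localhost -Ddruid.port=8081`.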
### Emitter Module
@@ -147,7 +150,7 @@ Druid storage nodes maintain information about segments they have already downlo
|Property|Description|Default|
|--------|-----------|-------|
-|`druid.segmentCache.locations`|Segments assigned to a historical node are first stored on the local file system and then served by the historical node. These locations defines where that local cache resides|none|
+|`druid.segmentCache.locations`|Segments assigned to a historical node are first stored on the local file system and then served by the historical node. These locations define where that local cache resides|none|
|`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a node is no longer serving a segment.|true|
|`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
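+
+A minimal sketch of these settings (the path and size are illustrative, not taken from this document):
+
+```
+druid.segmentCache.locations=[{"path": "/mnt/persistent/druid/indexCache", "maxSize": 10000000000}]
+druid.segmentCache.infoDir=/mnt/persistent/druid/info_dir
+```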
@@ -282,8 +285,10 @@ This deep storage is used to interface with Amazon's S3.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.storage.bucket`|S3 bucket name.|none|
-|`druid.storage.basekey`|S3 base key.|none|
+|`druid.storage.baseKey`|S3 object key prefix for storage.|none|
|`druid.storage.disableAcl`|Boolean flag for ACL.|false|
+|`druid.storage.archiveBucket`|S3 bucket name for archiving when running the indexing-service *archive task*.|none|
+|`druid.storage.archiveBaseKey`|S3 object key prefix for archiving.|none|
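+
+For instance, an S3 deep storage configuration might look like the following (bucket names and prefixes are placeholders):
+
+```
+druid.storage.type=s3
+druid.storage.bucket=my-druid-segments
+druid.storage.baseKey=druid/segments
+druid.storage.archiveBucket=my-druid-archive
+druid.storage.archiveBaseKey=druid/archive
+```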
#### HDFS Deep Storage
@@ -308,21 +313,29 @@ This module is used to configure the [Indexing Service](Indexing-Service.html) t
|Property|Description|Default|
|--------|-----------|-------|
-|`druid.indexer.logs.type`|Choices:noop, S3. Where to store task logs|noop|
+|`druid.indexer.logs.type`|Choices: noop, s3, file. Where to store task logs.|file|
-#### Noop Task Logs
+#### File Task Logs
-No task logs are actually stored.
+Store task logs in the local filesystem.
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|`druid.indexer.logs.directory`|Local filesystem path.|log|
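+
+For example, to keep task logs on local disk (the directory below is a placeholder):
+
+```
+druid.indexer.logs.type=file
+druid.indexer.logs.directory=/var/druid/task-logs
+```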
#### S3 Task Logs
-Store Task Logs in S3.
+Store task logs in S3.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.s3Bucket`|S3 bucket name.|none|
|`druid.indexer.logs.s3Prefix`|S3 key prefix.|none|
+#### Noop Task Logs
+
+No task logs are actually stored.
+
### Firehose Module
The Firehose module lists all available firehoses. There are no configurations.
diff --git a/docs/content/Examples.md b/docs/content/Examples.md
index 81549a7ec2d..f0771408c75 100644
--- a/docs/content/Examples.md
+++ b/docs/content/Examples.md
@@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
-git checkout druid-0.6.26
+git checkout druid-0.6.46
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.26-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz
diff --git a/docs/content/Indexing-Service.md b/docs/content/Indexing-Service.md
index aed200a7c8f..2f15b200025 100644
--- a/docs/content/Indexing-Service.md
+++ b/docs/content/Indexing-Service.md
@@ -56,6 +56,7 @@ With the following JVM configuration:
-Ddruid.db.connector.password=diurd
-Ddruid.selectors.indexing.serviceName=overlord
+-Ddruid.indexer.queue.startDelay=PT0M
-Ddruid.indexer.runner.javaOpts="-server -Xmx1g"
-Ddruid.indexer.runner.startPort=8081
-Ddruid.indexer.fork.property.druid.computation.buffer.size=268435456
@@ -110,12 +111,17 @@ If autoscaling is enabled, new middle managers may be added when a task has been
#### JVM Configuration
-In addition to the configuration of some of the default modules in [Configuration](Configuration.html), the overlord module requires the following basic configs to run in remote mode:
+In addition to the configuration of some of the default modules in [Configuration](Configuration.html), the overlord has the following basic configs:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.type`|Choices "local" or "remote". Indicates whether tasks should be run locally or in a distributed environment.|local|
-|`druid.indexer.storage.type`|Choices are "local" or "db". Indicates whether incoming tasks should be stored locally (in heap) or in a database. Storing incoming tasks in a database allows for tasks to be bootstrapped if the overlord should fail.|local|
+|`druid.indexer.storage.type`|Choices are "local" or "db". Indicates whether incoming tasks should be stored locally (in heap) or in a database. Storing incoming tasks in a database allows for tasks to be resumed if the overlord should fail.|local|
+|`druid.indexer.storage.recentlyFinishedThreshold`|A duration of time to store task results.|PT24H|
+|`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
+|`druid.indexer.queue.startDelay`|Sleep this long before starting overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|
+|`druid.indexer.queue.restartDelay`|Sleep this long when overlord queue management throws an exception before trying again.|PT30S|
+|`druid.indexer.queue.storageSyncRate`|Sync overlord state this often with an underlying task persistence mechanism.|PT1M|
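+
+As a sketch, an overlord that persists tasks in the database and overrides the queue defaults might set (values are illustrative):
+
+```
+druid.indexer.storage.type=db
+druid.indexer.queue.startDelay=PT0M
+druid.indexer.queue.maxSize=100
+druid.indexer.storage.recentlyFinishedThreshold=PT12H
+```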
The following configs only apply if the overlord is running in remote mode:
diff --git a/docs/content/Modules.md b/docs/content/Modules.md
index 17b8e538785..b5b8a693053 100644
--- a/docs/content/Modules.md
+++ b/docs/content/Modules.md
@@ -1,6 +1,9 @@
---
layout: doc_page
---
+
+# Extending Druid With Custom Modules
+
Druid version 0.6 introduces a new module system that allows for the addition of extensions at runtime.
## Specifying extensions
@@ -164,4 +167,4 @@ Adding new Jersey resources to a module requires calling the following code to b
```java
Jerseys.addResource(binder, NewResource.class);
-```
\ No newline at end of file
+```
diff --git a/docs/content/Realtime.md b/docs/content/Realtime.md
index 3d6c432add1..a6111a8734b 100644
--- a/docs/content/Realtime.md
+++ b/docs/content/Realtime.md
@@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.26"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.46"]
druid.zk.service.host=localhost
diff --git a/docs/content/TopNMetricSpec.md b/docs/content/TopNMetricSpec.md
new file mode 100644
index 00000000000..672f5d352f6
--- /dev/null
+++ b/docs/content/TopNMetricSpec.md
@@ -0,0 +1,45 @@
+---
+layout: doc_page
+---
+TopNMetricSpec
+==================
+
+The topN metric spec specifies how topN values should be sorted.
+
+## Numeric TopNMetricSpec
+
+The simplest metric specification is a String value indicating the metric to sort topN results by. It is included in a topN query with:
+
+```json
+"metric":
+```
+
+The metric field can also be given as a JSON object. The grammar for dimension values sorted by numeric value is shown below:
+
+```json
+"metric": {
+ "type": "numeric",
+ "metric": ""
+}
+```
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|this indicates a numeric sort|yes|
+|metric|the actual metric field by which results will be sorted|yes|
+
+## Lexicographic TopNMetricSpec
+
+The grammar for dimension values sorted lexicographically is as follows:
+
+```json
+"metric": {
+ "type": "lexicographic",
+ "previousStop": ""
+}
+```
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|this indicates a lexicographic sort|yes|
+|previousStop|the starting point of the lexicographic sort. For example, if a previousStop value is 'b', all values before 'b' are discarded. This field can be used to paginate through all the dimension values.|no|
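+
+For example, if a first request returned dimension values up through "some_dim_value", the next page can be requested by passing that value as the previousStop:
+
+```json
+"metric": {
+  "type": "lexicographic",
+  "previousStop": "some_dim_value"
+}
+```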
diff --git a/docs/content/TopNQuery.md b/docs/content/TopNQuery.md
new file mode 100644
index 00000000000..d418799d29c
--- /dev/null
+++ b/docs/content/TopNQuery.md
@@ -0,0 +1,119 @@
+---
+layout: doc_page
+---
+TopN queries
+==================
+
+TopN queries return a sorted set of results for the values in a given dimension according to some criteria. Conceptually, they can be thought of as an approximate [GroupByQuery](GroupByQuery.html) over a single dimension with an [Ordering](Ordering.html) spec. TopNs are much faster and more resource-efficient than GroupBys for this use case. These types of queries take a topN query object and return an array of JSON objects where each object represents a value asked for by the topN query.
+
+A topN query object looks like:
+
+```json
+ "queryType": "topN",
+ "dataSource": "sample_data",
+ "dimension": "sample_dim",
+ "threshold": 5,
+ "metric": "count",
+ "granularity": "all",
+ "filter": {
+ "type": "and",
+ "fields": [
+ {
+ "type": "selector",
+ "dimension": "dim1",
+ "value": "some_value"
+ },
+ {
+ "type": "selector",
+ "dimension": "dim2",
+ "value": "some_other_val"
+ }
+ ]
+ },
+ "aggregations": [
+ {
+ "type": "longSum",
+ "name": "count",
+ "fieldName": "count"
+ },
+ {
+ "type": "doubleSum",
+ "name": "some_metric",
+ "fieldName": "some_metric"
+ }
+ ],
+ "postAggregations": [
+ {
+ "type": "arithmetic",
+ "name": "sample_divide",
+ "fn": "/",
+ "fields": [
+ {
+ "type": "fieldAccess",
+ "name": "some_metric",
+ "fieldName": "some_metric"
+ },
+ {
+ "type": "fieldAccess",
+ "name": "count",
+ "fieldName": "count"
+ }
+ ]
+ }
+ ],
+ "intervals": [
+ "2013-08-31T00:00:00.000/2013-09-03T00:00:00.000"
+ ]
+}
+```
+
+There are 10 parts to a topN query, but 7 of them are shared with [TimeseriesQuery](TimeseriesQuery.html). Please review [TimeseriesQuery](TimeseriesQuery.html) for meanings of fields not defined below.
+
+|property|description|required?|
+|--------|-----------|---------|
+|dimension|A JSON object defining the dimension that you want the top taken for. For more info, see [DimensionSpecs](DimensionSpecs.html)|yes|
+|threshold|An integer defining the N in the topN (i.e. how many you want in the top list)|yes|
+|metric|A JSON object specifying the metric to sort by for the top list. For more info, see [TopNMetricSpec](TopNMetricSpec.html).|yes|
+
+Please note the context JSON object is also available for topN queries and should be used with the same caution as the timeseries case.
+
+The format of the results would look like so:
+
+```json
+[
+ {
+ "timestamp": "2013-08-31T00:00:00.000Z",
+ "result": [
+ {
+ "dim1": "dim1_val",
+ "count": 111,
+ "some_metrics": 10669,
+ "average": 96.11711711711712
+ },
+ {
+ "dim1": "another_dim1_val",
+ "count": 88,
+ "some_metrics": 28344,
+ "average": 322.09090909090907
+ },
+ {
+ "dim1": "dim1_val3",
+ "count": 70,
+ "some_metrics": 871,
+ "average": 12.442857142857143
+ },
+ {
+ "dim1": "dim1_val4",
+ "count": 62,
+ "some_metrics": 815,
+ "average": 13.14516129032258
+ },
+ {
+ "dim1": "dim1_val5",
+ "count": 60,
+ "some_metrics": 2787,
+ "average": 46.45
+ }
+ ]
+ }
+]
+```
diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md
index 548d573c6f1..1f6f43ac97e 100644
--- a/docs/content/Tutorial:-A-First-Look-at-Druid.md
+++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.26-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.26
+cd druid-services-0.6.46
```
You should see a bunch of files:
@@ -205,7 +205,7 @@ You are probably wondering, what are these [Granularities](Granularities.html) a
To issue the query and get some results, run the following in your command line:
```
-curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d ````timeseries_query.body
+curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @timeseries_query.body
```
Once again, you should get a JSON blob of text back with your results, that looks something like this:
diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
index 100df976de4..1f0992bb6bd 100644
--- a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
+++ b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
@@ -94,6 +94,7 @@ druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.selectors.indexing.serviceName=overlord
+druid.indexer.queue.startDelay=PT0M
druid.indexer.runner.javaOpts="-server -Xmx1g"
druid.indexer.runner.startPort=8088
druid.indexer.fork.property.druid.computation.buffer.size=268435456
@@ -246,6 +247,23 @@ Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) should yield:
} ]
```
+Console
+--------
+
+The indexing service overlord has a console located at:
+
+```bash
+localhost:8087/console.html
+```
+
+On this console, you can look at statuses and logs of recently submitted and completed tasks.
+
+If you decide to reuse the local firehose to ingest your own data and run into problems, you can use the console to read the individual task logs.
+
+Task logs can be stored locally or uploaded to [Deep Storage](Deep-Storage.html). More information about how to configure this is [here](Configuration.html).
+
+The most common data ingestion problems involve timestamp formats and other malformed data issues.
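+
+If you prefer the command line, task status can also be fetched over HTTP; for example (the task id is a placeholder, and the endpoint path is an assumption here rather than something stated in this document):
+
+```bash
+curl http://localhost:8087/druid/indexer/v1/task/<task_id>/status
+```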
+
Next Steps
----------
diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md
index 60d5487784d..cffb288f5b3 100644
--- a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md
+++ b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md
@@ -44,7 +44,7 @@ With real-world data, we recommend having a message bus such as [Apache Kafka](h
#### Setting up Kafka
-[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.26/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
+[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.46/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html).
diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md
index e954c3de257..b2f5b6975ec 100644
--- a/docs/content/Tutorial:-The-Druid-Cluster.md
+++ b/docs/content/Tutorial:-The-Druid-Cluster.md
@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes as well as and exter
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.26-bin.tar.gz)
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz)
and untar the contents within by issuing:
@@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.26"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -238,7 +238,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.26","io.druid.extensions:druid-kafka-seven:0.6.26"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.46","io.druid.extensions:druid-kafka-seven:0.6.46"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
@@ -253,5 +253,5 @@ druid.processing.buffer.sizeBytes=10000000
Next Steps
----------
-If you are intested in how data flows through the different Druid components, check out the [Druid data flow architecture](Design.html). Now that you have an understanding of what the Druid cluster looks like, why not load some of your own data?
+If you are interested in how data flows through the different Druid components, check out the [Druid data flow architecture](Design.html). Now that you have an understanding of what the Druid cluster looks like, why not load some of your own data?
Check out the next [tutorial](Tutorial%3A-Loading-Your-Data-Part-1.html) section for more info!
diff --git a/docs/content/Tutorial:-Webstream.md b/docs/content/Tutorial:-Webstream.md
index c8b83d1e00c..cdbbc5f7ee7 100644
--- a/docs/content/Tutorial:-Webstream.md
+++ b/docs/content/Tutorial:-Webstream.md
@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.26-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.26
+cd druid-services-0.6.46
```
You should see a bunch of files:
diff --git a/docs/content/Twitter-Tutorial.textile b/docs/content/Twitter-Tutorial.textile
index 9e368b13f10..68780006068 100644
--- a/docs/content/Twitter-Tutorial.textile
+++ b/docs/content/Twitter-Tutorial.textile
@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.26-bin.tar.gz.
+We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:
diff --git a/docs/content/toc.textile b/docs/content/toc.textile
index 84fde036997..2a5f7ed185d 100644
--- a/docs/content/toc.textile
+++ b/docs/content/toc.textile
@@ -3,7 +3,7 @@
-h1. Introduction
+h2. Introduction
* "About Druid":./
* "Concepts and Terminology":./Concepts-and-Terminology.html
@@ -14,16 +14,12 @@ h2. Getting Started
* "Tutorial: Loading Your Data Part 2":./Tutorial:-Loading-Your-Data-Part-2.html
* "Tutorial: All About Queries":./Tutorial:-All-About-Queries.html
-h2. Evaluate Druid
+h2. Operations
+* "Configuration":Configuration.html
+* "Extending Druid":./Modules.html
* "Cluster Setup":./Cluster-setup.html
* "Booting a Production Cluster":./Booting-a-production-cluster.html
-h2. Configuration
-* "Configuration":Configuration.html
-
-h2. Extend Druid
-* "Modules":./Modules.html
-
h2. Data Ingestion
* "Realtime":./Realtime.html
* "Batch":./Batch-ingestion.html
@@ -46,6 +42,8 @@ h2. Querying
** "SegmentMetadataQuery":./SegmentMetadataQuery.html
** "TimeBoundaryQuery":./TimeBoundaryQuery.html
** "TimeseriesQuery":./TimeseriesQuery.html
+** "TopNQuery":./TopNQuery.html
+*** "TopNMetricSpec":./TopNMetricSpec.html
h2. Architecture
* "Design":./Design.html
diff --git a/examples/config/historical/runtime.properties b/examples/config/historical/runtime.properties
index 67cc44f74d7..8d9f1f35096 100644
--- a/examples/config/historical/runtime.properties
+++ b/examples/config/historical/runtime.properties
@@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.26"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
diff --git a/examples/config/overlord/runtime.properties b/examples/config/overlord/runtime.properties
index c9c4478d4c4..f452a7d06d6 100644
--- a/examples/config/overlord/runtime.properties
+++ b/examples/config/overlord/runtime.properties
@@ -9,6 +9,7 @@ druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.selectors.indexing.serviceName=overlord
+druid.indexer.queue.startDelay=PT0M
druid.indexer.runner.javaOpts="-server -Xmx1g"
druid.indexer.runner.startPort=8088
druid.indexer.fork.property.druid.computation.buffer.size=268435456
diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties
index aefabeda473..6ebf2e5dcdf 100644
--- a/examples/config/realtime/runtime.properties
+++ b/examples/config/realtime/runtime.properties
@@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.26","io.druid.extensions:druid-kafka-seven:0.6.26","io.druid.extensions:druid-rabbitmq:0.6.26"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.46","io.druid.extensions:druid-kafka-seven:0.6.46","io.druid.extensions:druid-rabbitmq:0.6.46"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
diff --git a/examples/pom.xml b/examples/pom.xml
index 30ee4e6f586..0e4f56ca1e8 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.27-SNAPSHOT</version>
+        <version>0.6.48-SNAPSHOT</version>
@@ -104,6 +104,14 @@
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+                        </manifest>
+                    </archive>
+                </configuration>
diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml
index 71da35eff72..a59524271d5 100644
--- a/hdfs-storage/pom.xml
+++ b/hdfs-storage/pom.xml
@@ -28,7 +28,7 @@
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.27-SNAPSHOT</version>
+        <version>0.6.48-SNAPSHOT</version>
@@ -71,4 +71,20 @@
             <scope>test</scope>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 3dfb59c2824..38d032fadba 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.27-SNAPSHOT</version>
+        <version>0.6.48-SNAPSHOT</version>
@@ -101,6 +101,17 @@
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+ maven-shade-plugin
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
index 9f1bd030284..2eedaf76d31 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
@@ -62,7 +62,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper<
try {
inputRow = parser.parse(value.toString());
}
- catch (IllegalArgumentException e) {
+ catch (Exception e) {
if (config.isIgnoreInvalidRows()) {
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
return; // we're ignoring this invalid row
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 9b718e7e172..e8cfbdf8fa1 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.27-SNAPSHOT</version>
+        <version>0.6.48-SNAPSHOT</version>
@@ -47,95 +47,10 @@
             <artifactId>druid-indexing-hadoop</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
-        <dependency>
-            <groupId>com.metamx</groupId>
-            <artifactId>emitter</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.metamx</groupId>
-            <artifactId>http-client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.metamx</groupId>
-            <artifactId>java-util</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.metamx</groupId>
-            <artifactId>server-metrics</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>commons-codec</groupId>
-            <artifactId>commons-codec</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.skife.config</groupId>
-            <artifactId>config-magic</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.inject</groupId>
-            <artifactId>guice</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.inject.extensions</groupId>
-            <artifactId>guice-servlet</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.jaxrs</groupId>
-            <artifactId>jackson-jaxrs-json-provider</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>javax.inject</groupId>
-            <artifactId>javax.inject</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jdbi</groupId>
-            <artifactId>jdbi</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.sun.jersey</groupId>
-            <artifactId>jersey-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.sun.jersey.contribs</groupId>
-            <artifactId>jersey-guice</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.eclipse.jetty</groupId>
-            <artifactId>jetty-server</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>joda-time</groupId>
-            <artifactId>joda-time</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.findbugs</groupId>
-            <artifactId>jsr305</artifactId>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.18</version>
@@ -160,4 +75,20 @@
             <scope>test</scope>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
diff --git a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java
index 703179f4542..ed9f628452c 100644
--- a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java
+++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
-import io.druid.indexing.common.config.EventReceiverFirehoseFactoryConfig;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.initialization.DruidModule;
@@ -46,7 +45,5 @@ public class IndexingServiceFirehoseModule implements DruidModule
@Override
public void configure(Binder binder)
{
- // backwards compatibility
- ConfigProvider.bind(binder, EventReceiverFirehoseFactoryConfig.class);
}
}
diff --git a/server/src/main/java/io/druid/guice/TaskLogsModule.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java
similarity index 76%
rename from server/src/main/java/io/druid/guice/TaskLogsModule.java
rename to indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java
index eedd12caabc..33452d0dfd9 100644
--- a/server/src/main/java/io/druid/guice/TaskLogsModule.java
+++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java
@@ -23,22 +23,27 @@ import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
+import io.druid.indexing.common.config.FileTaskLogsConfig;
+import io.druid.indexing.common.tasklogs.FileTaskLogs;
import io.druid.tasklogs.NoopTaskLogs;
import io.druid.tasklogs.TaskLogPusher;
import io.druid.tasklogs.TaskLogs;
/**
*/
-public class TaskLogsModule implements Module
+public class IndexingServiceTaskLogsModule implements Module
{
@Override
public void configure(Binder binder)
{
- PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(NoopTaskLogs.class));
+ PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
final MapBinder taskLogBinder = Binders.taskLogsBinder(binder);
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
+ taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
+ binder.bind(FileTaskLogs.class).in(LazySingleton.class);
+ JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class);
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
index 0a7a505d4ec..44f3c600f55 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
@@ -20,25 +20,36 @@
package io.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.ServerView;
+import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
+import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
@@ -52,6 +63,8 @@ public class TaskToolbox
private final ServiceEmitter emitter;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
+ private final DataSegmentArchiver dataSegmentArchiver;
+ private final DataSegmentMover dataSegmentMover;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@@ -68,6 +81,8 @@ public class TaskToolbox
ServiceEmitter emitter,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
+ DataSegmentMover dataSegmentMover,
+ DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@@ -84,6 +99,8 @@ public class TaskToolbox
this.emitter = emitter;
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
+ this.dataSegmentMover = dataSegmentMover;
+ this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@@ -119,6 +136,16 @@ public class TaskToolbox
return dataSegmentKiller;
}
+ public DataSegmentMover getDataSegmentMover()
+ {
+ return dataSegmentMover;
+ }
+
+ public DataSegmentArchiver getDataSegmentArchiver()
+ {
+ return dataSegmentArchiver;
+ }
+
public DataSegmentAnnouncer getSegmentAnnouncer()
{
return segmentAnnouncer;
@@ -149,7 +176,7 @@ public class TaskToolbox
return objectMapper;
}
-  public Map<DataSegment, File> getSegments(List<DataSegment> segments)
+  public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
throws SegmentLoadingException
{
     Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
@@ -160,6 +187,25 @@ public class TaskToolbox
return retVal;
}
+  public void pushSegments(Iterable<DataSegment> segments) throws IOException {
+ // Request segment pushes for each set
+    final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
+ segments,
+        new Function<DataSegment, Interval>()
+ {
+ @Override
+ public Interval apply(DataSegment segment)
+ {
+ return segment.getInterval();
+ }
+ }
+ );
+    for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
+ getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
+ }
+
+ }
+
public File getTaskWorkDir()
{
return taskWorkDir;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
index ca00dccaf91..d655edc34f0 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
@@ -24,13 +24,14 @@ import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.ServerView;
-import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Processing;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
+import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.server.coordination.DataSegmentAnnouncer;
@@ -47,6 +48,8 @@ public class TaskToolboxFactory
private final ServiceEmitter emitter;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
+ private final DataSegmentMover dataSegmentMover;
+ private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@@ -62,6 +65,8 @@ public class TaskToolboxFactory
ServiceEmitter emitter,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
+ DataSegmentMover dataSegmentMover,
+ DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@@ -76,6 +81,8 @@ public class TaskToolboxFactory
this.emitter = emitter;
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
+ this.dataSegmentMover = dataSegmentMover;
+ this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@@ -96,6 +103,8 @@ public class TaskToolboxFactory
emitter,
segmentPusher,
dataSegmentKiller,
+ dataSegmentMover,
+ dataSegmentArchiver,
segmentAnnouncer,
newSegmentServerView,
queryRunnerFactoryConglomerate,
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/LocalTaskActionClient.java
index 8bb23918b01..4dd445df80d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/LocalTaskActionClient.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/LocalTaskActionClient.java
@@ -19,6 +19,7 @@
package io.druid.indexing.common.actions;
+import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskStorage;
@@ -45,21 +46,21 @@ public class LocalTaskActionClient implements TaskActionClient
{
log.info("Performing action for task[%s]: %s", task.getId(), taskAction);
- final RetType ret = taskAction.perform(task, toolbox);
-
if (taskAction.isAudited()) {
// Add audit log
try {
storage.addAuditLog(task, taskAction);
}
catch (Exception e) {
+ final String actionClass = taskAction.getClass().getName();
log.makeAlert(e, "Failed to record action in audit log")
.addData("task", task.getId())
- .addData("actionClass", taskAction.getClass().getName())
+ .addData("actionClass", actionClass)
.emit();
+ throw new ISE(e, "Failed to record action [%s] in audit log", actionClass);
}
}
- return ret;
+ return taskAction.perform(task, toolbox);
}
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/LockAcquireAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/LockAcquireAction.java
index 5730e3be082..5d600dcd369 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/LockAcquireAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/LockAcquireAction.java
@@ -68,7 +68,7 @@ public class LockAcquireAction implements TaskAction
@Override
public boolean isAudited()
{
- return true;
+ return false;
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/LockReleaseAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/LockReleaseAction.java
index 97397666d2b..6179c5ee658 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/LockReleaseAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/LockReleaseAction.java
@@ -60,7 +60,7 @@ public class LockReleaseAction implements TaskAction
@Override
public boolean isAudited()
{
- return true;
+ return false;
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SpawnTasksAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/LockTryAcquireAction.java
similarity index 64%
rename from indexing-service/src/main/java/io/druid/indexing/common/actions/SpawnTasksAction.java
rename to indexing-service/src/main/java/io/druid/indexing/common/actions/LockTryAcquireAction.java
index 85b1a53c275..699460af82f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SpawnTasksAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/LockTryAcquireAction.java
@@ -23,56 +23,54 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Optional;
+import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
+import org.joda.time.Interval;
-import java.util.List;
-
-public class SpawnTasksAction implements TaskAction<Void>
+public class LockTryAcquireAction implements TaskAction<Optional<TaskLock>>
{
@JsonIgnore
- private final List newTasks;
+ private final Interval interval;
@JsonCreator
- public SpawnTasksAction(
- @JsonProperty("newTasks") List newTasks
+ public LockTryAcquireAction(
+ @JsonProperty("interval") Interval interval
)
{
- this.newTasks = ImmutableList.copyOf(newTasks);
+ this.interval = interval;
}
@JsonProperty
- public List getNewTasks()
+ public Interval getInterval()
{
- return newTasks;
+ return interval;
}
-  public TypeReference<Void> getReturnTypeReference()
+  public TypeReference<Optional<TaskLock>> getReturnTypeReference()
   {
-    return new TypeReference<Void>() {};
+    return new TypeReference<Optional<TaskLock>>()
+    {
+    };
}
@Override
- public Void perform(Task task, TaskActionToolbox toolbox)
+  public Optional<TaskLock> perform(Task task, TaskActionToolbox toolbox)
{
- for(final Task newTask : newTasks) {
- toolbox.getTaskQueue().add(newTask);
- }
-
- return null;
+ return toolbox.getTaskLockbox().tryLock(task, interval);
}
@Override
public boolean isAudited()
{
- return true;
+ return false;
}
@Override
public String toString()
{
- return "SpawnTasksAction{" +
- "newTasks=" + newTasks +
+ return "LockTryAcquireAction{" +
+ "interval=" + interval +
'}';
}
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
index aaad73b8a9f..5280e394f6f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
-import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
@@ -80,9 +79,7 @@ public class SegmentInsertAction implements TaskAction>
@Override
public Set perform(Task task, TaskActionToolbox toolbox) throws IOException
{
- if(!toolbox.taskLockCoversSegments(task, segments, allowOlderVersions)) {
- throw new ISE("Segments not covered by locks for task[%s]: %s", task.getId(), segments);
- }
+ toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
final Set retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
new file mode 100644
index 00000000000..4356c80dc59
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
@@ -0,0 +1,73 @@
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableSet;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.indexing.common.task.Task;
+import io.druid.timeline.DataSegment;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class SegmentMetadataUpdateAction implements TaskAction<Void>
+{
+ @JsonIgnore
+  private final Set<DataSegment> segments;
+
+ @JsonCreator
+ public SegmentMetadataUpdateAction(
+      @JsonProperty("segments") Set<DataSegment> segments
+ )
+ {
+ this.segments = ImmutableSet.copyOf(segments);
+ }
+
+ @JsonProperty
+  public Set<DataSegment> getSegments()
+ {
+ return segments;
+ }
+
+  public TypeReference<Void> getReturnTypeReference()
+  {
+    return new TypeReference<Void>() {};
+ }
+
+ @Override
+ public Void perform(
+ Task task, TaskActionToolbox toolbox
+ ) throws IOException
+ {
+ toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
+ toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
+
+ // Emit metrics
+ final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
+ .setUser2(task.getDataSource())
+ .setUser4(task.getType());
+
+ for (DataSegment segment : segments) {
+ metricBuilder.setUser5(segment.getInterval().toString());
+ toolbox.getEmitter().emit(metricBuilder.build("indexer/segmentMoved/bytes", segment.getSize()));
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean isAudited()
+ {
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SegmentMetadataUpdateAction{" +
+ "segments=" + segments +
+ '}';
+ }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java
index 6ac8dd1ccc4..54258df1c2d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
-import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
@@ -59,10 +58,7 @@ public class SegmentNukeAction implements TaskAction
@Override
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
{
- if(!toolbox.taskLockCoversSegments(task, segments, true)) {
- throw new ISE("Segments not covered by locks for task: %s", task.getId());
- }
-
+ toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().deleteSegments(segments);
// Emit metrics
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
index 37d4346247e..038d06be3c6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
@@ -29,13 +29,14 @@ import java.io.IOException;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "lockAcquire", value = LockAcquireAction.class),
+ @JsonSubTypes.Type(name = "lockTryAcquire", value = LockTryAcquireAction.class),
@JsonSubTypes.Type(name = "lockList", value = LockListAction.class),
@JsonSubTypes.Type(name = "lockRelease", value = LockReleaseAction.class),
@JsonSubTypes.Type(name = "segmentInsertion", value = SegmentInsertAction.class),
@JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class),
@JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
- @JsonSubTypes.Type(name = "spawnTasks", value = SpawnTasksAction.class)
+ @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class)
})
public interface TaskAction
{
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
index b7e78e0c2be..d9b0520f40b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
@@ -19,15 +19,16 @@
package io.druid.indexing.common.actions;
+import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
+import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerDBCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
-import io.druid.indexing.overlord.TaskQueue;
import io.druid.timeline.DataSegment;
import java.util.List;
@@ -35,30 +36,22 @@ import java.util.Set;
public class TaskActionToolbox
{
- private final TaskQueue taskQueue;
private final TaskLockbox taskLockbox;
private final IndexerDBCoordinator indexerDBCoordinator;
private final ServiceEmitter emitter;
@Inject
public TaskActionToolbox(
- TaskQueue taskQueue,
TaskLockbox taskLockbox,
IndexerDBCoordinator indexerDBCoordinator,
ServiceEmitter emitter
)
{
- this.taskQueue = taskQueue;
this.taskLockbox = taskLockbox;
this.indexerDBCoordinator = indexerDBCoordinator;
this.emitter = emitter;
}
- public TaskQueue getTaskQueue()
- {
- return taskQueue;
- }
-
public TaskLockbox getTaskLockbox()
{
return taskLockbox;
@@ -74,6 +67,38 @@ public class TaskActionToolbox
return emitter;
}
+ public boolean segmentsAreFromSamePartitionSet(
+      final Set<DataSegment> segments
+ )
+ {
+ // Verify that these segments are all in the same partition set
+
+ Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty");
+ final DataSegment firstSegment = segments.iterator().next();
+ for (final DataSegment segment : segments) {
+ if (!segment.getDataSource().equals(firstSegment.getDataSource())
+ || !segment.getInterval().equals(firstSegment.getInterval())
+ || !segment.getVersion().equals(firstSegment.getVersion())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void verifyTaskLocksAndSinglePartitionSettitude(
+ final Task task,
+      final Set<DataSegment> segments,
+ final boolean allowOlderVersions
+ )
+ {
+ if (!taskLockCoversSegments(task, segments, allowOlderVersions)) {
+ throw new ISE("Segments not covered by locks for task: %s", task.getId());
+ }
+ if (!segmentsAreFromSamePartitionSet(segments)) {
+ throw new ISE("Segments are not in the same partition set: %s", segments);
+ }
+ }
+
public boolean taskLockCoversSegments(
final Task task,
final Set segments,
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/IndexerZkConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java
similarity index 75%
rename from indexing-service/src/main/java/io/druid/indexing/common/config/IndexerZkConfig.java
rename to indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java
index 67a750ab535..dfc7c9a9951 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/config/IndexerZkConfig.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java
@@ -19,15 +19,19 @@
package io.druid.indexing.common.config;
-import io.druid.server.initialization.ZkPathsConfig;
-import org.skife.config.Config;
-import org.skife.config.Default;
+import com.fasterxml.jackson.annotation.JsonProperty;
-/**
- */
-public abstract class IndexerZkConfig extends ZkPathsConfig
+import javax.validation.constraints.NotNull;
+import java.io.File;
+
+public class FileTaskLogsConfig
{
- @Config("druid.zk.maxNumBytes")
- @Default("512000")
- public abstract long getMaxNumBytes();
+ @JsonProperty
+ @NotNull
+ private File directory = new File("log");
+
+ public File getDirectory()
+ {
+ return directory;
+ }
}
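
Since this config is now a plain Jackson bean rather than a skife config-magic class, it can be exercised directly with an ObjectMapper. A minimal sketch, assuming only jackson-databind on the classpath (the sample path is made up):

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.common.config.FileTaskLogsConfig;

class FileTaskLogsConfigExample
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // The field-level @JsonProperty lets Jackson populate the private field directly;
    // if "directory" is omitted, the default new File("log") is kept.
    final FileTaskLogsConfig config = mapper.readValue(
        "{\"directory\":\"/tmp/druid/task-logs\"}",
        FileTaskLogsConfig.class
    );
    System.out.println(config.getDirectory()); // prints /tmp/druid/task-logs
  }
}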
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java
new file mode 100644
index 00000000000..db40c7cb069
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java
@@ -0,0 +1,19 @@
+package io.druid.indexing.common.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.validation.constraints.NotNull;
+
+public class TaskStorageConfig
+{
+ @JsonProperty
+ @NotNull
+ public Duration recentlyFinishedThreshold = new Period("PT24H").toStandardDuration();
+
+ public Duration getRecentlyFinishedThreshold()
+ {
+ return recentlyFinishedThreshold;
+ }
+}
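
The default threshold is defined as a Joda Period converted to a standard Duration, so "PT24H" becomes a fixed 24-hour span. A tiny sketch of that arithmetic (nothing Druid-specific here):

import org.joda.time.Duration;
import org.joda.time.Period;

class RecentlyFinishedThresholdExample
{
  public static void main(String[] args)
  {
    // Matches the default in TaskStorageConfig: 24 hours expressed in milliseconds.
    final Duration threshold = new Period("PT24H").toStandardDuration();
    System.out.println(threshold.getMillis());        // 86400000
    System.out.println(threshold.getStandardHours()); // 24
  }
}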
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java
index 0f3732ed386..b9e420f7020 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java
@@ -33,7 +33,6 @@ import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.MapInputRowParser;
-import io.druid.indexing.common.config.EventReceiverFirehoseFactoryConfig;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@@ -63,31 +62,15 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
private final MapInputRowParser parser;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
- @Deprecated
- private final EventReceiverFirehoseFactoryConfig config;
-
@JsonCreator
public EventReceiverFirehoseFactory(
@JsonProperty("serviceName") String serviceName,
- @JsonProperty("firehoseId") String firehoseId,
@JsonProperty("bufferSize") Integer bufferSize,
@JsonProperty("parser") MapInputRowParser parser,
- @JacksonInject ChatHandlerProvider chatHandlerProvider,
- @JacksonInject EventReceiverFirehoseFactoryConfig config
+ @JacksonInject ChatHandlerProvider chatHandlerProvider
)
{
- // This code is here for backwards compatibility
- if (serviceName == null) {
- this.serviceName = String.format(
- "%s:%s",
- config.getFirehoseIdPrefix(),
- Preconditions.checkNotNull(firehoseId, "firehoseId")
- );
- } else {
- this.serviceName = serviceName;
- }
- this.config = config;
-
+ this.serviceName = Preconditions.checkNotNull(serviceName, "serviceName");
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.parser = Preconditions.checkNotNull(parser, "parser");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
@@ -117,13 +100,6 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
return serviceName;
}
- @Deprecated
- @JsonProperty("firehoseId")
- public String getFirehoseId()
- {
- return serviceName.replaceFirst(String.format("%s:", config.getFirehoseIdPrefix()), "");
- }
-
@JsonProperty
public int getBufferSize()
{
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java
new file mode 100644
index 00000000000..2d6687ed920
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java
@@ -0,0 +1,77 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import org.joda.time.Interval;
+
+public abstract class AbstractFixedIntervalTask extends AbstractTask
+{
+ @JsonIgnore
+ private final Interval interval;
+
+ protected AbstractFixedIntervalTask(
+ String id,
+ String dataSource,
+ Interval interval
+ )
+ {
+ this(id, id, new TaskResource(id, 1), dataSource, interval);
+ }
+
+ protected AbstractFixedIntervalTask(
+ String id,
+ String groupId,
+ String dataSource,
+ Interval interval
+ )
+ {
+ this(id, groupId, new TaskResource(id, 1), dataSource, interval);
+ }
+
+ protected AbstractFixedIntervalTask(
+ String id,
+ String groupId,
+ TaskResource taskResource,
+ String dataSource,
+ Interval interval
+ )
+ {
+ super(id, groupId, taskResource, dataSource);
+ this.interval = Preconditions.checkNotNull(interval, "interval");
+ Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
+ }
+
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient) throws Exception
+ {
+ return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent();
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+}
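
Concrete subclasses only need getType() and run(); lock acquisition happens once in isReady() via LockTryAcquireAction. A minimal sketch of such a subclass (illustrative, not part of this patch):

package io.druid.indexing.common.task;

import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import org.joda.time.Interval;

public class ExampleFixedIntervalTask extends AbstractFixedIntervalTask
{
  public ExampleFixedIntervalTask(String id, String dataSource, Interval interval)
  {
    super(id, dataSource, interval);
  }

  @Override
  public String getType()
  {
    return "example_fixed_interval"; // illustrative type name
  }

  @Override
  public TaskStatus run(TaskToolbox toolbox) throws Exception
  {
    // By the time run() is called, isReady() has already acquired the interval lock,
    // so the task can do its work against getInterval() and simply report success.
    return TaskStatus.success(getId());
  }
}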
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
index 1944243e7fe..eaff1b9b46f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
@@ -23,21 +23,15 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
-import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockListAction;
-import io.druid.indexing.common.actions.SegmentListUsedAction;
-import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
-import org.joda.time.Interval;
import java.io.IOException;
-import java.util.List;
public abstract class AbstractTask implements Task
{
@@ -55,26 +49,22 @@ public abstract class AbstractTask implements Task
@JsonIgnore
private final String dataSource;
- @JsonIgnore
- private final Optional<Interval> interval;
-
- protected AbstractTask(String id, String dataSource, Interval interval)
+ protected AbstractTask(String id, String dataSource)
{
- this(id, id, new TaskResource(id, 1), dataSource, interval);
+ this(id, id, new TaskResource(id, 1), dataSource);
}
- protected AbstractTask(String id, String groupId, String dataSource, Interval interval)
+ protected AbstractTask(String id, String groupId, String dataSource)
{
- this(id, groupId, new TaskResource(id, 1), dataSource, interval);
+ this(id, groupId, new TaskResource(id, 1), dataSource);
}
- protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource, Interval interval)
+ protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.taskResource = Preconditions.checkNotNull(taskResource, "resource");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
- this.interval = Optional.fromNullable(interval);
}
@JsonProperty
@@ -111,25 +101,12 @@ public abstract class AbstractTask implements Task
return dataSource;
}
- @JsonProperty("interval")
- @Override
- public Optional<Interval> getImplicitLockInterval()
- {
- return interval;
- }
-
@Override
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
return null;
}
- @Override
- public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
- {
- return TaskStatus.running(id);
- }
-
@Override
public String toString()
{
@@ -137,7 +114,6 @@ public abstract class AbstractTask implements Task
.add("id", id)
.add("type", getType())
.add("dataSource", dataSource)
- .add("interval", getImplicitLockInterval())
.toString();
}
@@ -149,11 +125,6 @@ public abstract class AbstractTask implements Task
return ID_JOINER.join(objects);
}
- public SegmentListUsedAction defaultListUsedAction()
- {
- return new SegmentListUsedAction(getDataSource(), getImplicitLockInterval().get());
- }
-
public TaskStatus success()
{
return TaskStatus.success(getId());
@@ -186,14 +157,6 @@ public abstract class AbstractTask implements Task
protected Iterable<TaskLock> getTaskLocks(TaskToolbox toolbox) throws IOException
{
- final List<TaskLock> locks = toolbox.getTaskActionClient().submit(new LockListAction());
-
- if (locks.isEmpty() && getImplicitLockInterval().isPresent()) {
- // In the Peon's local mode, the implicit lock interval is not pre-acquired, so we need to try it here.
- toolbox.getTaskActionClient().submit(new LockAcquireAction(getImplicitLockInterval().get()));
- return toolbox.getTaskActionClient().submit(new LockListAction());
- } else {
- return locks;
- }
+ return toolbox.getTaskActionClient().submit(new LockListAction());
}
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java
new file mode 100644
index 00000000000..c863742e0ab
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java
@@ -0,0 +1,110 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentListUnusedAction;
+import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
+import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+public class ArchiveTask extends AbstractFixedIntervalTask
+{
+ private static final Logger log = new Logger(ArchiveTask.class);
+
+ public ArchiveTask(
+ @JsonProperty("id") String id,
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval
+ )
+ {
+ super(
+ TaskUtils.makeId(id, "archive", dataSource, interval),
+ dataSource,
+ interval
+ );
+ }
+
+ @Override
+ public String getType()
+ {
+ return "archive";
+ }
+
+ @Override
+ public TaskStatus run(TaskToolbox toolbox) throws Exception
+ {
+ // Confirm we have a lock (will throw if there isn't exactly one element)
+ final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+
+ if (!myLock.getDataSource().equals(getDataSource())) {
+ throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
+ }
+
+ if (!myLock.getInterval().equals(getInterval())) {
+ throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
+ }
+
+ // List unused segments
+ final List<DataSegment> unusedSegments = toolbox
+ .getTaskActionClient()
+ .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
+
+ // Verify none of these segments have versions > lock version
+ for (final DataSegment unusedSegment : unusedSegments) {
+ if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
+ throw new ISE(
+ "WTF?! Unused segment[%s] has version[%s] > task version[%s]",
+ unusedSegment.getIdentifier(),
+ unusedSegment.getVersion(),
+ myLock.getVersion()
+ );
+ }
+
+ log.info("OK to archive segment: %s", unusedSegment.getIdentifier());
+ }
+
+ List<DataSegment> archivedSegments = Lists.newLinkedList();
+
+ // Move segments
+ for (DataSegment segment : unusedSegments) {
+ archivedSegments.add(toolbox.getDataSegmentArchiver().archive(segment));
+ }
+
+ // Update metadata for moved segments
+ toolbox.getTaskActionClient().submit(
+ new SegmentMetadataUpdateAction(
+ ImmutableSet.copyOf(archivedSegments)
+ )
+ );
+
+ return TaskStatus.success(getId());
+ }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java
index 32d3e49e618..970818a6e9d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java
@@ -22,6 +22,7 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -30,7 +31,6 @@ import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
-import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexMerger;
@@ -44,7 +44,7 @@ import org.joda.time.Interval;
import java.io.File;
-public class DeleteTask extends AbstractTask
+public class DeleteTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(DeleteTask.class);
@@ -78,16 +78,15 @@ public class DeleteTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Strategy: Create an empty segment covering the interval to be deleted
- final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
- final Interval interval = this.getImplicitLockInterval().get();
+ final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
- final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty);
+ final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
// Create DataSegment
final DataSegment segment =
DataSegment.builder()
.dataSource(this.getDataSource())
- .interval(interval)
+ .interval(getInterval())
.version(myLock.getVersion())
.shardSpec(new NoneShardSpec())
.build();
@@ -105,7 +104,7 @@ public class DeleteTask extends AbstractTask
segment.getVersion()
);
- toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+ toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}
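
DeleteTask (and several other tasks below) now publish through toolbox.pushSegments(...) instead of submitting SegmentInsertAction themselves. The helper's body is not shown in this diff; a plausible sketch of what it amounts to, under that assumption:

// Assumed shape of TaskToolbox.pushSegments — illustrative, not taken from this hunk.
public void pushSegments(Iterable<DataSegment> segments) throws IOException
{
  // Routing every publish through one action keeps the lock and partition-set checks
  // (see TaskActionToolbox.verifyTaskLocksAndSinglePartitionSettitude above) in one place.
  getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segments)));
}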
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index e687875433b..233714f5c71 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -24,10 +24,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.indexer.HadoopDruidIndexerConfig;
@@ -37,21 +41,27 @@ import io.druid.indexer.HadoopDruidIndexerSchema;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
+import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.timeline.DataSegment;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
-public class HadoopIndexTask extends AbstractTask
+public class HadoopIndexTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(HadoopIndexTask.class);
private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
@@ -88,10 +98,14 @@ public class HadoopIndexTask extends AbstractTask
super(
id != null ? id : String.format("index_hadoop_%s_%s", schema.getDataSource(), new DateTime()),
schema.getDataSource(),
- JodaUtils.umbrellaInterval(JodaUtils.condenseIntervals(schema.getGranularitySpec().bucketIntervals()))
+ JodaUtils.umbrellaInterval(
+ JodaUtils.condenseIntervals(
+ schema.getGranularitySpec()
+ .bucketIntervals()
+ )
+ )
);
-
// Some HadoopDruidIndexerSchema stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(schema.getSegmentOutputPath() == null, "segmentOutputPath must be absent");
Preconditions.checkArgument(schema.getWorkingPath() == null, "workingPath must be absent");
@@ -107,7 +121,6 @@ public class HadoopIndexTask extends AbstractTask
return "index_hadoop";
}
-
@JsonProperty("config")
public HadoopDruidIndexerSchema getSchema()
{
@@ -174,14 +187,10 @@ public class HadoopIndexTask extends AbstractTask
if (segments != null) {
List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
- segments, new TypeReference<List<DataSegment>>()
- {
- }
+ segments,
+ new TypeReference<List<DataSegment>>() {}
);
- // Request segment pushes
- toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
-
- // Done
+ toolbox.pushSegments(publishedSegments);
return TaskStatus.success(getId());
} else {
return TaskStatus.failure(getId());
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexDeterminePartitionsTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexDeterminePartitionsTask.java
deleted file mode 100644
index a89cd475ff0..00000000000
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexDeterminePartitionsTask.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultiset;
-import com.google.common.primitives.Ints;
-import com.metamx.common.logger.Logger;
-import io.druid.data.input.Firehose;
-import io.druid.data.input.FirehoseFactory;
-import io.druid.data.input.InputRow;
-import io.druid.indexing.common.TaskStatus;
-import io.druid.indexing.common.TaskToolbox;
-import io.druid.indexing.common.actions.SpawnTasksAction;
-import io.druid.segment.realtime.Schema;
-import io.druid.timeline.partition.NoneShardSpec;
-import io.druid.timeline.partition.ShardSpec;
-import io.druid.timeline.partition.SingleDimensionShardSpec;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class IndexDeterminePartitionsTask extends AbstractTask
-{
- private static String makeTaskId(String groupId, DateTime start, DateTime end)
- {
- return String.format(
- "%s_partitions_%s_%s",
- groupId,
- start,
- end
- );
- }
-
- @JsonIgnore
- private final FirehoseFactory firehoseFactory;
-
- @JsonIgnore
- private final Schema schema;
-
- @JsonIgnore
- private final long targetPartitionSize;
-
- @JsonIgnore
- private final int rowFlushBoundary;
-
- private static final Logger log = new Logger(IndexTask.class);
-
- @JsonCreator
- public IndexDeterminePartitionsTask(
- @JsonProperty("id") String id,
- @JsonProperty("groupId") String groupId,
- @JsonProperty("interval") Interval interval,
- @JsonProperty("firehose") FirehoseFactory firehoseFactory,
- @JsonProperty("schema") Schema schema,
- @JsonProperty("targetPartitionSize") long targetPartitionSize,
- @JsonProperty("rowFlushBoundary") int rowFlushBoundary
- )
- {
- super(
- id != null ? id : makeTaskId(groupId, interval.getStart(), interval.getEnd()),
- groupId,
- schema.getDataSource(),
- Preconditions.checkNotNull(interval, "interval")
- );
-
- this.firehoseFactory = firehoseFactory;
- this.schema = schema;
- this.targetPartitionSize = targetPartitionSize;
- this.rowFlushBoundary = rowFlushBoundary;
- }
-
- @Override
- public String getType()
- {
- return "index_partitions";
- }
-
- @Override
- public TaskStatus run(TaskToolbox toolbox) throws Exception
- {
- log.info("Running with targetPartitionSize[%d]", targetPartitionSize);
-
- // The implementation of this determine partitions stuff is less than optimal. Should be done better.
-
- // We know this exists
- final Interval interval = getImplicitLockInterval().get();
-
- // Blacklist dimensions that have multiple values per row
- final Set<String> unusableDimensions = Sets.newHashSet();
-
- // Track values of all non-blacklisted dimensions
- final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
-
- // Load data
- final Firehose firehose = firehoseFactory.connect();
-
- try {
- while (firehose.hasMore()) {
-
- final InputRow inputRow = firehose.nextRow();
-
- if (interval.contains(inputRow.getTimestampFromEpoch())) {
-
- // Extract dimensions from event
- for (final String dim : inputRow.getDimensions()) {
- final List<String> dimValues = inputRow.getDimension(dim);
-
- if (!unusableDimensions.contains(dim)) {
-
- if (dimValues.size() == 1) {
-
- // Track this value
- TreeMultiset<String> dimensionValueMultiset = dimensionValueMultisets.get(dim);
-
- if (dimensionValueMultiset == null) {
- dimensionValueMultiset = TreeMultiset.create();
- dimensionValueMultisets.put(dim, dimensionValueMultiset);
- }
-
- dimensionValueMultiset.add(dimValues.get(0));
-
- } else {
-
- // Only single-valued dimensions can be used for partitions
- unusableDimensions.add(dim);
- dimensionValueMultisets.remove(dim);
-
- }
-
- }
- }
-
- }
-
- }
- }
- finally {
- firehose.close();
- }
-
- // ShardSpecs for index generator tasks
- final List<ShardSpec> shardSpecs = Lists.newArrayList();
-
- // Select highest-cardinality dimension
- Ordering<Map.Entry<String, TreeMultiset<String>>> byCardinalityOrdering = new Ordering<Map.Entry<String, TreeMultiset<String>>>()
- {
- @Override
- public int compare(
- Map.Entry<String, TreeMultiset<String>> left,
- Map.Entry<String, TreeMultiset<String>> right
- )
- {
- return Ints.compare(left.getValue().elementSet().size(), right.getValue().elementSet().size());
- }
- };
-
- if (dimensionValueMultisets.isEmpty()) {
- // No suitable partition dimension. We'll make one big segment and hope for the best.
- log.info("No suitable partition dimension found");
- shardSpecs.add(new NoneShardSpec());
- } else {
- // Find best partition dimension (heuristic: highest cardinality).
- final Map.Entry<String, TreeMultiset<String>> partitionEntry =
- byCardinalityOrdering.max(dimensionValueMultisets.entrySet());
-
- final String partitionDim = partitionEntry.getKey();
- final TreeMultiset<String> partitionDimValues = partitionEntry.getValue();
-
- log.info(
- "Partitioning on dimension[%s] with cardinality[%d] over rows[%d]",
- partitionDim,
- partitionDimValues.elementSet().size(),
- partitionDimValues.size()
- );
-
- // Iterate over unique partition dimension values in sorted order
- String currentPartitionStart = null;
- int currentPartitionSize = 0;
- for (final String partitionDimValue : partitionDimValues.elementSet()) {
- currentPartitionSize += partitionDimValues.count(partitionDimValue);
- if (currentPartitionSize >= targetPartitionSize) {
- final ShardSpec shardSpec = new SingleDimensionShardSpec(
- partitionDim,
- currentPartitionStart,
- partitionDimValue,
- shardSpecs.size()
- );
-
- log.info("Adding shard: %s", shardSpec);
- shardSpecs.add(shardSpec);
-
- currentPartitionSize = partitionDimValues.count(partitionDimValue);
- currentPartitionStart = partitionDimValue;
- }
- }
-
- if (currentPartitionSize > 0) {
- // One last shard to go
- final ShardSpec shardSpec;
-
- if (shardSpecs.isEmpty()) {
- shardSpec = new NoneShardSpec();
- } else {
- shardSpec = new SingleDimensionShardSpec(
- partitionDim,
- currentPartitionStart,
- null,
- shardSpecs.size()
- );
- }
-
- log.info("Adding shard: %s", shardSpec);
- shardSpecs.add(shardSpec);
- }
- }
-
- List<Task> nextTasks = Lists.transform(
- shardSpecs,
- new Function<ShardSpec, Task>()
- {
- @Override
- public Task apply(ShardSpec shardSpec)
- {
- return new IndexGeneratorTask(
- null,
- getGroupId(),
- getImplicitLockInterval().get(),
- firehoseFactory,
- new Schema(
- schema.getDataSource(),
- schema.getSpatialDimensions(),
- schema.getAggregators(),
- schema.getIndexGranularity(),
- shardSpec
- ),
- rowFlushBoundary
- );
- }
- }
- );
-
- toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
-
- return TaskStatus.success(getId());
- }
-
- @JsonProperty
- public FirehoseFactory getFirehoseFactory()
- {
- return firehoseFactory;
- }
-
- @JsonProperty
- public Schema getSchema()
- {
- return schema;
- }
-
- @JsonProperty
- public long getTargetPartitionSize()
- {
- return targetPartitionSize;
- }
-
- @JsonProperty
- public int getRowFlushBoundary()
- {
- return rowFlushBoundary;
- }
-}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexGeneratorTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexGeneratorTask.java
deleted file mode 100644
index 84f6211ad52..00000000000
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexGeneratorTask.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.metamx.common.logger.Logger;
-import io.druid.data.input.Firehose;
-import io.druid.data.input.FirehoseFactory;
-import io.druid.data.input.InputRow;
-import io.druid.indexing.common.TaskLock;
-import io.druid.indexing.common.TaskStatus;
-import io.druid.indexing.common.TaskToolbox;
-import io.druid.indexing.common.actions.SegmentInsertAction;
-import io.druid.indexing.common.index.YeOldePlumberSchool;
-import io.druid.segment.loading.DataSegmentPusher;
-import io.druid.segment.realtime.FireDepartmentMetrics;
-import io.druid.segment.realtime.Schema;
-import io.druid.segment.realtime.plumber.Plumber;
-import io.druid.segment.realtime.plumber.Sink;
-import io.druid.timeline.DataSegment;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-public class IndexGeneratorTask extends AbstractTask
-{
- @JsonIgnore
- private final FirehoseFactory firehoseFactory;
-
- @JsonIgnore
- private final Schema schema;
-
- @JsonIgnore
- private final int rowFlushBoundary;
-
- private static final Logger log = new Logger(IndexTask.class);
-
- @JsonCreator
- public IndexGeneratorTask(
- @JsonProperty("id") String id,
- @JsonProperty("groupId") String groupId,
- @JsonProperty("interval") Interval interval,
- @JsonProperty("firehose") FirehoseFactory firehoseFactory,
- @JsonProperty("schema") Schema schema,
- @JsonProperty("rowFlushBoundary") int rowFlushBoundary
- )
- {
- super(
- id != null
- ? id
- : String.format(
- "%s_generator_%s_%s_%s",
- groupId,
- interval.getStart(),
- interval.getEnd(),
- schema.getShardSpec().getPartitionNum()
- ),
- groupId,
- schema.getDataSource(),
- Preconditions.checkNotNull(interval, "interval")
- );
-
- this.firehoseFactory = firehoseFactory;
- this.schema = schema;
- this.rowFlushBoundary = rowFlushBoundary;
- }
-
- @Override
- public String getType()
- {
- return "index_generator";
- }
-
- @Override
- public TaskStatus run(final TaskToolbox toolbox) throws Exception
- {
- // We should have a lock from before we started running
- final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
-
- // We know this exists
- final Interval interval = getImplicitLockInterval().get();
-
- // Set up temporary directory for indexing
- final File tmpDir = new File(
- toolbox.getTaskWorkDir(),
- String.format(
- "%s_%s_%s_%s_%s",
- this.getDataSource(),
- interval.getStart(),
- interval.getEnd(),
- myLock.getVersion(),
- schema.getShardSpec().getPartitionNum()
- )
- );
-
- // We need to track published segments.
- final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
- final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
- {
- @Override
- public String getPathForHadoop(String dataSource)
- {
- return toolbox.getSegmentPusher().getPathForHadoop(dataSource);
- }
-
- @Override
- public DataSegment push(File file, DataSegment segment) throws IOException
- {
- final DataSegment pushedSegment = toolbox.getSegmentPusher().push(file, segment);
- pushedSegments.add(pushedSegment);
- return pushedSegment;
- }
- };
-
- // Create firehose + plumber
- final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
- final Firehose firehose = firehoseFactory.connect();
- final Plumber plumber = new YeOldePlumberSchool(
- interval,
- myLock.getVersion(),
- wrappedDataSegmentPusher,
- tmpDir
- ).findPlumber(schema, metrics);
-
- // rowFlushBoundary for this job
- final int myRowFlushBoundary = this.rowFlushBoundary > 0
- ? rowFlushBoundary
- : toolbox.getConfig().getDefaultRowFlushBoundary();
-
- try {
- while (firehose.hasMore()) {
- final InputRow inputRow = firehose.nextRow();
-
- if (shouldIndex(inputRow)) {
- final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
- if (sink == null) {
- throw new NullPointerException(
- String.format(
- "Was expecting non-null sink for timestamp[%s]",
- new DateTime(inputRow.getTimestampFromEpoch())
- )
- );
- }
-
- int numRows = sink.add(inputRow);
- metrics.incrementProcessed();
-
- if (numRows >= myRowFlushBoundary) {
- plumber.persist(firehose.commit());
- }
- } else {
- metrics.incrementThrownAway();
- }
- }
- }
- finally {
- firehose.close();
- }
-
- plumber.persist(firehose.commit());
- plumber.finishJob();
-
- // Output metrics
- log.info(
- "Task[%s] took in %,d rows (%,d processed, %,d unparseable, %,d thrown away) and output %,d rows",
- getId(),
- metrics.processed() + metrics.unparseable() + metrics.thrownAway(),
- metrics.processed(),
- metrics.unparseable(),
- metrics.thrownAway(),
- metrics.rowOutput()
- );
-
- // Request segment pushes
- toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments)));
-
- // Done
- return TaskStatus.success(getId());
- }
-
- /**
- * Should we index this inputRow? Decision is based on our interval and shardSpec.
- *
- * @param inputRow the row to check
- *
- * @return true or false
- */
- private boolean shouldIndex(InputRow inputRow)
- {
- if (getImplicitLockInterval().get().contains(inputRow.getTimestampFromEpoch())) {
- return schema.getShardSpec().isInChunk(inputRow);
- } else {
- return false;
- }
- }
-
- @JsonProperty("firehose")
- public FirehoseFactory getFirehoseFactory()
- {
- return firehoseFactory;
- }
-
- @JsonProperty
- public Schema getSchema()
- {
- return schema;
- }
-
- @JsonProperty
- public int getRowFlushBoundary()
- {
- return rowFlushBoundary;
- }
-}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 6e7d9f61f17..25603c981cd 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -22,26 +22,48 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.api.client.util.Sets;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.TreeMultiset;
+import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
+import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.GranularitySpec;
+import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
-import io.druid.indexing.common.actions.SpawnTasksAction;
-import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.actions.SegmentInsertAction;
+import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
+import io.druid.segment.realtime.plumber.Plumber;
+import io.druid.segment.realtime.plumber.Sink;
+import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.ShardSpec;
+import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
-public class IndexTask extends AbstractTask
+public class IndexTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(IndexTask.class);
@@ -58,7 +80,7 @@ public class IndexTask extends AbstractTask
private final QueryGranularity indexGranularity;
@JsonIgnore
- private final long targetPartitionSize;
+ private final int targetPartitionSize;
@JsonIgnore
private final FirehoseFactory firehoseFactory;
@@ -74,7 +96,7 @@ public class IndexTask extends AbstractTask
@JsonProperty("spatialDimensions") List spatialDimensions,
@JsonProperty("aggregators") AggregatorFactory[] aggregators,
@JsonProperty("indexGranularity") QueryGranularity indexGranularity,
- @JsonProperty("targetPartitionSize") long targetPartitionSize,
+ @JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
@@ -96,58 +118,10 @@ public class IndexTask extends AbstractTask
this.aggregators = aggregators;
this.indexGranularity = (indexGranularity == null) ? QueryGranularity.NONE : indexGranularity;
this.targetPartitionSize = targetPartitionSize;
- this.firehoseFactory = firehoseFactory;
+ this.firehoseFactory = Preconditions.checkNotNull(firehoseFactory, "firehoseFactory");
this.rowFlushBoundary = rowFlushBoundary;
}
- public List<Task> toSubtasks()
- {
- final List<Task> retVal = Lists.newArrayList();
-
- for (final Interval interval : granularitySpec.bucketIntervals()) {
- if (targetPartitionSize > 0) {
- // Need to do one pass over the data before indexing in order to determine good partitions
- retVal.add(
- new IndexDeterminePartitionsTask(
- null,
- getGroupId(),
- interval,
- firehoseFactory,
- new Schema(
- getDataSource(),
- spatialDimensions,
- aggregators,
- indexGranularity,
- new NoneShardSpec()
- ),
- targetPartitionSize,
- rowFlushBoundary
- )
- );
- } else {
- // Jump straight into indexing
- retVal.add(
- new IndexGeneratorTask(
- null,
- getGroupId(),
- interval,
- firehoseFactory,
- new Schema(
- getDataSource(),
- spatialDimensions,
- aggregators,
- indexGranularity,
- new NoneShardSpec()
- ),
- rowFlushBoundary
- )
- );
- }
- }
-
- return retVal;
- }
-
@Override
public String getType()
{
@@ -155,16 +129,278 @@ public class IndexTask extends AbstractTask
}
@Override
- public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
+ public TaskStatus run(TaskToolbox toolbox) throws Exception
{
- taskActionClient.submit(new SpawnTasksAction(toSubtasks()));
+ final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+ final Set<DataSegment> segments = Sets.newHashSet();
+ for (final Interval bucket : granularitySpec.bucketIntervals()) {
+ final List<ShardSpec> shardSpecs;
+ if (targetPartitionSize > 0) {
+ shardSpecs = determinePartitions(bucket, targetPartitionSize);
+ } else {
+ shardSpecs = ImmutableList.of(new NoneShardSpec());
+ }
+ for (final ShardSpec shardSpec : shardSpecs) {
+ final DataSegment segment = generateSegment(
+ toolbox,
+ new Schema(
+ getDataSource(),
+ spatialDimensions,
+ aggregators,
+ indexGranularity,
+ shardSpec
+ ),
+ bucket,
+ myLock.getVersion()
+ );
+ segments.add(segment);
+ }
+ }
+ toolbox.pushSegments(segments);
return TaskStatus.success(getId());
}
- @Override
- public TaskStatus run(TaskToolbox toolbox) throws Exception
+ private List<ShardSpec> determinePartitions(
+ final Interval interval,
+ final int targetPartitionSize
+ ) throws IOException
{
- throw new IllegalStateException("IndexTasks should not be run!");
+ log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
+
+ // The implementation of this determine partitions stuff is less than optimal. Should be done better.
+
+ // Blacklist dimensions that have multiple values per row
+ final Set<String> unusableDimensions = com.google.common.collect.Sets.newHashSet();
+ // Track values of all non-blacklisted dimensions
+ final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
+
+ // Load data
+ try (Firehose firehose = firehoseFactory.connect()) {
+ while (firehose.hasMore()) {
+ final InputRow inputRow = firehose.nextRow();
+ if (interval.contains(inputRow.getTimestampFromEpoch())) {
+ // Extract dimensions from event
+ for (final String dim : inputRow.getDimensions()) {
+ final List<String> dimValues = inputRow.getDimension(dim);
+ if (!unusableDimensions.contains(dim)) {
+ if (dimValues.size() == 1) {
+ // Track this value
+ TreeMultiset<String> dimensionValueMultiset = dimensionValueMultisets.get(dim);
+ if (dimensionValueMultiset == null) {
+ dimensionValueMultiset = TreeMultiset.create();
+ dimensionValueMultisets.put(dim, dimensionValueMultiset);
+ }
+ dimensionValueMultiset.add(dimValues.get(0));
+ } else {
+ // Only single-valued dimensions can be used for partitions
+ unusableDimensions.add(dim);
+ dimensionValueMultisets.remove(dim);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // ShardSpecs we will return
+ final List<ShardSpec> shardSpecs = Lists.newArrayList();
+
+ // Select highest-cardinality dimension
+ Ordering<Map.Entry<String, TreeMultiset<String>>> byCardinalityOrdering = new Ordering<Map.Entry<String, TreeMultiset<String>>>()
+ {
+ @Override
+ public int compare(
+ Map.Entry<String, TreeMultiset<String>> left,
+ Map.Entry<String, TreeMultiset<String>> right
+ )
+ {
+ return Ints.compare(left.getValue().elementSet().size(), right.getValue().elementSet().size());
+ }
+ };
+
+ if (dimensionValueMultisets.isEmpty()) {
+ // No suitable partition dimension. We'll make one big segment and hope for the best.
+ log.info("No suitable partition dimension found");
+ shardSpecs.add(new NoneShardSpec());
+ } else {
+ // Find best partition dimension (heuristic: highest cardinality).
+ final Map.Entry<String, TreeMultiset<String>> partitionEntry =
+ byCardinalityOrdering.max(dimensionValueMultisets.entrySet());
+
+ final String partitionDim = partitionEntry.getKey();
+ final TreeMultiset<String> partitionDimValues = partitionEntry.getValue();
+
+ log.info(
+ "Partitioning on dimension[%s] with cardinality[%d] over rows[%d]",
+ partitionDim,
+ partitionDimValues.elementSet().size(),
+ partitionDimValues.size()
+ );
+
+ // Iterate over unique partition dimension values in sorted order
+ String currentPartitionStart = null;
+ int currentPartitionSize = 0;
+ for (final String partitionDimValue : partitionDimValues.elementSet()) {
+ currentPartitionSize += partitionDimValues.count(partitionDimValue);
+ if (currentPartitionSize >= targetPartitionSize) {
+ final ShardSpec shardSpec = new SingleDimensionShardSpec(
+ partitionDim,
+ currentPartitionStart,
+ partitionDimValue,
+ shardSpecs.size()
+ );
+
+ log.info("Adding shard: %s", shardSpec);
+ shardSpecs.add(shardSpec);
+
+ currentPartitionSize = partitionDimValues.count(partitionDimValue);
+ currentPartitionStart = partitionDimValue;
+ }
+ }
+
+ if (currentPartitionSize > 0) {
+ // One last shard to go
+ final ShardSpec shardSpec;
+
+ if (shardSpecs.isEmpty()) {
+ shardSpec = new NoneShardSpec();
+ } else {
+ shardSpec = new SingleDimensionShardSpec(
+ partitionDim,
+ currentPartitionStart,
+ null,
+ shardSpecs.size()
+ );
+ }
+
+ log.info("Adding shard: %s", shardSpec);
+ shardSpecs.add(shardSpec);
+ }
+ }
+
+ return shardSpecs;
+ }
+
+ private DataSegment generateSegment(
+ final TaskToolbox toolbox,
+ final Schema schema,
+ final Interval interval,
+ final String version
+ ) throws IOException
+ {
+ // Set up temporary directory.
+ final File tmpDir = new File(
+ toolbox.getTaskWorkDir(),
+ String.format(
+ "%s_%s_%s_%s_%s",
+ this.getDataSource(),
+ interval.getStart(),
+ interval.getEnd(),
+ version,
+ schema.getShardSpec().getPartitionNum()
+ )
+ );
+
+ // We need to track published segments.
+ final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
+ final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
+ {
+ @Override
+ public String getPathForHadoop(String dataSource)
+ {
+ return toolbox.getSegmentPusher().getPathForHadoop(dataSource);
+ }
+
+ @Override
+ public DataSegment push(File file, DataSegment segment) throws IOException
+ {
+ final DataSegment pushedSegment = toolbox.getSegmentPusher().push(file, segment);
+ pushedSegments.add(pushedSegment);
+ return pushedSegment;
+ }
+ };
+
+ // Create firehose + plumber
+ final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
+ final Firehose firehose = firehoseFactory.connect();
+ final Plumber plumber = new YeOldePlumberSchool(
+ interval,
+ version,
+ wrappedDataSegmentPusher,
+ tmpDir
+ ).findPlumber(schema, metrics);
+
+ // rowFlushBoundary for this job
+ final int myRowFlushBoundary = this.rowFlushBoundary > 0
+ ? rowFlushBoundary
+ : toolbox.getConfig().getDefaultRowFlushBoundary();
+
+ try {
+ plumber.startJob();
+
+ while (firehose.hasMore()) {
+ final InputRow inputRow = firehose.nextRow();
+
+ if (shouldIndex(schema, interval, inputRow)) {
+ final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
+ if (sink == null) {
+ throw new NullPointerException(
+ String.format(
+ "Was expecting non-null sink for timestamp[%s]",
+ new DateTime(inputRow.getTimestampFromEpoch())
+ )
+ );
+ }
+
+ int numRows = sink.add(inputRow);
+ metrics.incrementProcessed();
+
+ if (numRows >= myRowFlushBoundary) {
+ plumber.persist(firehose.commit());
+ }
+ } else {
+ metrics.incrementThrownAway();
+ }
+ }
+ }
+ finally {
+ firehose.close();
+ }
+
+ plumber.persist(firehose.commit());
+
+ try {
+ plumber.finishJob();
+ }
+ finally {
+ log.info(
+ "Task[%s] interval[%s] partition[%d] took in %,d rows (%,d processed, %,d unparseable, %,d thrown away)"
+ + " and output %,d rows",
+ getId(),
+ interval,
+ schema.getShardSpec().getPartitionNum(),
+ metrics.processed() + metrics.unparseable() + metrics.thrownAway(),
+ metrics.processed(),
+ metrics.unparseable(),
+ metrics.thrownAway(),
+ metrics.rowOutput()
+ );
+ }
+
+ // We expect a single segment to have been created.
+ return Iterables.getOnlyElement(pushedSegments);
+ }
+
+ /**
+ * Should we index this inputRow? Decision is based on our interval and shardSpec.
+ *
+ * @param inputRow the row to check
+ *
+ * @return true or false
+ */
+ private boolean shouldIndex(final Schema schema, final Interval interval, final InputRow inputRow)
+ {
+ return interval.contains(inputRow.getTimestampFromEpoch()) && schema.getShardSpec().isInChunk(inputRow);
}
@JsonProperty
@@ -191,7 +427,7 @@ public class IndexTask extends AbstractTask
return targetPartitionSize;
}
- @JsonProperty
+ @JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
@@ -202,4 +438,10 @@ public class IndexTask extends AbstractTask
{
return rowFlushBoundary;
}
+
+ @JsonProperty
+ public List<SpatialDimensionSchema> getSpatialDimensions()
+ {
+ return spatialDimensions;
+ }
}
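
The single-dimension partitioning loop is easier to follow outside of diff form. A standalone sketch of the same range logic with made-up counts (plain Guava, no Druid classes):

import com.google.common.collect.TreeMultiset;

class PartitionRangeSketch
{
  public static void main(String[] args)
  {
    // Row counts per value of the chosen partition dimension.
    final TreeMultiset<String> counts = TreeMultiset.create();
    counts.add("a", 400);
    counts.add("b", 300);
    counts.add("c", 500);
    final int targetPartitionSize = 600;

    String currentStart = null; // null means an open-ended lower bound
    int currentSize = 0;
    int partitionNum = 0;

    // Walk values in sorted order, closing a range whenever the running count
    // reaches the target; the closing value starts the next range.
    for (String value : counts.elementSet()) {
      currentSize += counts.count(value);
      if (currentSize >= targetPartitionSize) {
        System.out.printf("shard %d: [%s, %s)%n", partitionNum++, currentStart, value);
        currentSize = counts.count(value);
        currentStart = value;
      }
    }
    if (currentSize > 0) {
      System.out.printf("shard %d: [%s, null)%n", partitionNum, currentStart);
    }
    // Prints three ranges: [null, b), [b, c), [c, null) holding roughly 400, 300 and 500 rows.
  }
}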
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java
index 8f4068a5e46..b4858342981 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java
@@ -28,7 +28,6 @@ import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
-import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentNukeAction;
import io.druid.timeline.DataSegment;
@@ -38,7 +37,7 @@ import java.util.List;
/**
*/
-public class KillTask extends AbstractTask
+public class KillTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(KillTask.class);
@@ -68,12 +67,12 @@ public class KillTask extends AbstractTask
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
- if(!myLock.getDataSource().equals(getDataSource())) {
+ if (!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
}
- if(!myLock.getInterval().equals(getImplicitLockInterval().get())) {
- throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getImplicitLockInterval().get());
+ if (!myLock.getInterval().equals(getInterval())) {
+ throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
}
// List unused segments
@@ -82,8 +81,8 @@ public class KillTask extends AbstractTask
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
- for(final DataSegment unusedSegment : unusedSegments) {
- if(unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
+ for (final DataSegment unusedSegment : unusedSegments) {
+ if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
throw new ISE(
"WTF?! Unused segment[%s] has version[%s] > task version[%s]",
unusedSegment.getIdentifier(),
@@ -98,11 +97,9 @@ public class KillTask extends AbstractTask
// Kill segments
for (DataSegment segment : unusedSegments) {
toolbox.getDataSegmentKiller().kill(segment);
+ toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.of(segment)));
}
- // Remove metadata for these segments
- toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
-
return TaskStatus.success(getId());
}
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 750509f9cec..40b07f72d71 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -27,7 +27,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -41,9 +41,8 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
-import io.druid.indexing.common.actions.LockAcquireAction;
-import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
+import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.segment.IndexIO;
import io.druid.timeline.DataSegment;
@@ -53,14 +52,13 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
-public abstract class MergeTaskBase extends AbstractTask
+public abstract class MergeTaskBase extends AbstractFixedIntervalTask
{
@JsonIgnore
private final List<DataSegment> segments;
@@ -145,7 +143,7 @@ public abstract class MergeTaskBase extends AbstractTask
);
// download segments to merge
- final Map<DataSegment, File> gettedSegments = toolbox.getSegments(segments);
+ final Map<DataSegment, File> gettedSegments = toolbox.fetchSegments(segments);
// merge files together
final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));
@@ -168,7 +166,7 @@ public abstract class MergeTaskBase extends AbstractTask
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
- toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+ toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}
@@ -186,9 +184,12 @@ public abstract class MergeTaskBase extends AbstractTask
* we are operating on every segment that overlaps the chosen interval.
*/
@Override
- public TaskStatus preflight(TaskActionClient taskActionClient)
+ public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
- try {
+ // Try to acquire lock
+ if (!super.isReady(taskActionClient)) {
+ return false;
+ } else {
final Function<DataSegment, String> toIdentifier = new Function<DataSegment, String>()
{
@Override
@@ -199,7 +200,10 @@ public abstract class MergeTaskBase extends AbstractTask
};
final Set<String> current = ImmutableSet.copyOf(
- Iterables.transform(taskActionClient.submit(defaultListUsedAction()), toIdentifier)
+ Iterables.transform(
+ taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval())),
+ toIdentifier
+ )
);
final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));
@@ -219,10 +223,7 @@ public abstract class MergeTaskBase extends AbstractTask
);
}
- return TaskStatus.running(getId());
- }
- catch (IOException e) {
- throw Throwables.propagate(e);
+ return true;
}
}
@@ -241,7 +242,7 @@ public abstract class MergeTaskBase extends AbstractTask
return Objects.toStringHelper(this)
.add("id", getId())
.add("dataSource", getDataSource())
- .add("interval", getImplicitLockInterval())
+ .add("interval", getInterval())
.add("segments", segments)
.toString();
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java
new file mode 100644
index 00000000000..da82ffa6608
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java
@@ -0,0 +1,115 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentListUnusedAction;
+import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
+import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Map;
+
+public class MoveTask extends AbstractFixedIntervalTask
+{
+ private static final Logger log = new Logger(MoveTask.class);
+
+ private final Map<String, Object> targetLoadSpec;
+
+ @JsonCreator
+ public MoveTask(
+ @JsonProperty("id") String id,
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("target") Map targetLoadSpec
+ )
+ {
+ super(
+ TaskUtils.makeId(id, "move", dataSource, interval),
+ dataSource,
+ interval
+ );
+ this.targetLoadSpec = targetLoadSpec;
+ }
+
+ @Override
+ public String getType()
+ {
+ return "move";
+ }
+
+ @Override
+ public TaskStatus run(TaskToolbox toolbox) throws Exception
+ {
+ // Confirm we have a lock (will throw if there isn't exactly one element)
+ final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+
+ if(!myLock.getDataSource().equals(getDataSource())) {
+ throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
+ }
+
+ if(!myLock.getInterval().equals(getInterval())) {
+ throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
+ }
+
+ // List unused segments
+ final List<DataSegment> unusedSegments = toolbox
+ .getTaskActionClient()
+ .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
+
+ // Verify none of these segments have versions > lock version
+ for(final DataSegment unusedSegment : unusedSegments) {
+ if(unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
+ throw new ISE(
+ "WTF?! Unused segment[%s] has version[%s] > task version[%s]",
+ unusedSegment.getIdentifier(),
+ unusedSegment.getVersion(),
+ myLock.getVersion()
+ );
+ }
+
+ log.info("OK to move segment: %s", unusedSegment.getIdentifier());
+ }
+
+ // Move segments
+ for (DataSegment segment : unusedSegments) {
+ final DataSegment movedSegment = toolbox.getDataSegmentMover().move(segment, targetLoadSpec);
+ toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment)));
+ }
+
+ return TaskStatus.success(getId());
+ }
+
+ @JsonProperty
+ public Map<String, Object> getTargetLoadSpec()
+ {
+ return targetLoadSpec;
+ }
+}
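
Note: since MoveTask is new in this patch, a hypothetical construction may help show how the @JsonProperty fields above fit together. The target load spec keys and values below are illustrative only and depend entirely on which DataSegmentMover is configured; they are not part of this patch.

    // Hypothetical example; field names follow the @JsonProperty annotations above.
    final Map<String, Object> targetLoadSpec = ImmutableMap.<String, Object>of(
        "type", "s3_zip",              // assumed mover type
        "bucket", "example-archive",   // hypothetical bucket
        "baseKey", "druid/archive"     // hypothetical base key
    );
    final MoveTask moveTask = new MoveTask(
        null,                                   // null id => TaskUtils.makeId(...) generates one
        "wikipedia",                            // dataSource (illustrative)
        new Interval("2013-08-31/2013-09-01"),  // fixed interval to lock and move
        targetLoadSpec
    );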
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
index c6291fdb4c9..b4de3512fbe 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
@@ -20,41 +20,63 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.TaskActionClient;
import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.joda.time.Period;
/**
*/
public class NoopTask extends AbstractTask
{
private static final Logger log = new Logger(NoopTask.class);
- private static int defaultRunTime = 2500;
+ private static final int defaultRunTime = 2500;
+ private static final int defaultIsReadyTime = 0;
+ private static final IsReadyResult defaultIsReadyResult = IsReadyResult.YES;
- private final int runTime;
+ enum IsReadyResult
+ {
+ YES,
+ NO,
+ EXCEPTION
+ }
+
+ @JsonIgnore
+ private final long runTime;
+
+ @JsonIgnore
+ private final long isReadyTime;
+
+ @JsonIgnore
+ private final IsReadyResult isReadyResult;
+
+ @JsonIgnore
private final FirehoseFactory firehoseFactory;
@JsonCreator
public NoopTask(
@JsonProperty("id") String id,
- @JsonProperty("interval") Interval interval,
- @JsonProperty("runTime") int runTime,
+ @JsonProperty("runTime") long runTime,
+ @JsonProperty("isReadyTime") long isReadyTime,
+ @JsonProperty("isReadyResult") String isReadyResult,
@JsonProperty("firehose") FirehoseFactory firehoseFactory
)
{
super(
id == null ? String.format("noop_%s", new DateTime()) : id,
- "none",
- interval == null ? new Interval(Period.days(1), new DateTime()) : interval
+ "none"
);
this.runTime = (runTime == 0) ? defaultRunTime : runTime;
-
+ this.isReadyTime = (isReadyTime == 0) ? defaultIsReadyTime : isReadyTime;
+ this.isReadyResult = (isReadyResult == null)
+ ? defaultIsReadyResult
+ : IsReadyResult.valueOf(isReadyResult.toUpperCase());
this.firehoseFactory = firehoseFactory;
}
@@ -64,18 +86,45 @@ public class NoopTask extends AbstractTask
return "noop";
}
- @JsonProperty("runTime")
- public int getRunTime()
+ @JsonProperty
+ public long getRunTime()
{
return runTime;
}
+ @JsonProperty
+ public long getIsReadyTime()
+ {
+ return isReadyTime;
+ }
+
+ @JsonProperty
+ public IsReadyResult getIsReadyResult()
+ {
+ return isReadyResult;
+ }
+
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient) throws Exception
+ {
+ switch (isReadyResult) {
+ case YES:
+ return true;
+ case NO:
+ return false;
+ case EXCEPTION:
+ throw new ISE("Not ready. Never will be ready. Go away!");
+ default:
+ throw new AssertionError("#notreached");
+ }
+ }
+
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
@@ -90,4 +139,9 @@ public class NoopTask extends AbstractTask
log.info("Woke up!");
return TaskStatus.success(getId());
}
+
+ public static NoopTask create()
+ {
+ return new NoopTask(null, 0, 0, null, null);
+ }
}
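
Note: the reworked NoopTask is primarily a testing aid; the interval parameter is gone and two knobs control readiness. A quick sketch using the constructor signature above (values are arbitrary; 0 falls back to the defaults defined in the class):

    final NoopTask notReady = new NoopTask(
        "noop_example",  // id
        1000,            // runTime spent in run() (the 2500 default suggests millis)
        0,               // isReadyTime (0 => defaultIsReadyTime)
        "no",            // isReadyResult, parsed case-insensitively into IsReadyResult.NO
        null             // no firehose
    );
    final NoopTask defaults = NoopTask.create(); // convenience factory added in this patch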
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index bb30c351732..01fa6e69149 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.io.Closeables;
import com.metamx.common.exception.FormattedException;
import com.metamx.emitter.EmittingLogger;
@@ -35,9 +34,8 @@ import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
-import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.LockReleaseAction;
-import io.druid.indexing.common.actions.SegmentInsertAction;
+import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@@ -130,8 +128,7 @@ public class RealtimeIndexTask extends AbstractTask
), 1
)
: taskResource,
- schema.getDataSource(),
- null
+ schema.getDataSource()
);
this.schema = schema;
@@ -167,6 +164,12 @@ public class RealtimeIndexTask extends AbstractTask
}
}
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient) throws Exception
+ {
+ return true;
+ }
+
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
@@ -206,7 +209,7 @@ public class RealtimeIndexTask extends AbstractTask
@Override
public void announceSegment(final DataSegment segment) throws IOException
{
- // NOTE: Side effect: Calling announceSegment causes a lock to be acquired
+ // Side effect: Calling announceSegment causes a lock to be acquired
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
toolbox.getSegmentAnnouncer().announceSegment(segment);
}
@@ -225,6 +228,7 @@ public class RealtimeIndexTask extends AbstractTask
@Override
public void announceSegments(Iterable segments) throws IOException
{
+ // Side effect: Calling announceSegments causes locks to be acquired
for (DataSegment segment : segments) {
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
}
@@ -257,7 +261,7 @@ public class RealtimeIndexTask extends AbstractTask
public String getVersion(final Interval interval)
{
try {
- // NOTE: Side effect: Calling getVersion causes a lock to be acquired
+ // Side effect: Calling getVersion causes a lock to be acquired
final TaskLock myLock = toolbox.getTaskActionClient()
.submit(new LockAcquireAction(interval));
@@ -337,11 +341,11 @@ public class RealtimeIndexTask extends AbstractTask
}
}
}
- catch (Exception e) {
+ catch (Throwable e) {
+ normalExit = false;
log.makeAlert(e, "Exception aborted realtime processing[%s]", schema.getDataSource())
.emit();
- normalExit = false;
- throw Throwables.propagate(e);
+ throw e;
}
finally {
if (normalExit) {
@@ -412,7 +416,7 @@ public class RealtimeIndexTask extends AbstractTask
@Override
public void publishSegment(DataSegment segment) throws IOException
{
- taskToolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
+ taskToolbox.pushSegments(ImmutableList.of(segment));
}
}
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index a3e42232865..8fa4b53bf10 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -21,27 +21,22 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.google.common.base.Optional;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
-import org.joda.time.Interval;
/**
* Represents a task that can run on a worker. The general contracts surrounding Tasks are:
 * <ul>
- * <li>Tasks must operate on a single datasource.</li>
- * <li>Tasks should be immutable, since the task ID is used as a proxy for the task in many locations.</li>
- * <li>Task IDs must be unique. This can be done by naming them using UUIDs or the current timestamp.</li>
- * <li>Tasks are each part of a "task group", which is a set of tasks that can share interval locks. These are
- * useful for producing sharded segments.</li>
- * <li>Tasks can optionally have an "implicit lock interval". Tasks with this property are guaranteed to have
- * a lock on that interval during their {@link #preflight(io.druid.indexing.common.actions.TaskActionClient)}
- * and {@link #run(io.druid.indexing.common.TaskToolbox)} methods.</li>
- * <li>Tasks do not need to explicitly release locks; they are released upon task completion. Tasks may choose
- * to release locks early if they desire.</li>
+ * <li>Tasks must operate on a single datasource.</li>
+ * <li>Tasks should be immutable, since the task ID is used as a proxy for the task in many locations.</li>
+ * <li>Task IDs must be unique. This can be done by naming them using UUIDs or the current timestamp.</li>
+ * <li>Tasks are each part of a "task group", which is a set of tasks that can share interval locks. These are
+ * useful for producing sharded segments.</li>
+ * <li>Tasks do not need to explicitly release locks; they are released upon task completion. Tasks may choose
+ * to release locks early if they desire.</li>
 * </ul>
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@@ -50,9 +45,9 @@ import org.joda.time.Interval;
@JsonSubTypes.Type(name = "merge", value = MergeTask.class),
@JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
+ @JsonSubTypes.Type(name = "move", value = MoveTask.class),
+ @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
- @JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class),
- @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
@JsonSubTypes.Type(name = "noop", value = NoopTask.class),
@@ -96,12 +91,6 @@ public interface Task
*/
public String getDataSource();
- /**
- * Returns implicit lock interval for this task, if any. Tasks without implicit lock intervals are not granted locks
- * when started and must explicitly request them.
- */
- public Optional<Interval> getImplicitLockInterval();
-
/**
* Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
* should return null.
@@ -109,18 +98,19 @@ public interface Task
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
/**
- * Execute preflight checks for a task. This typically runs on the coordinator, and will be run while
- * holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the
- * task should be considered a failure.
+ * Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. These
+ * actions must be idempotent, since this method may be executed multiple times per task. This typically runs on
+ * the coordinator. If this method throws an exception, the task should be considered a failure.
*
* @param taskActionClient action client for this task (not the full toolbox)
*
- * @return Some kind of status (runnable means continue on to a worker, non-runnable means we completed without
- * using a worker).
+ * @return true if ready, false if not ready yet
*
- * @throws Exception
+ * @throws Exception if the task should be considered a failure
*/
- public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception;
+ public boolean isReady(TaskActionClient taskActionClient) throws Exception;
/**
* Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
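
Note: the contract change above replaces preflight()'s TaskStatus with a boolean, so the coordinator no longer treats preflight as a possible terminal state; it simply retries readiness. A rough sketch of how a caller might drive the new method, assuming the caller owns scheduling (this is not the actual overlord code):

    static void runWhenReady(Task task, TaskActionClient client, TaskToolbox toolbox) throws Exception
    {
      // isReady() is required to be idempotent, so polling it repeatedly is safe.
      while (!task.isReady(client)) {
        Thread.sleep(1000); // illustrative back-off only
      }
      // An exception from isReady() propagates and marks the task as failed, per the javadoc above.
      task.run(toolbox);
    }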
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java
index 5085bdbd2e3..75561f2408e 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java
@@ -23,16 +23,15 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
-import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
-import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
-import io.druid.indexing.common.actions.SpawnTasksAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.segment.IndexIO;
import io.druid.segment.loading.SegmentLoadingException;
@@ -48,10 +47,10 @@ import java.util.Map;
/**
*/
-public class VersionConverterTask extends AbstractTask
+public class VersionConverterTask extends AbstractFixedIntervalTask
{
private static final String TYPE = "version_converter";
- private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID);
+ private static final Integer CURR_VERSION_INTEGER = IndexIO.CURRENT_VERSION_ID;
private static final Logger log = new Logger(VersionConverterTask.class);
@@ -74,6 +73,8 @@ public class VersionConverterTask extends AbstractTask
private static String makeId(String dataSource, Interval interval)
{
+ Preconditions.checkNotNull(dataSource, "dataSource");
+ Preconditions.checkNotNull(interval, "interval");
return joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime());
}
@@ -105,7 +106,6 @@ public class VersionConverterTask extends AbstractTask
)
{
super(id, groupId, dataSource, interval);
-
this.segment = segment;
}
@@ -125,45 +125,43 @@ public class VersionConverterTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
if (segment == null) {
- throw new ISE("Segment was null, this should never run.", this.getClass().getSimpleName());
- }
-
- log.info("I'm in a subless mood.");
- convertSegment(toolbox, segment);
- return success();
- }
-
- @Override
- public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
- {
- if (segment != null) {
- return super.preflight(taskActionClient);
- }
-
- List<DataSegment> segments = taskActionClient.submit(defaultListUsedAction());
-
- final FunctionalIterable<Task> tasks = FunctionalIterable
- .create(segments)
- .keep(
- new Function<DataSegment, Task>()
- {
- @Override
- public Task apply(DataSegment segment)
+ final List<DataSegment> segments = toolbox.getTaskActionClient().submit(
+ new SegmentListUsedAction(
+ getDataSource(),
+ getInterval()
+ )
+ );
+ final FunctionalIterable<Task> tasks = FunctionalIterable
+ .create(segments)
+ .keep(
                new Function<DataSegment, Task>()
{
- final Integer segmentVersion = segment.getBinaryVersion();
- if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
- return new SubTask(getGroupId(), segment);
+ @Override
+ public Task apply(DataSegment segment)
+ {
+ final Integer segmentVersion = segment.getBinaryVersion();
+ if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
+ return new SubTask(getGroupId(), segment);
+ }
+
+ log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
+ return null;
}
-
- log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
- return null;
}
- }
- );
+ );
- taskActionClient.submit(new SpawnTasksAction(Lists.newArrayList(tasks)));
-
- return TaskStatus.success(getId());
+ // Vestigial from a past time when this task spawned subtasks.
+ for (final Task subTask : tasks) {
+ final TaskStatus status = subTask.run(toolbox);
+ if (!status.isSuccess()) {
+ return status;
+ }
+ }
+ } else {
+ log.info("I'm in a subless mood.");
+ convertSegment(toolbox, segment);
+ }
+ return success();
}
@Override
@@ -185,7 +183,7 @@ public class VersionConverterTask extends AbstractTask
return super.equals(o);
}
- public static class SubTask extends AbstractTask
+ public static class SubTask extends AbstractFixedIntervalTask
{
@JsonIgnore
private final DataSegment segment;
@@ -251,7 +249,7 @@ public class VersionConverterTask extends AbstractTask
}
}
- final Map<DataSegment, File> localSegments = toolbox.getSegments(Arrays.asList(segment));
+ final Map<DataSegment, File> localSegments = toolbox.fetchSegments(Arrays.asList(segment));
final File location = localSegments.get(segment);
final File outLocation = new File(location, "v9_out");
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java
new file mode 100644
index 00000000000..e1649b46f32
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java
@@ -0,0 +1,87 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.common.tasklogs;
+
+import com.google.common.base.Optional;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.inject.Inject;
+import com.metamx.common.logger.Logger;
+import io.druid.indexing.common.config.FileTaskLogsConfig;
+import io.druid.tasklogs.TaskLogs;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class FileTaskLogs implements TaskLogs
+{
+ private static final Logger log = new Logger(FileTaskLogs.class);
+
+ private final FileTaskLogsConfig config;
+
+ @Inject
+ public FileTaskLogs(
+ FileTaskLogsConfig config
+ )
+ {
+ this.config = config;
+ }
+
+ @Override
+ public void pushTaskLog(final String taskid, File file) throws IOException
+ {
+ if (!config.getDirectory().exists()) {
+ config.getDirectory().mkdir();
+ }
+ final File outputFile = fileForTask(taskid);
+ Files.copy(file, outputFile);
+ log.info("Wrote task log to: %s", outputFile);
+ }
+
+ @Override
+ public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskid, final long offset) throws IOException
+ {
+ final File file = fileForTask(taskid);
+ if (file.exists()) {
+ return Optional.<InputSupplier<InputStream>>of(
+ new InputSupplier<InputStream>()
+ {
+ @Override
+ public InputStream getInput() throws IOException
+ {
+ final InputStream inputStream = new FileInputStream(file);
+ ByteStreams.skipFully(inputStream, offset);
+ return inputStream;
+ }
+ }
+ );
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ private File fileForTask(final String taskid)
+ {
+ return new File(config.getDirectory(), String.format("%s.log", taskid));
+ }
+}
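
Note: a short usage sketch for the new FileTaskLogs. The FileTaskLogsConfig instance is normally injected by Guice and is assumed here to point at a writable log directory; the task id and file paths are made up.

    final FileTaskLogs taskLogs = new FileTaskLogs(config);
    taskLogs.pushTaskLog("index_wikipedia_2013-09-01T00:00:00.000Z", new File("/tmp/task.log"));

    final Optional<InputSupplier<InputStream>> maybeLog =
        taskLogs.streamTaskLog("index_wikipedia_2013-09-01T00:00:00.000Z", 0); // offset 0 = start of file
    if (maybeLog.isPresent()) {
      final InputStream in = maybeLog.get().getInput(); // caller is responsible for closing the stream
      // ... read the log ...
    }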
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java
index ec2e2f99feb..cf0fb4f3e24 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java
@@ -23,29 +23,41 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
+import com.metamx.common.RetryUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
+import com.mysql.jdbc.exceptions.MySQLTransientException;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
+import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
+import org.joda.time.Period;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.StatementException;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback;
+import java.sql.SQLException;
+import java.sql.SQLRecoverableException;
+import java.sql.SQLTransientException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
public class DbTaskStorage implements TaskStorage
{
@@ -53,16 +65,24 @@ public class DbTaskStorage implements TaskStorage
private final DbConnector dbConnector;
private final DbTablesConfig dbTables;
private final IDBI dbi;
+ private final TaskStorageConfig config;
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
@Inject
- public DbTaskStorage(ObjectMapper jsonMapper, DbConnector dbConnector, DbTablesConfig dbTables, IDBI dbi)
+ public DbTaskStorage(
+ final ObjectMapper jsonMapper,
+ final DbConnector dbConnector,
+ final DbTablesConfig dbTables,
+ final IDBI dbi,
+ final TaskStorageConfig config
+ )
{
this.jsonMapper = jsonMapper;
this.dbConnector = dbConnector;
this.dbTables = dbTables;
this.dbi = dbi;
+ this.config = config;
}
@LifecycleStart
@@ -92,7 +112,7 @@ public class DbTaskStorage implements TaskStorage
log.info("Inserting task %s with status: %s", task.getId(), status);
try {
- dbi.withHandle(
+ retryingHandle(
          new HandleCallback<Void>()
{
@Override
@@ -134,7 +154,7 @@ public class DbTaskStorage implements TaskStorage
log.info("Updating task %s to status: %s", status.getId(), status);
- int updated = dbi.withHandle(
+ int updated = retryingHandle(
        new HandleCallback<Integer>()
{
@Override
@@ -162,7 +182,7 @@ public class DbTaskStorage implements TaskStorage
@Override
public Optional<Task> getTask(final String taskid)
{
- return dbi.withHandle(
+ return retryingHandle(
        new HandleCallback<Optional<Task>>()
{
@Override
@@ -192,7 +212,7 @@ public class DbTaskStorage implements TaskStorage
@Override
public Optional<TaskStatus> getStatus(final String taskid)
{
- return dbi.withHandle(
+ return retryingHandle(
        new HandleCallback<Optional<TaskStatus>>()
{
@Override
@@ -220,9 +240,9 @@ public class DbTaskStorage implements TaskStorage
}
@Override
- public List<Task> getRunningTasks()
+ public List<Task> getActiveTasks()
{
- return dbi.withHandle(
+ return retryingHandle(
        new HandleCallback<List<Task>>()
{
@Override
@@ -231,7 +251,7 @@ public class DbTaskStorage implements TaskStorage
final List