diff --git a/build.sh b/build.sh
index 71b98b275d7..9b7478e592e 100755
--- a/build.sh
+++ b/build.sh
@@ -30,4 +30,4 @@ echo "For examples, see: "
 echo " "
 ls -1 examples/*/*sh
 echo " "
-echo "See also http://druid.io/docs/0.6.33"
+echo "See also http://druid.io/docs/0.6.40"
diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index 47756cfa602..00781634ef0 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.34-SNAPSHOT</version>
+        <version>0.6.42-SNAPSHOT</version>
     </parent>
diff --git a/common/pom.xml b/common/pom.xml
index 2e497083c06..3be3dc64d17 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.34-SNAPSHOT</version>
+        <version>0.6.42-SNAPSHOT</version>
     </parent>
diff --git a/docs/content/Booting-a-production-cluster.md b/docs/content/Booting-a-production-cluster.md
index be9a0b77bc7..bb00506700f 100644
--- a/docs/content/Booting-a-production-cluster.md
+++ b/docs/content/Booting-a-production-cluster.md
@@ -3,7 +3,7 @@ layout: doc_page
 ---
 # Booting a Single Node Cluster #
 
-[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.33-bin.tar.gz).
+[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.40-bin.tar.gz).
 
 The [ec2 run script](https://github.com/metamx/druid/blob/master/examples/bin/run_ec2.sh), run_ec2.sh, is located at 'examples/bin' if you have checked out the code, or at the root of the project if you've downloaded a tarball. The scripts rely on the [Amazon EC2 API Tools](http://aws.amazon.com/developertools/351), and you will need to set three environment variables:
diff --git a/docs/content/Configuration.md b/docs/content/Configuration.md
index d7e010b3006..f01b5d41a0b 100644
--- a/docs/content/Configuration.md
+++ b/docs/content/Configuration.md
@@ -285,8 +285,10 @@ This deep storage is used to interface with Amazon's S3.
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.storage.bucket`|S3 bucket name.|none|
-|`druid.storage.basekey`|S3 base key.|none|
+|`druid.storage.baseKey`|S3 object key prefix for storage.|none|
 |`druid.storage.disableAcl`|Boolean flag for ACL.|false|
+|`druid.storage.archiveBucket`|S3 bucket name for archiving when running the indexing-service *archive task*.|none|
+|`druid.storage.archiveBaseKey`|S3 object key prefix for archiving.|none|
 
 #### HDFS Deep Storage
diff --git a/docs/content/Examples.md b/docs/content/Examples.md
index 1c6cf675b01..7aff89ea644 100644
--- a/docs/content/Examples.md
+++ b/docs/content/Examples.md
@@ -19,13 +19,13 @@ Clone Druid and build it:
 git clone https://github.com/metamx/druid.git druid
 cd druid
 git fetch --tags
-git checkout druid-0.6.33
+git checkout druid-0.6.40
 ./build.sh
 ```
 
 ### Downloading the DSK (Druid Standalone Kit)
 
-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.33-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz) a stand-alone tarball and run it:
 
 ``` bash
 tar -xzf druid-services-0.X.X-bin.tar.gz
 ```
diff --git a/docs/content/Indexing-Service.md b/docs/content/Indexing-Service.md
index 147b4d16467..2f15b200025 100644
--- a/docs/content/Indexing-Service.md
+++ b/docs/content/Indexing-Service.md
@@ -117,6 +117,7 @@ In addition to the configuration of some of the default modules in [Configuratio
 |--------|-----------|-------|
 |`druid.indexer.runner.type`|Choices "local" or "remote". Indicates whether tasks should be run locally or in a distributed environment.|local|
 |`druid.indexer.storage.type`|Choices are "local" or "db". Indicates whether incoming tasks should be stored locally (in heap) or in a database. Storing incoming tasks in a database allows for tasks to be resumed if the overlord should fail.|local|
+|`druid.indexer.storage.recentlyFinishedThreshold`|A duration of time to store task results.|PT24H|
 |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
 |`druid.indexer.queue.startDelay`|Sleep this long before starting overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|
 |`druid.indexer.queue.restartDelay`|Sleep this long when overlord queue management throws an exception before trying again.|PT30S|
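The `druid.indexer.storage.recentlyFinishedThreshold` property added above binds to the `TaskStorageConfig` class introduced later in this patch. A minimal sketch of how such an ISO-8601 period translates into a cutoff, mirroring the logic in `TaskStorageConfig` and `HeapMemoryTaskStorage` below (the class and `main` harness are illustrative only):

```java
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;

public class RecentlyFinishedCutoff
{
  public static void main(String[] args)
  {
    // "PT24H" is the documented default; parse it the same way TaskStorageConfig does.
    final Duration threshold = new Period("PT24H").toStandardDuration();

    // Task statuses created after this cutoff count as "recently finished".
    final DateTime cutoff = new DateTime().minus(threshold);
    System.out.println("Keeping task statuses created after " + cutoff);
  }
}
```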
diff --git a/docs/content/Realtime.md b/docs/content/Realtime.md
index adedc001fd0..980e1dfb773 100644
--- a/docs/content/Realtime.md
+++ b/docs/content/Realtime.md
@@ -27,7 +27,7 @@ druid.host=localhost
 druid.service=realtime
 druid.port=8083
 
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.33"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.40"]
 
 druid.zk.service.host=localhost
diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md
index 2de698d100d..1b35c0979e2 100644
--- a/docs/content/Tutorial:-A-First-Look-at-Druid.md
+++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
 
 ### Download a Tarball
 
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.33-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz). Download this file to a directory of your choosing.
 
 You can extract the awesomeness within by issuing:
 
@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
 Not too lost so far right? That's great! If you cd into the directory:
 
 ```
-cd druid-services-0.6.33
+cd druid-services-0.6.40
 ```
 
 You should see a bunch of files:
@@ -205,7 +205,7 @@ You are probably wondering, what are these [Granularities](Granularities.html) a
 To issue the query and get some results, run the following in your command line:
 
 ```
-curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d ````timeseries_query.body
+curl -X POST 'http://localhost:8083/druid/v2/?pretty' -H 'content-type: application/json' -d @timeseries_query.body
 ```
 
 Once again, you should get a JSON blob of text back with your results, that looks something like this:
diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
index 2c56c81a839..1f0992bb6bd 100644
--- a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
+++ b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
@@ -94,6 +94,7 @@ druid.db.connector.user=druid
 druid.db.connector.password=diurd
 
 druid.selectors.indexing.serviceName=overlord
+druid.indexer.queue.startDelay=PT0M
 druid.indexer.runner.javaOpts="-server -Xmx1g"
 druid.indexer.runner.startPort=8088
 druid.indexer.fork.property.druid.computation.buffer.size=268435456
@@ -246,17 +247,19 @@ Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) should yield:
 } ]
 ```
 
-Problems?
----------
+Console
+--------
 
-If you decide to reuse the local firehose to ingest your own data and if you run into problems, you can read the individual task logs at:
+The indexing service overlord has a console located at:
 
 ```bash
-<working directory>/log/<task id>.log
-
+localhost:8087/console.html
 ```
 
-One thing to note is that the log file will only exist once the task completes with either SUCCESS or FAILURE.
+On this console, you can look at statuses and logs of recently submitted and completed tasks.
+
+If you decide to reuse the local firehose to ingest your own data and if you run into problems, you can use the console to read the individual task logs.
+
 Task logs can be stored locally or uploaded to [Deep Storage](Deep-Storage.html). More information about how to configure this is [here](Configuration.html).
 
 Most common data ingestion problems are around timestamp formats and other malformed data issues.
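The new console section above sits on top of the overlord's HTTP API. A sketch of checking one task's status against that API, assuming a local overlord on port 8087 as in the tutorial; the `/druid/indexer/v1/task/{taskid}/status` path matches the resource in `OverlordResource` later in this diff, while the task id here is hypothetical:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLEncoder;

public class TaskStatusCheck
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical task id; real ids come back from task submission.
    final String taskId = URLEncoder.encode("index_wikipedia_2013-01-01", "UTF-8");
    final URL url = new URL("http://localhost:8087/druid/indexer/v1/task/" + taskId + "/status");

    try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line); // e.g. {"task":"...","status":{"id":"...","status":"SUCCESS"}}
      }
    }
  }
}
```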
You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.33-bin.tar.gz. +We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz. Download this bad boy to a directory of your choosing. You can extract the awesomeness within by issuing: diff --git a/examples/config/historical/runtime.properties b/examples/config/historical/runtime.properties index 4a1a78f984a..fa2593c63a1 100644 --- a/examples/config/historical/runtime.properties +++ b/examples/config/historical/runtime.properties @@ -4,7 +4,7 @@ druid.port=8081 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.33"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.40"] # Dummy read only AWS account (used to download example data) druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties index fb79a8fc56a..eae65e057f9 100644 --- a/examples/config/realtime/runtime.properties +++ b/examples/config/realtime/runtime.properties @@ -4,7 +4,7 @@ druid.port=8083 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.33","io.druid.extensions:druid-kafka-seven:0.6.33","io.druid.extensions:druid-rabbitmq:0.6.33"] +druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.40","io.druid.extensions:druid-kafka-seven:0.6.40","io.druid.extensions:druid-rabbitmq:0.6.40"] # Change this config to db to hand off to the rest of the Druid cluster druid.publish.type=noop diff --git a/examples/pom.xml b/examples/pom.xml index e221d145cf8..ae847d7d6bf 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml index 1c646fa9f30..fb486c69856 100644 --- a/hdfs-storage/pom.xml +++ b/hdfs-storage/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index c9cf5a0302f..a778774dd60 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index ea3949c7b71..70eecc8ab04 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 0a7a505d4ec..eb34f4e4c1b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -29,7 +29,9 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; +import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoadingException; @@ -52,6 +54,8 @@ public class TaskToolbox private final ServiceEmitter 
emitter; private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; + private final DataSegmentArchiver dataSegmentArchiver; + private final DataSegmentMover dataSegmentMover; private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; @@ -68,6 +72,8 @@ public class TaskToolbox ServiceEmitter emitter, DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, + DataSegmentMover dataSegmentMover, + DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, @@ -84,6 +90,8 @@ public class TaskToolbox this.emitter = emitter; this.segmentPusher = segmentPusher; this.dataSegmentKiller = dataSegmentKiller; + this.dataSegmentMover = dataSegmentMover; + this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; @@ -119,6 +127,16 @@ public class TaskToolbox return dataSegmentKiller; } + public DataSegmentMover getDataSegmentMover() + { + return dataSegmentMover; + } + + public DataSegmentArchiver getDataSegmentArchiver() + { + return dataSegmentArchiver; + } + public DataSegmentAnnouncer getSegmentAnnouncer() { return segmentAnnouncer; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index ca00dccaf91..d655edc34f0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -24,13 +24,14 @@ import com.google.inject.Inject; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.ServerView; -import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Processing; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; +import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; import io.druid.server.coordination.DataSegmentAnnouncer; @@ -47,6 +48,8 @@ public class TaskToolboxFactory private final ServiceEmitter emitter; private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; + private final DataSegmentMover dataSegmentMover; + private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; @@ -62,6 +65,8 @@ public class TaskToolboxFactory ServiceEmitter emitter, DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, + DataSegmentMover dataSegmentMover, + DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, @@ -76,6 +81,8 @@ public class TaskToolboxFactory this.emitter = emitter; this.segmentPusher = segmentPusher; 
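`TaskToolbox` now hands tasks a `DataSegmentMover` and a `DataSegmentArchiver`. Neither interface is part of this diff; judging from the call sites in `MoveTask` and `ArchiveTask` below, a trivial mover could look roughly like the following (the signature is inferred from usage, not authoritative):

```java
import java.util.Map;

import io.druid.segment.loading.DataSegmentMover;
import io.druid.timeline.DataSegment;

public class NoopDataSegmentMover implements DataSegmentMover
{
  // Inferred contract: relocate the segment's data to the location described
  // by targetLoadSpec and return the segment with an updated loadSpec.
  @Override
  public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec)
  {
    // A real mover (e.g. an S3 one honoring druid.storage.archiveBucket) would
    // copy the files and return segment.withLoadSpec(...); this one does nothing.
    return segment;
  }
}
```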
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
index ca00dccaf91..d655edc34f0 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
@@ -24,13 +24,14 @@ import com.google.inject.Inject;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.ServerView;
-import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Processing;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.query.QueryRunnerFactoryConglomerate;
+import io.druid.segment.loading.DataSegmentArchiver;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 
@@ -47,6 +48,8 @@ public class TaskToolboxFactory
   private final ServiceEmitter emitter;
   private final DataSegmentPusher segmentPusher;
   private final DataSegmentKiller dataSegmentKiller;
+  private final DataSegmentMover dataSegmentMover;
+  private final DataSegmentArchiver dataSegmentArchiver;
   private final DataSegmentAnnouncer segmentAnnouncer;
   private final ServerView newSegmentServerView;
   private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@@ -62,6 +65,8 @@ public class TaskToolboxFactory
       ServiceEmitter emitter,
       DataSegmentPusher segmentPusher,
       DataSegmentKiller dataSegmentKiller,
+      DataSegmentMover dataSegmentMover,
+      DataSegmentArchiver dataSegmentArchiver,
       DataSegmentAnnouncer segmentAnnouncer,
       ServerView newSegmentServerView,
       QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@@ -76,6 +81,8 @@ public class TaskToolboxFactory
     this.emitter = emitter;
     this.segmentPusher = segmentPusher;
     this.dataSegmentKiller = dataSegmentKiller;
+    this.dataSegmentMover = dataSegmentMover;
+    this.dataSegmentArchiver = dataSegmentArchiver;
     this.segmentAnnouncer = segmentAnnouncer;
     this.newSegmentServerView = newSegmentServerView;
     this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@@ -96,6 +103,8 @@ public class TaskToolboxFactory
         emitter,
         segmentPusher,
         dataSegmentKiller,
+        dataSegmentMover,
+        dataSegmentArchiver,
         segmentAnnouncer,
         newSegmentServerView,
         queryRunnerFactoryConglomerate,
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
new file mode 100644
index 00000000000..f996a2c6ab0
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
@@ -0,0 +1,77 @@
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableSet;
+import com.metamx.common.ISE;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.indexing.common.task.Task;
+import io.druid.timeline.DataSegment;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class SegmentMetadataUpdateAction implements TaskAction<Void>
+{
+  @JsonIgnore
+  private final Set<DataSegment> segments;
+
+  @JsonCreator
+  public SegmentMetadataUpdateAction(
+      @JsonProperty("segments") Set<DataSegment> segments
+  )
+  {
+    this.segments = ImmutableSet.copyOf(segments);
+  }
+
+  @JsonProperty
+  public Set<DataSegment> getSegments()
+  {
+    return segments;
+  }
+
+  public TypeReference<Void> getReturnTypeReference()
+  {
+    return new TypeReference<Void>() {};
+  }
+
+  @Override
+  public Void perform(
+      Task task, TaskActionToolbox toolbox
+  ) throws IOException
+  {
+    if(!toolbox.taskLockCoversSegments(task, segments, true)) {
+      throw new ISE("Segments not covered by locks for task: %s", task.getId());
+    }
+
+    toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
+
+    // Emit metrics
+    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
+        .setUser2(task.getDataSource())
+        .setUser4(task.getType());
+
+    for (DataSegment segment : segments) {
+      metricBuilder.setUser5(segment.getInterval().toString());
+      toolbox.getEmitter().emit(metricBuilder.build("indexer/segmentMoved/bytes", segment.getSize()));
+    }
+
+    return null;
+  }
+
+  @Override
+  public boolean isAudited()
+  {
+    return true;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SegmentMetadataUpdateAction{" +
+           "segments=" + segments +
+           '}';
+  }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
index d9bdfe5b694..038d06be3c6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
@@ -35,7 +35,8 @@ import java.io.IOException;
     @JsonSubTypes.Type(name = "segmentInsertion", value = SegmentInsertAction.class),
     @JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class),
     @JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
-    @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class)
+    @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
+    @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class)
 })
 public interface TaskAction
 {
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java
new file mode 100644
index 00000000000..db40c7cb069
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java
@@ -0,0 +1,19 @@
+package io.druid.indexing.common.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.validation.constraints.NotNull;
+
+public class TaskStorageConfig
+{
+  @JsonProperty
+  @NotNull
+  public Duration recentlyFinishedThreshold = new Period("PT24H").toStandardDuration();
+
+  public Duration getRecentlyFinishedThreshold()
+  {
+    return recentlyFinishedThreshold;
+  }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java
new file mode 100644
index 00000000000..c863742e0ab
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java
@@ -0,0 +1,110 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentListUnusedAction;
+import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
+import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+public class ArchiveTask extends AbstractFixedIntervalTask
+{
+  private static final Logger log = new Logger(ArchiveTask.class);
+
+  public ArchiveTask(
+      @JsonProperty("id") String id,
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval
+  )
+  {
+    super(
+        TaskUtils.makeId(id, "archive", dataSource, interval),
+        dataSource,
+        interval
+    );
+  }
+
+  @Override
+  public String getType()
+  {
+    return "archive";
+  }
+
+  @Override
+  public TaskStatus run(TaskToolbox toolbox) throws Exception
+  {
+    // Confirm we have a lock (will throw if there isn't exactly one element)
+    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+
+    if (!myLock.getDataSource().equals(getDataSource())) {
+      throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
+    }
+
+    if (!myLock.getInterval().equals(getInterval())) {
+      throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
+    }
+
+    // List unused segments
+    final List<DataSegment> unusedSegments = toolbox
+        .getTaskActionClient()
+        .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
+
+    // Verify none of these segments have versions > lock version
+    for (final DataSegment unusedSegment : unusedSegments) {
+      if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
+        throw new ISE(
+            "WTF?! Unused segment[%s] has version[%s] > task version[%s]",
+            unusedSegment.getIdentifier(),
+            unusedSegment.getVersion(),
+            myLock.getVersion()
+        );
+      }
+
+      log.info("OK to archive segment: %s", unusedSegment.getIdentifier());
+    }
+
+    List<DataSegment> archivedSegments = Lists.newLinkedList();
+
+    // Move segments
+    for (DataSegment segment : unusedSegments) {
+      archivedSegments.add(toolbox.getDataSegmentArchiver().archive(segment));
+    }
+
+    // Update metadata for moved segments
+    toolbox.getTaskActionClient().submit(
+        new SegmentMetadataUpdateAction(
+            ImmutableSet.copyOf(archivedSegments)
+        )
+    );
+
+    return TaskStatus.success(getId());
+  }
+}
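A sketch of round-tripping the new archive task through JSON, in the same style as the serde test at the end of this patch; `DefaultObjectMapper` and the `archive` type registration come from this codebase, while the datasource and interval are made up:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.common.task.ArchiveTask;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;
import org.joda.time.Interval;

public class ArchiveTaskSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    // A null id lets TaskUtils.makeId generate one, as in the constructor above.
    final ArchiveTask task = new ArchiveTask(null, "wikipedia", new Interval("2013-01-01/2013-01-02"));

    final ObjectMapper jsonMapper = new DefaultObjectMapper();
    final String json = jsonMapper.writeValueAsString(task);
    System.out.println(json); // carries "type":"archive" via Task's @JsonSubTypes registration

    // Reading back through the Task interface exercises the polymorphic deserialization.
    final Task task2 = jsonMapper.readValue(json, Task.class);
    System.out.println(task.getId().equals(task2.getId())); // true
  }
}
```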
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java
new file mode 100644
index 00000000000..8e628b93188
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java
@@ -0,0 +1,121 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentListUnusedAction;
+import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
+import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Map;
+
+public class MoveTask extends AbstractFixedIntervalTask
+{
+  private static final Logger log = new Logger(MoveTask.class);
+
+  private final Map<String, Object> targetLoadSpec;
+
+  @JsonCreator
+  public MoveTask(
+      @JsonProperty("id") String id,
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("target") Map<String, Object> targetLoadSpec
+  )
+  {
+    super(
+        TaskUtils.makeId(id, "move", dataSource, interval),
+        dataSource,
+        interval
+    );
+    this.targetLoadSpec = targetLoadSpec;
+  }
+
+  @Override
+  public String getType()
+  {
+    return "move";
+  }
+
+  @Override
+  public TaskStatus run(TaskToolbox toolbox) throws Exception
+  {
+    // Confirm we have a lock (will throw if there isn't exactly one element)
+    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+
+    if(!myLock.getDataSource().equals(getDataSource())) {
+      throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
+    }
+
+    if(!myLock.getInterval().equals(getInterval())) {
+      throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
+    }
+
+    // List unused segments
+    final List<DataSegment> unusedSegments = toolbox
+        .getTaskActionClient()
+        .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
+
+    // Verify none of these segments have versions > lock version
+    for(final DataSegment unusedSegment : unusedSegments) {
+      if(unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
+        throw new ISE(
+            "WTF?! Unused segment[%s] has version[%s] > task version[%s]",
+            unusedSegment.getIdentifier(),
+            unusedSegment.getVersion(),
+            myLock.getVersion()
+        );
+      }
+
+      log.info("OK to move segment: %s", unusedSegment.getIdentifier());
+    }
+
+    List<DataSegment> movedSegments = Lists.newLinkedList();
+
+    // Move segments
+    for (DataSegment segment : unusedSegments) {
+      movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec));
+    }
+
+    // Update metadata for moved segments
+    toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(
+        ImmutableSet.copyOf(movedSegments)
+    ));
+
+    return TaskStatus.success(getId());
+  }
+
+  @JsonProperty
+  public Map<String, Object> getTargetLoadSpec()
+  {
+    return targetLoadSpec;
+  }
+}
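The move task is the same shape plus a `target` loadSpec. A sketch of building one; the loadSpec keys below are illustrative, since the expected schema depends on whichever `DataSegmentMover` is wired in:

```java
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.MoveTask;
import org.joda.time.Interval;

public class MoveTaskSketch
{
  public static void main(String[] args)
  {
    final MoveTask task = new MoveTask(
        null,                                  // null id: TaskUtils.makeId generates one
        "wikipedia",
        new Interval("2013-01-01/2013-01-02"),
        // Hypothetical target; real keys depend on the mover implementation in use.
        ImmutableMap.<String, Object>of("type", "s3_zip", "bucket", "example-archive-bucket")
    );
    System.out.println(task.getId() + " -> " + task.getTargetLoadSpec());
  }
}
```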
Go away!"); + default: + throw new AssertionError("#notreached"); + } } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 4d6afd2ebf6..8fa4b53bf10 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -45,6 +45,8 @@ import io.druid.query.QueryRunner; @JsonSubTypes.Type(name = "merge", value = MergeTask.class), @JsonSubTypes.Type(name = "delete", value = DeleteTask.class), @JsonSubTypes.Type(name = "kill", value = KillTask.class), + @JsonSubTypes.Type(name = "move", value = MoveTask.class), + @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class), @JsonSubTypes.Type(name = "index", value = IndexTask.class), @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class), @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class), diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java index 7d3ad05512e..5dc6e1c6fff 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java @@ -34,15 +34,16 @@ import com.metamx.common.RetryUtils; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; -import com.mysql.jdbc.exceptions.MySQLTimeoutException; import com.mysql.jdbc.exceptions.MySQLTransientException; import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.Task; import org.joda.time.DateTime; +import org.joda.time.Period; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.exceptions.CallbackFailedException; @@ -64,16 +65,24 @@ public class DbTaskStorage implements TaskStorage private final DbConnector dbConnector; private final DbTablesConfig dbTables; private final IDBI dbi; + private final TaskStorageConfig config; private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class); @Inject - public DbTaskStorage(ObjectMapper jsonMapper, DbConnector dbConnector, DbTablesConfig dbTables, IDBI dbi) + public DbTaskStorage( + final ObjectMapper jsonMapper, + final DbConnector dbConnector, + final DbTablesConfig dbTables, + final IDBI dbi, + final TaskStorageConfig config + ) { this.jsonMapper = jsonMapper; this.dbConnector = dbConnector; this.dbTables = dbTables; this.dbi = dbi; + this.config = config; } @LifecycleStart @@ -271,6 +280,45 @@ public class DbTaskStorage implements TaskStorage ); } + @Override + public List getRecentlyFinishedTaskStatuses() + { + final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold()); + return retryingHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + final List> dbTasks = + handle.createQuery( + String.format( + "SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC", + dbTables.getTasksTable() + ) + ).bind("recent", recent.toString()).list(); + + final 
ImmutableList.Builder statuses = ImmutableList.builder(); + for (final Map row : dbTasks) { + final String id = row.get("id").toString(); + + try { + final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class); + if (status.isComplete()) { + statuses.add(status); + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit(); + } + } + + return statuses.build(); + } + } + ); + } + @Override public void addLock(final String taskid, final TaskLock taskLock) { @@ -407,7 +455,8 @@ public class DbTaskStorage implements TaskStorage for (final Map dbTaskLog : dbTaskLogs) { try { retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class)); - } catch (Exception e) { + } + catch (Exception e) { log.makeAlert(e, "Failed to deserialize TaskLog") .addData("task", taskid) .addData("logPayload", dbTaskLog) @@ -451,7 +500,8 @@ public class DbTaskStorage implements TaskStorage /** * Retry SQL operations */ - private T retryingHandle(final HandleCallback callback) { + private T retryingHandle(final HandleCallback callback) + { final Callable call = new Callable() { @Override @@ -471,9 +521,11 @@ public class DbTaskStorage implements TaskStorage final int maxTries = 10; try { return RetryUtils.retry(call, shouldRetry, maxTries); - } catch (RuntimeException e) { + } + catch (RuntimeException e) { throw Throwables.propagate(e); - } catch (Exception e) { + } + catch (Exception e) { throw new CallbackFailedException(e); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index fce401c6641..1099bddbcc6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -391,7 +391,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer if (offset > 0) { raf.seek(offset); } else if (offset < 0 && offset < rafLength) { - raf.seek(rafLength + offset); + raf.seek(Math.max(0, rafLength + offset)); } return Channels.newInputStream(raf.getChannel()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java index ef23972ebe4..ef942e5c12f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -19,6 +19,7 @@ package io.druid.indexing.overlord; +import com.google.api.client.util.Lists; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -26,11 +27,15 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.Task; +import org.joda.time.DateTime; import java.util.List; import java.util.Map; @@ -42,6 +47,8 @@ import 
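The one-line `ForkingTaskRunner` change above clamps the negative-offset seek: without `Math.max`, asking for more trailing bytes than the log contains (say `offset=-8192` against a 1 KB file) seeks to a negative position and fails. A self-contained demonstration of the clamped arithmetic (the file name is arbitrary):

```java
import java.io.RandomAccessFile;

public class LogTail
{
  public static void main(String[] args) throws Exception
  {
    final long offset = -8192; // "last 8kb", as the console link puts it
    try (RandomAccessFile raf = new RandomAccessFile("task.log", "r")) {
      final long rafLength = raf.length();
      if (offset > 0) {
        raf.seek(offset);
      } else if (offset < 0 && offset < rafLength) {
        // Without Math.max this seeks to a negative position and throws
        // whenever the file is shorter than |offset|.
        raf.seek(Math.max(0, rafLength + offset));
      }
      System.out.println("tailing from byte " + raf.getFilePointer() + " of " + rafLength);
    }
  }
}
```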
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
index ef23972ebe4..ef942e5c12f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -19,6 +19,7 @@
 package io.druid.indexing.overlord;
 
+import com.google.api.client.util.Lists;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -26,11 +27,15 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.actions.TaskAction;
+import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.Task;
+import org.joda.time.DateTime;
 
 import java.util.List;
 import java.util.Map;
@@ -42,6 +47,8 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public class HeapMemoryTaskStorage implements TaskStorage
 {
+  private final TaskStorageConfig config;
+
   private final ReentrantLock giant = new ReentrantLock();
   private final Map<String, TaskStuff> tasks = Maps.newHashMap();
   private final Multimap<String, TaskLock> taskLocks = HashMultimap.create();
@@ -49,6 +56,12 @@ public class HeapMemoryTaskStorage implements TaskStorage
 
   private static final Logger log = new Logger(HeapMemoryTaskStorage.class);
 
+  @Inject
+  public HeapMemoryTaskStorage(TaskStorageConfig config)
+  {
+    this.config = config;
+  }
+
   @Override
   public void insert(Task task, TaskStatus status)
   {
@@ -69,7 +82,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
       }
 
       log.info("Inserting task %s with status: %s", task.getId(), status);
-      tasks.put(task.getId(), new TaskStuff(task, status));
+      tasks.put(task.getId(), new TaskStuff(task, status, new DateTime()));
     } finally {
       giant.unlock();
     }
@@ -139,13 +152,39 @@ public class HeapMemoryTaskStorage implements TaskStorage
           listBuilder.add(taskStuff.getTask());
         }
       }
-
       return listBuilder.build();
     } finally {
       giant.unlock();
     }
   }
 
+  @Override
+  public List<TaskStatus> getRecentlyFinishedTaskStatuses()
+  {
+    giant.lock();
+
+    try {
+      final List<TaskStatus> returns = Lists.newArrayList();
+      final long recent = System.currentTimeMillis() - config.getRecentlyFinishedThreshold().getMillis();
+      final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
+      {
+        @Override
+        public int compare(TaskStuff a, TaskStuff b)
+        {
+          return a.getCreatedDate().compareTo(b.getCreatedDate());
+        }
+      }.reverse();
+      for(final TaskStuff taskStuff : createdDateDesc.sortedCopy(tasks.values())) {
+        if(taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().getMillis() > recent) {
+          returns.add(taskStuff.getStatus());
+        }
+      }
+      return returns;
+    } finally {
+      giant.unlock();
+    }
+  }
+
   @Override
   public void addLock(final String taskid, final TaskLock taskLock)
   {
@@ -212,8 +251,9 @@ public class HeapMemoryTaskStorage implements TaskStorage
   {
     final Task task;
     final TaskStatus status;
+    final DateTime createdDate;
 
-    private TaskStuff(Task task, TaskStatus status)
+    private TaskStuff(Task task, TaskStatus status, DateTime createdDate)
     {
       Preconditions.checkNotNull(task);
       Preconditions.checkNotNull(status);
@@ -221,6 +261,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
 
       this.task = task;
       this.status = status;
+      this.createdDate = Preconditions.checkNotNull(createdDate, "createdDate");
     }
 
     public Task getTask()
@@ -233,9 +274,14 @@ public class HeapMemoryTaskStorage implements TaskStorage
       return status;
     }
 
+    public DateTime getCreatedDate()
+    {
+      return createdDate;
+    }
+
     private TaskStuff withStatus(TaskStatus _status)
     {
-      return new TaskStuff(task, _status);
+      return new TaskStuff(task, _status, createdDate);
     }
   }
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
index 2a4b5e8912d..f9db89b3fc9 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
@@ -28,8 +28,6 @@ import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
-import io.druid.db.DbConnector;
-import io.druid.db.DbConnectorConfig;
 import io.druid.db.DbTablesConfig;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
@@ -213,6 +211,24 @@ public class IndexerDBCoordinator
     return true;
   }
 
+  public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
+  {
+    dbi.inTransaction(
+        new TransactionCallback<Void>()
+        {
+          @Override
+          public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
+          {
+            for(final DataSegment segment : segments) {
+              updatePayload(handle, segment);
+            }
+
+            return null;
+          }
+        }
+    );
+  }
+
   public void deleteSegments(final Set<DataSegment> segments) throws IOException
   {
     dbi.inTransaction(
@@ -235,10 +251,27 @@ public class IndexerDBCoordinator
   {
     handle.createStatement(
         String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable())
-    ).bind("id", segment.getIdentifier())
+    )
+          .bind("id", segment.getIdentifier())
           .execute();
   }
 
+  private void updatePayload(final Handle handle, final DataSegment segment) throws IOException
+  {
+    try {
+      handle.createStatement(
+          String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable())
+      )
+            .bind("id", segment.getIdentifier())
+            .bind("payload", jsonMapper.writeValueAsString(segment))
+            .execute();
+    }
+    catch (IOException e) {
+      log.error(e, "Exception inserting into DB");
+      throw e;
+    }
+  }
+
   public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
   {
     List<DataSegment> matchingSegments = dbi.withHandle(
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
index 77e4b26e372..693a504542c 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
@@ -24,6 +24,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -224,7 +225,8 @@ public class TaskQueue
       runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
     }
     // Attain futures for all active tasks (assuming they are ready to run).
-    for (final Task task : tasks) {
+    // Copy tasks list, as notifyStatus may modify it.
+    for (final Task task : ImmutableList.copyOf(tasks)) {
       if (!taskFutures.containsKey(task.getId())) {
         final ListenableFuture<TaskStatus> runnerTaskFuture;
         if (runnerTaskFutures.containsKey(task.getId())) {
@@ -236,7 +238,7 @@ public class TaskQueue
           taskIsReady = task.isReady(taskActionClientFactory.create(task));
         }
         catch (Exception e) {
-          log.makeAlert(e, "Exception thrown during isReady").addData("task", task.getId()).emit();
+          log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
           notifyStatus(task, TaskStatus.failure(task.getId()));
           continue;
         }
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java
index a78faa24d03..2963c875257 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java
@@ -19,6 +19,8 @@
 package io.druid.indexing.overlord;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.druid.indexing.common.TaskStatus;
@@ -56,21 +58,25 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
     this.queueInsertionTime = queueInsertionTime;
   }
 
+  @JsonProperty
   public String getTaskId()
   {
     return taskId;
   }
 
+  @JsonIgnore
   public ListenableFuture<TaskStatus> getResult()
   {
     return result;
   }
 
+  @JsonProperty
   public DateTime getCreatedTime()
   {
     return createdTime;
   }
 
+  @JsonProperty
   public DateTime getQueueInsertionTime()
   {
     return queueInsertionTime;
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
index 3a2145627df..fb289459256 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
@@ -82,6 +82,13 @@ public interface TaskStorage
    */
   public List<Task> getActiveTasks();
 
+  /**
+   * Returns a list of recently finished task statuses as stored in the storage facility. No particular order
+   * is guaranteed, but implementations are encouraged to return tasks in descending order of creation. No particular
+   * standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing.
+   */
+  public List<TaskStatus> getRecentlyFinishedTaskStatuses();
+
   /**
    * Returns a list of locks for a particular task.
    */
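The `ImmutableList.copyOf(tasks)` in `TaskQueue` above is a defensive copy: `notifyStatus` can remove entries from `tasks` mid-iteration, which would otherwise raise `ConcurrentModificationException`. A stripped-down illustration of the hazard and the fix:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.google.common.collect.ImmutableList;

public class DefensiveCopy
{
  public static void main(String[] args)
  {
    final List<String> tasks = new ArrayList<String>(Arrays.asList("a", "b", "c"));

    // for (String task : tasks) { tasks.remove(task); }  // ConcurrentModificationException
    for (String task : ImmutableList.copyOf(tasks)) {
      tasks.remove(task); // safe: the loop iterates over a snapshot
    }
    System.out.println(tasks); // []
  }
}
```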
*/ diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java index e9a2a8d5d7c..67ea11dcf33 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -19,14 +19,19 @@ package io.druid.indexing.overlord; +import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Inject; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.task.Task; import io.druid.timeline.DataSegment; +import javax.annotation.Nullable; +import java.util.List; import java.util.Set; /** @@ -42,6 +47,21 @@ public class TaskStorageQueryAdapter this.storage = storage; } + public List getActiveTasks() + { + return storage.getActiveTasks(); + } + + public List getRecentlyFinishedTaskStatuses() + { + return storage.getRecentlyFinishedTaskStatuses(); + } + + public Optional getTask(final String taskid) + { + return storage.getTask(taskid); + } + public Optional getStatus(final String taskid) { return storage.getStatus(taskid); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index d3d58bef03d..f161cb3c278 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -19,15 +19,20 @@ package io.druid.indexing.overlord.http; +import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.io.InputSupplier; +import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.common.config.JacksonConfigManager; +import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; @@ -40,6 +45,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler; import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.tasklogs.TaskLogStreamer; import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -52,6 +58,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -63,20 +71,6 @@ public class OverlordResource { private static final Logger log = new Logger(OverlordResource.class); - private static Function> simplifyTaskFn = - new Function>() - { - @Override - public Map 
apply(TaskRunnerWorkItem input) - { - return new ImmutableMap.Builder() - .put("id", input.getTaskId()) - .put("createdTime", input.getCreatedTime()) - .put("queueInsertionTime", input.getQueueInsertionTime()) - .build(); - } - }; - private final TaskMaster taskMaster; private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskLogStreamer taskLogStreamer; @@ -139,6 +133,14 @@ public class OverlordResource ); } + @GET + @Path("/task/{taskid}") + @Produces("application/json") + public Response getTaskPayload(@PathParam("taskid") String taskid) + { + return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid)); + } + @GET @Path("/task/{taskid}/status") @Produces("application/json") @@ -238,39 +240,64 @@ public class OverlordResource } @GET - @Path("/pendingTasks") + @Path("/waitingTasks") @Produces("application/json") - public Response getPendingTasks( - @QueryParam("full") String full - ) + public Response getWaitingTasks() { - if (full != null) { - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() - { - @Override - public Response apply(TaskRunner taskRunner) - { - return Response.ok(taskRunner.getPendingTasks()).build(); - } - } - ); - } - - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() + return workItemsResponse( + new Function>() { @Override - public Response apply(TaskRunner taskRunner) + public Collection apply(TaskRunner taskRunner) { - return Response.ok( - Collections2.transform( - taskRunner.getPendingTasks(), - simplifyTaskFn + // A bit roundabout, but works as a way of figuring out what tasks haven't been handed + // off to the runner yet: + final List activeTasks = taskStorageQueryAdapter.getActiveTasks(); + final Set runnersKnownTasks = Sets.newHashSet( + Iterables.transform( + taskRunner.getKnownTasks(), + new Function() + { + @Override + public String apply(final TaskRunnerWorkItem workItem) + { + return workItem.getTaskId(); + } + } ) - ).build(); + ); + final List waitingTasks = Lists.newArrayList(); + for (final Task task : activeTasks) { + if (!runnersKnownTasks.contains(task.getId())) { + waitingTasks.add( + // Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it. 
+ new TaskRunnerWorkItem( + task.getId(), + SettableFuture.create(), + new DateTime(0), + new DateTime(0) + ) + ); + } + } + return waitingTasks; + } + } + ); + } + + @GET + @Path("/pendingTasks") + @Produces("application/json") + public Response getPendingTasks() + { + return workItemsResponse( + new Function>() + { + @Override + public Collection apply(TaskRunner taskRunner) + { + return taskRunner.getPendingTasks(); } } ); @@ -279,42 +306,45 @@ public class OverlordResource @GET @Path("/runningTasks") @Produces("application/json") - public Response getRunningTasks( - @QueryParam("full") String full - ) + public Response getRunningTasks() { - if (full != null) { - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() - { - @Override - public Response apply(TaskRunner taskRunner) - { - return Response.ok(taskRunner.getRunningTasks()).build(); - } - } - ); - } - - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() + return workItemsResponse( + new Function>() { @Override - public Response apply(TaskRunner taskRunner) + public Collection apply(TaskRunner taskRunner) { - return Response.ok( - Collections2.transform( - taskRunner.getRunningTasks(), - simplifyTaskFn - ) - ).build(); + return taskRunner.getRunningTasks(); } } ); } + @GET + @Path("/completeTasks") + @Produces("application/json") + public Response getCompleteTasks() + { + final List completeTasks = Lists.transform( + taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(), + new Function() + { + @Override + public TaskResponseObject apply(TaskStatus taskStatus) + { + // Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it. + return new TaskResponseObject( + taskStatus.getId(), + new DateTime(0), + new DateTime(0), + Optional.of(taskStatus) + ); + } + } + ); + return Response.ok(completeTasks).build(); + } + @GET @Path("/workers") @Produces("application/json") @@ -338,17 +368,13 @@ public class OverlordResource @Produces("application/json") public Response getScalingState() { - return asLeaderWith( - taskMaster.getResourceManagementScheduler(), - new Function() - { - @Override - public Response apply(ResourceManagementScheduler resourceManagementScheduler) - { - return Response.ok(resourceManagementScheduler.getStats()).build(); - } - } - ); + // Don't use asLeaderWith, since we want to return 200 instead of 503 when missing an autoscaler. 
+    final Optional<ResourceManagementScheduler> rms = taskMaster.getResourceManagementScheduler();
+    if (rms.isPresent()) {
+      return Response.ok(rms.get().getStats()).build();
+    } else {
+      return Response.ok().build();
+    }
   }
 
   @GET
@@ -373,7 +399,39 @@ public class OverlordResource
     }
   }
 
-  public <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x)
+  private Response workItemsResponse(final Function<TaskRunner, Collection<TaskRunnerWorkItem>> fn)
+  {
+    return asLeaderWith(
+        taskMaster.getTaskRunner(),
+        new Function<TaskRunner, Response>()
+        {
+          @Override
+          public Response apply(TaskRunner taskRunner)
+          {
+            return Response.ok(
+                Lists.transform(
+                    Lists.newArrayList(fn.apply(taskRunner)),
+                    new Function<TaskRunnerWorkItem, TaskResponseObject>()
+                    {
+                      @Override
+                      public TaskResponseObject apply(TaskRunnerWorkItem workItem)
+                      {
+                        return new TaskResponseObject(
+                            workItem.getTaskId(),
+                            workItem.getCreatedTime(),
+                            workItem.getQueueInsertionTime(),
+                            Optional.<TaskStatus>absent()
+                        );
+                      }
+                    }
+                )
+            ).build();
+          }
+        }
+    );
+  }
+
+  private <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x)
   {
     final Map<String, Object> results = Maps.newHashMap();
     results.put("task", taskid);
@@ -385,7 +443,7 @@ public class OverlordResource
     }
   }
 
-  public <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
+  private <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
   {
     if (x.isPresent()) {
       return f.apply(x.get());
@@ -394,4 +452,62 @@ public class OverlordResource
     return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
     }
   }
+
+  private static class TaskResponseObject
+  {
+    private final String id;
+    private final DateTime createdTime;
+    private final DateTime queueInsertionTime;
+    private final Optional<TaskStatus> status;
+
+    private TaskResponseObject(
+        String id,
+        DateTime createdTime,
+        DateTime queueInsertionTime,
+        Optional<TaskStatus> status
+    )
+    {
+      this.id = id;
+      this.createdTime = createdTime;
+      this.queueInsertionTime = queueInsertionTime;
+      this.status = status;
+    }
+
+    public String getId()
+    {
+      return id;
+    }
+
+    public DateTime getCreatedTime()
+    {
+      return createdTime;
+    }
+
+    public DateTime getQueueInsertionTime()
+    {
+      return queueInsertionTime;
+    }
+
+    public Optional<TaskStatus> getStatus()
+    {
+      return status;
+    }
+
+    @JsonValue
+    public Map<String, Object> toJson()
+    {
+      final Map<String, Object> data = Maps.newLinkedHashMap();
+      data.put("id", id);
+      if (createdTime.getMillis() > 0) {
+        data.put("createdTime", createdTime);
+      }
+      if (queueInsertionTime.getMillis() > 0) {
+        data.put("queueInsertionTime", queueInsertionTime);
+      }
+      if (status.isPresent()) {
+        data.put("statusCode", status.get().getStatusCode().toString());
+      }
+      return data;
+    }
+  }
 }
diff --git a/indexing-service/src/main/resources/indexer_static/console.html b/indexing-service/src/main/resources/indexer_static/console.html
index 6223eee6a1f..e8221aa287e 100644
--- a/indexing-service/src/main/resources/indexer_static/console.html
+++ b/indexing-service/src/main/resources/indexer_static/console.html
@@ -43,16 +43,24 @@
Loading Running Tasks... this may take a few minutes
-

Pending Tasks

+

Pending Tasks - Tasks waiting to be assigned to a worker

Loading Pending Tasks... this may take a few minutes
-

Workers

+

Waiting Tasks - Tasks waiting on locks

+
Loading Waiting Tasks... this may take a few minutes
+
+ +

Complete Tasks - Tasks recently completed

+
Loading Complete Tasks... this may take a few minutes
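
The Complete Tasks section polls the new `/druid/indexer/v1/completeTasks` endpoint, which wraps recently finished statuses in TaskResponseObject. Because the TaskStorage API does not yet expose creation times, both timestamps are set to epoch, and TaskResponseObject.toJson omits any epoch timestamp, so a completed entry serializes with only its id and statusCode. A small sketch of that conditional-serialization rule (the class name is a stand-in, not Druid code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TaskJsonSketch
{
  // Mirrors the toJson logic above: a zero (epoch) timestamp means "unknown"
  // and is left out of the serialized map entirely.
  public static Map<String, Object> toJson(String id, long createdMillis, String statusCode)
  {
    final Map<String, Object> data = new LinkedHashMap<String, Object>();
    data.put("id", id);
    if (createdMillis > 0) {
      data.put("createdTime", createdMillis);
    }
    if (statusCode != null) {
      data.put("statusCode", statusCode);
    }
    return data;
  }

  public static void main(String[] args)
  {
    // Completed tasks come back with epoch timestamps, so only id and statusCode survive:
    System.out.println(toJson("index_a", 0L, "SUCCESS")); // {id=index_a, statusCode=SUCCESS}
  }
}
```
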
+
+ +

Remote Workers

Loading Workers... this may take a few minutes
-

Event Log

-
Loading Event Log... this may take a few minutes
+

Autoscaling Activity

+
Loading Autoscaling Activities... this may take a few minutes
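
Each console section above is populated by a GET against the corresponding overlord endpoint (`/druid/indexer/v1/runningTasks`, `pendingTasks`, `waitingTasks`, `completeTasks`, and `workers`, as wired up in the JavaScript diff that follows). A minimal sketch of fetching one of them outside the browser; the host and port here are illustrative assumptions, not part of this patch:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class OverlordConsoleFetch
{
  public static void main(String[] args) throws Exception
  {
    // Assumes an overlord listening on localhost:8080; adjust to your deployment.
    URL url = new URL("http://localhost:8080/druid/indexer/v1/completeTasks");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestProperty("Accept", "application/json");
    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
    try {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line); // e.g. [{"id":"...","statusCode":"SUCCESS"}]
      }
    }
    finally {
      in.close();
    }
  }
}
```
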
diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
index e3ce86c85c9..adaa1fba83f 100644
--- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
+++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
@@ -3,14 +3,39 @@ var oTable = [];
 
 $(document).ready(function() {
 
+  var augment = function(data) {
+    for (i = 0 ; i < data.length ; i++) {
+      var taskId = encodeURIComponent(data[i].id)
+      data[i].more =
+        '<a href="/druid/indexer/v1/task/' + taskId + '">payload</a>' +
+        '<a href="/druid/indexer/v1/task/' + taskId + '/status">status</a>' +
+        '<a href="/druid/indexer/v1/task/' + taskId + '/log">log (all)</a>' +
+        '<a href="/druid/indexer/v1/task/' + taskId + '/log?offset=-8192">log (last 8kb)</a>'
+    }
+  }
+
   $.get('/druid/indexer/v1/runningTasks', function(data) {
     $('.running_loading').hide();
-    buildTable(data, $('#runningTable'), ["segments"]);
+    augment(data);
+    buildTable(data, $('#runningTable'));
   });
 
   $.get('/druid/indexer/v1/pendingTasks', function(data) {
     $('.pending_loading').hide();
-    buildTable(data, $('#pendingTable'), ["segments"]);
+    augment(data);
+    buildTable(data, $('#pendingTable'));
+  });
+
+  $.get('/druid/indexer/v1/waitingTasks', function(data) {
+    $('.waiting_loading').hide();
+    augment(data);
+    buildTable(data, $('#waitingTable'));
+  });
+
+  $.get('/druid/indexer/v1/completeTasks', function(data) {
+    $('.complete_loading').hide();
+    augment(data);
+    buildTable(data, $('#completeTable'));
   });
 
   $.get('/druid/indexer/v1/workers', function(data) {
@@ -22,4 +47,4 @@ $(document).ready(function() {
     $('.events_loading').hide();
     buildTable(data, $('#eventTable'));
   });
-}); \ No newline at end of file
+});
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
index cc69067d23c..178cae10513 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
@@ -32,7 +32,7 @@ import io.druid.timeline.partition.NoneShardSpec;
 /**
  */
 @JsonTypeName("test_realtime")
-public class TestRealtimeTask extends RealtimeIndexTask implements TestTask
+public class TestRealtimeTask extends RealtimeIndexTask
 {
   private final TaskStatus status;
 
@@ -64,13 +64,6 @@ public class TestRealtimeTask extends RealtimeIndexTask implements TestTask
     return "test_realtime";
   }
 
-  @Override
-  @JsonProperty
-  public TaskStatus getStatus()
-  {
-    return status;
-  }
-
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index 485b3202b54..77ae0af4b52 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -294,7 +294,58 @@ public class TaskSerdeTest
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getInterval(), task2.getInterval());
-    Assert.assertEquals(task.getSegments(), ((AppendTask) task2).getSegments());
+    Assert.assertEquals(task.getSegments(), task2.getSegments());
+  }
+
+  @Test
+  public void testArchiveTaskSerde() throws Exception
+  {
+    final ArchiveTask task = new ArchiveTask(
+        null,
+        "foo",
+        new Interval("2010-01-01/P1D")
+    );
+
+    final ObjectMapper jsonMapper = new DefaultObjectMapper();
+    final String json = jsonMapper.writeValueAsString(task);
+
+
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final ArchiveTask task2 = (ArchiveTask) jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getInterval(), task2.getInterval()); + } + + @Test + public void testMoveTaskSerde() throws Exception + { + final MoveTask task = new MoveTask( + null, + "foo", + new Interval("2010-01-01/P1D"), + ImmutableMap.of("bucket", "hey", "baseKey", "what") + ); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final MoveTask task2 = (MoveTask) jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval()); + Assert.assertEquals(ImmutableMap.of("bucket", "hey", "baseKey", "what"), task.getTargetLoadSpec()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getInterval(), task2.getInterval()); + Assert.assertEquals(task.getTargetLoadSpec(), task2.getTargetLoadSpec()); } @Test diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index ac90c1e12f0..85637d75c51 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -19,6 +19,7 @@ package io.druid.indexing.overlord; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -53,6 +54,7 @@ import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.AbstractFixedIntervalTask; import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.KillTask; @@ -62,7 +64,9 @@ import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; +import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPuller; @@ -84,6 +88,7 @@ import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; public class TaskLifecycleTest @@ -115,8 +120,15 @@ public class TaskLifecycleTest tmp 
= Files.createTempDir(); - final TaskQueueConfig tqc = new DefaultObjectMapper().readValue("{\"startDelay\":\"PT0S\"}", TaskQueueConfig.class); - ts = new HeapMemoryTaskStorage(); + final TaskQueueConfig tqc = new DefaultObjectMapper().readValue( + "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}", + TaskQueueConfig.class + ); + ts = new HeapMemoryTaskStorage( + new TaskStorageConfig() + { + } + ); tsqa = new TaskStorageQueryAdapter(ts); tl = new TaskLockbox(ts); mdc = newMockMDC(); @@ -147,6 +159,22 @@ public class TaskLifecycleTest } }, + new DataSegmentMover() + { + @Override + public DataSegment move(DataSegment dataSegment, Map targetLoadSpec) throws SegmentLoadingException + { + return dataSegment; + } + }, + new DataSegmentArchiver() + { + @Override + public DataSegment archive(DataSegment segment) throws SegmentLoadingException + { + return segment; + } + }, null, // segment announcer null, // new segment server view null, // query runner factory conglomerate corporation unionized collective @@ -289,6 +317,34 @@ public class TaskLifecycleTest Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); } + @Test + public void testNoopTask() throws Exception + { + final Task noopTask = new DefaultObjectMapper().readValue( + "{\"type\":\"noop\", \"runTime\":\"100\"}\"", + Task.class + ); + final TaskStatus status = runTask(noopTask); + + Assert.assertEquals("statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode()); + Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); + Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); + } + + @Test + public void testNeverReadyTask() throws Exception + { + final Task neverReadyTask = new DefaultObjectMapper().readValue( + "{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"", + Task.class + ); + final TaskStatus status = runTask(neverReadyTask); + + Assert.assertEquals("statusCode", TaskStatus.Status.FAILED, status.getStatusCode()); + Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); + Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); + } + @Test public void testSimple() throws Exception { @@ -400,28 +456,41 @@ public class TaskLifecycleTest Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); } - private TaskStatus runTask(Task task) + private TaskStatus runTask(final Task task) throws Exception { + final Task dummyTask = new DefaultObjectMapper().readValue( + "{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"", + Task.class + ); final long startTime = System.currentTimeMillis(); + Preconditions.checkArgument(!task.getId().equals(dummyTask.getId())); + + tq.add(dummyTask); tq.add(task); - TaskStatus status; + TaskStatus retVal = null; - try { - while ((status = tsqa.getStatus(task.getId()).get()).isRunnable()) { - if (System.currentTimeMillis() > startTime + 10 * 1000) { - throw new ISE("Where did the task go?!: %s", task.getId()); + for (final String taskId : ImmutableList.of(dummyTask.getId(), task.getId())) { + try { + TaskStatus status; + while ((status = tsqa.getStatus(taskId).get()).isRunnable()) { + if (System.currentTimeMillis() > startTime + 10 * 1000) { + throw new ISE("Where did the task go?!: %s", task.getId()); + } + + Thread.sleep(100); } - - Thread.sleep(100); + if (taskId.equals(task.getId())) { + retVal = status; + } + } + catch (Exception e) { + throw Throwables.propagate(e); } } - catch (Exception e) { - throw Throwables.propagate(e); - } - return status; + return retVal; } private static class 
MockIndexerDBCoordinator extends IndexerDBCoordinator diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index f80ca3cd8db..8d4bf32b870 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -122,7 +122,7 @@ public class WorkerTaskMonitorTest new ThreadPoolTaskRunner( new TaskToolboxFactory( new TaskConfig(tmp.toString(), null, null, 0), - null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( + null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( new OmniSegmentLoader( ImmutableMap.of( "local", @@ -209,4 +209,4 @@ public class WorkerTaskMonitorTest Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId()); Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode()); } -} \ No newline at end of file +} diff --git a/kafka-eight/pom.xml b/kafka-eight/pom.xml index b41901f813c..7c282e9ecf8 100644 --- a/kafka-eight/pom.xml +++ b/kafka-eight/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT diff --git a/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java index c8bf876cc92..86c165c04d8 100644 --- a/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ -107,7 +107,15 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory return null; } - return parser.parse(ByteBuffer.wrap(message)); + try { + return parser.parse(ByteBuffer.wrap(message)); + } + catch (Exception e) { + throw new FormattedException.Builder() + .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW) + .withMessage(String.format("Error parsing[%s], got [%s]", ByteBuffer.wrap(message), e.toString())) + .build(); + } } @Override diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml index 984ed52c9b3..7f6d33e03a4 100644 --- a/kafka-seven/pom.xml +++ b/kafka-seven/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT diff --git a/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java b/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java index 8f200d1cdbc..c227b323877 100644 --- a/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java +++ b/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java @@ -120,7 +120,15 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory public InputRow parseMessage(Message message) throws FormattedException { - return parser.parse(message.payload()); + try { + return parser.parse(message.payload()); + } + catch (Exception e) { + throw new FormattedException.Builder() + .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW) + .withMessage(String.format("Error parsing[%s], got [%s]", message.payload(), e.toString())) + .build(); + } } @Override diff --git a/pom.xml b/pom.xml index 56e856f4773..e75dd1f81b4 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ io.druid druid pom - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT druid druid @@ -41,7 +41,7 @@ UTF-8 0.25.1 2.1.0-incubating - 0.1.6 + 0.1.7 diff --git a/processing/pom.xml 
b/processing/pom.xml index 14ec5a3fc29..4e719dc1d8a 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 8710b627ece..316c8d8675e 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -84,6 +84,11 @@ public class ChainedExecutionQueryRunner implements QueryRunner { final int priority = Integer.parseInt(query.getContextValue("priority", "0")); + if (Iterables.isEmpty(queryables)) { + log.warn("No queryables found."); + return Sequences.empty(); + } + return new BaseSequence>( new BaseSequence.IteratorMaker>() { diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java index ff55c9cee7f..927ab89676f 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java @@ -265,9 +265,11 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory final Object[] args = new Object[size + 1]; args[0] = current; - int i = 0; - while (i < size) { - args[i + 1] = selectorList[i++].get(); + for (int i = 0 ; i < size ; i++) { + final ObjectColumnSelector selector = selectorList[i]; + if (selector != null) { + args[i + 1] = selector.get(); + } } final Object res = fnAggregate.call(cx, scope, scope, args); diff --git a/processing/src/test/java/io/druid/query/aggregation/JavaScriptAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/JavaScriptAggregatorTest.java index 2435211dfe9..7f087559339 100644 --- a/processing/src/test/java/io/druid/query/aggregation/JavaScriptAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/JavaScriptAggregatorTest.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.Map; public class JavaScriptAggregatorTest @@ -141,6 +142,39 @@ public class JavaScriptAggregatorTest Assert.assertEquals(val, agg.get(buf, position)); } + @Test + public void testAggregateMissingColumn() + { + Map script = scriptDoubleSum; + + JavaScriptAggregator agg = new JavaScriptAggregator( + "billy", + Collections.singletonList(null), + JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"), + script.get("fnReset"), + script.get("fnCombine")) + ); + + final double val = 0; + + Assert.assertEquals("billy", agg.getName()); + + agg.reset(); + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + + agg.aggregate(); + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + + agg.aggregate(); + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + Assert.assertEquals(val, agg.get()); + } + public static void main(String... 
args) throws Exception { final LoopingFloatColumnSelector selector = new LoopingFloatColumnSelector(new float[]{42.12f, 9f}); diff --git a/rabbitmq/pom.xml b/rabbitmq/pom.xml index febab9e8ffc..a3b5e936cad 100644 --- a/rabbitmq/pom.xml +++ b/rabbitmq/pom.xml @@ -9,7 +9,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT diff --git a/s3-extensions/pom.xml b/s3-extensions/pom.xml index 4853bce92d7..ae229afc5d8 100644 --- a/s3-extensions/pom.xml +++ b/s3-extensions/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java new file mode 100644 index 00000000000..0da038352f0 --- /dev/null +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java @@ -0,0 +1,58 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.storage.s3; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import io.druid.segment.loading.DataSegmentArchiver; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.timeline.DataSegment; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; + + +public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSegmentArchiver +{ + private final S3DataSegmentArchiverConfig config; + + @Inject + public S3DataSegmentArchiver( + RestS3Service s3Client, + S3DataSegmentArchiverConfig config + ) + { + super(s3Client); + this.config = config; + } + + @Override + public DataSegment archive(DataSegment segment) throws SegmentLoadingException + { + String targetS3Bucket = config.getArchiveBucket(); + String targetS3BaseKey = config.getArchiveBaseKey(); + + return move( + segment, + ImmutableMap.of( + "bucket", targetS3Bucket, + "baseKey", targetS3BaseKey + ) + ); + } +} diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiverConfig.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiverConfig.java new file mode 100644 index 00000000000..5eb33eb1b5d --- /dev/null +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiverConfig.java @@ -0,0 +1,41 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.storage.s3; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class S3DataSegmentArchiverConfig +{ + @JsonProperty + public String archiveBucket = ""; + + @JsonProperty + public String archiveBaseKey = ""; + + public String getArchiveBucket() + { + return archiveBucket; + } + + public String getArchiveBaseKey() + { + return archiveBaseKey; + } +} diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java index b9a44f631c6..0e4fde44d76 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java @@ -53,7 +53,7 @@ public class S3DataSegmentKiller implements DataSegmentKiller Map loadSpec = segment.getLoadSpec(); String s3Bucket = MapUtils.getString(loadSpec, "bucket"); String s3Path = MapUtils.getString(loadSpec, "key"); - String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; + String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path); if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java new file mode 100644 index 00000000000..f7cbd25ebdd --- /dev/null +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java @@ -0,0 +1,152 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */
+
+package io.druid.storage.s3;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.inject.Inject;
+import com.metamx.common.MapUtils;
+import com.metamx.common.logger.Logger;
+import io.druid.segment.loading.DataSegmentMover;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import org.jets3t.service.ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Object;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class S3DataSegmentMover implements DataSegmentMover
+{
+  private static final Logger log = new Logger(S3DataSegmentMover.class);
+
+  private final RestS3Service s3Client;
+
+  @Inject
+  public S3DataSegmentMover(
+      RestS3Service s3Client
+  )
+  {
+    this.s3Client = s3Client;
+  }
+
+  @Override
+  public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
+  {
+    try {
+      Map<String, Object> loadSpec = segment.getLoadSpec();
+      String s3Bucket = MapUtils.getString(loadSpec, "bucket");
+      String s3Path = MapUtils.getString(loadSpec, "key");
+      String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
+
+      final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket");
+      final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey");
+
+      final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, segment);
+      String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path);
+
+      if (targetS3Bucket.isEmpty()) {
+        throw new SegmentLoadingException("Target S3 bucket is not specified");
+      }
+      if (targetS3Path.isEmpty()) {
+        throw new SegmentLoadingException("Target S3 baseKey is not specified");
+      }
+
+      safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
+      safeMove(s3Bucket, s3DescriptorPath, targetS3Bucket, targetS3DescriptorPath);
+
+      return segment.withLoadSpec(
+          ImmutableMap.<String, Object>builder()
+              .putAll(
+                  Maps.filterKeys(
+                      loadSpec, new Predicate<String>()
+                      {
+                        @Override
+                        public boolean apply(String input)
+                        {
+                          return !(input.equals("bucket") || input.equals("key"));
+                        }
+                      }
+                  )
+              )
+              .put("bucket", targetS3Bucket)
+              .put("key", targetS3Path)
+              .build()
+      );
+    }
+    catch (ServiceException e) {
+      throw new SegmentLoadingException(e, "Unable to move segment[%s]", segment.getIdentifier());
+    }
+  }
+
+  private void safeMove(
+      final String s3Bucket,
+      final String s3Path,
+      final String targetS3Bucket,
+      final String targetS3Path
+  ) throws ServiceException, SegmentLoadingException
+  {
+    try {
+      S3Utils.retryS3Operation(
+          new Callable<Void>()
+          {
+            @Override
+            public Void call() throws Exception
+            {
+              if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
+                log.info(
+                    "Moving file[s3://%s/%s] to [s3://%s/%s]",
+                    s3Bucket,
+                    s3Path,
+                    targetS3Bucket,
+                    targetS3Path
+                );
+                s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, new S3Object(targetS3Path), false);
+              } else {
+                // ensure object exists in target location
+                if (s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) {
+                  log.info(
+                      "Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
+                      s3Bucket, s3Path,
+                      targetS3Bucket, targetS3Path
+                  );
+                } else {
+                  throw new SegmentLoadingException(
+                      "Unable to move file [s3://%s/%s] to [s3://%s/%s], not present in either source or target location",
+                      s3Bucket, s3Path,
+                      targetS3Bucket, targetS3Path
+                  );
+                }
+              }
+              return
null; + } + } + ); + } + catch (Exception e) { + Throwables.propagateIfInstanceOf(e, ServiceException.class); + Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class); + throw Throwables.propagate(e); + } + } +} diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index e8b1a99710f..664c270799b 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -20,7 +20,6 @@ package io.druid.storage.s3; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; @@ -29,7 +28,6 @@ import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import io.druid.utils.CompressionUtils; import org.jets3t.service.ServiceException; @@ -45,7 +43,6 @@ import java.util.concurrent.Callable; public class S3DataSegmentPusher implements DataSegmentPusher { private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class); - private static final Joiner JOINER = Joiner.on("/").skipNulls(); private final RestS3Service s3Client; private final S3DataSegmentPusherConfig config; @@ -73,10 +70,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException { log.info("Uploading [%s] to S3", indexFilesDir); - final String outputKey = JOINER.join( - config.getBaseKey().isEmpty() ? 
null : config.getBaseKey(), - DataSegmentPusherUtil.getStorageDir(inSegment) - ); + final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), inSegment); final File zipOutFile = File.createTempFile("druid", "index.zip"); final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile); @@ -90,8 +84,10 @@ public class S3DataSegmentPusher implements DataSegmentPusher S3Object toPush = new S3Object(zipOutFile); final String outputBucket = config.getBucket(); + final String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path); + toPush.setBucketName(outputBucket); - toPush.setKey(outputKey + "/index.zip"); + toPush.setKey(s3Path); if (!config.getDisableAcl()) { toPush.setAcl(AccessControlList.REST_CANNED_AUTHENTICATED_READ); } @@ -116,7 +112,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(inSegment)), descriptorFile); S3Object descriptorObject = new S3Object(descriptorFile); descriptorObject.setBucketName(outputBucket); - descriptorObject.setKey(outputKey + "/descriptor.json"); + descriptorObject.setKey(s3DescriptorPath); if (!config.getDisableAcl()) { descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); } @@ -142,4 +138,4 @@ public class S3DataSegmentPusher implements DataSegmentPusher throw Throwables.propagate(e); } } -} \ No newline at end of file +} diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java index f2675251f19..d30f49f976a 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java @@ -51,8 +51,11 @@ public class S3StorageDruidModule implements DruidModule Binders.dataSegmentPullerBinder(binder).addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class); Binders.dataSegmentKillerBinder(binder).addBinding("s3_zip").to(S3DataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentMoverBinder(binder).addBinding("s3_zip").to(S3DataSegmentMover.class).in(LazySingleton.class); + Binders.dataSegmentArchiverBinder(binder).addBinding("s3_zip").to(S3DataSegmentArchiver.class).in(LazySingleton.class); Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class); + JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class); Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class); JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class); diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index 619ad737cfe..6cf481fa2f9 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -19,9 +19,12 @@ package io.druid.storage.s3; +import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.metamx.common.RetryUtils; -import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; +import io.druid.segment.loading.DataSegmentPusherUtil; +import io.druid.timeline.DataSegment; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Bucket; import 
org.jets3t.service.model.S3Object; @@ -34,6 +37,8 @@ import java.util.concurrent.Callable; */ public class S3Utils { + private static final Joiner JOINER = Joiner.on("/").skipNulls(); + public static void closeStreamsQuietly(S3Object s3Obj) { if (s3Obj == null) { @@ -61,9 +66,9 @@ public class S3Utils { if (e instanceof IOException) { return true; - } else if (e instanceof S3ServiceException) { + } else if (e instanceof ServiceException) { final boolean isIOException = e.getCause() instanceof IOException; - final boolean isTimeout = "RequestTimeout".equals(((S3ServiceException) e).getS3ErrorCode()); + final boolean isTimeout = "RequestTimeout".equals(((ServiceException) e).getErrorCode()); return isIOException || isTimeout; } else { return false; @@ -75,18 +80,18 @@ public class S3Utils } public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey) - throws S3ServiceException + throws ServiceException { try { s3Client.getObjectDetails(new S3Bucket(bucketName), objectKey); } - catch (S3ServiceException e) { + catch (ServiceException e) { if (404 == e.getResponseCode() - || "NoSuchKey".equals(e.getS3ErrorCode()) - || "NoSuchBucket".equals(e.getS3ErrorCode())) { + || "NoSuchKey".equals(e.getErrorCode()) + || "NoSuchBucket".equals(e.getErrorCode())) { return false; } - if ("AccessDenied".equals(e.getS3ErrorCode())) { + if ("AccessDenied".equals(e.getErrorCode())) { // Object is inaccessible to current user, but does exist. return true; } @@ -96,4 +101,17 @@ public class S3Utils return true; } + + public static String constructSegmentPath(String baseKey, DataSegment segment) + { + return JOINER.join( + baseKey.isEmpty() ? null : baseKey, + DataSegmentPusherUtil.getStorageDir(segment) + ) + "/index.zip"; + } + + public static String descriptorPathForSegmentPath(String s3Path) + { + return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; + } } diff --git a/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java b/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java new file mode 100644 index 00000000000..6206da881a4 --- /dev/null +++ b/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java @@ -0,0 +1,161 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */
+
+package io.druid.storage.s3;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.metamx.common.MapUtils;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.model.StorageObject;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+public class S3DataSegmentMoverTest
+{
+  private static final DataSegment sourceSegment = new DataSegment(
+      "test",
+      new Interval("2013-01-01/2013-01-02"),
+      "1",
+      ImmutableMap.<String, Object>of(
+          "key",
+          "baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip",
+          "bucket",
+          "main"
+      ),
+      ImmutableList.of("dim1", "dim1"),
+      ImmutableList.of("metric1", "metric2"),
+      new NoneShardSpec(),
+      0,
+      1
+  );
+
+  @Test
+  public void testMove() throws Exception
+  {
+    MockStorageService mockS3Client = new MockStorageService();
+    S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client);
+
+    mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"));
+    mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"));
+
+    DataSegment movedSegment = mover.move(
+        sourceSegment,
+        ImmutableMap.<String, Object>of("baseKey", "targetBaseKey", "bucket", "archive")
+    );
+
+    Map<String, Object> targetLoadSpec = movedSegment.getLoadSpec();
+    Assert.assertEquals("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip", MapUtils.getString(targetLoadSpec, "key"));
+    Assert.assertEquals("archive", MapUtils.getString(targetLoadSpec, "bucket"));
+    Assert.assertTrue(mockS3Client.didMove());
+  }
+
+  @Test
+  public void testMoveNoop() throws Exception
+  {
+    MockStorageService mockS3Client = new MockStorageService();
+    S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client);
+
+    mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"));
+    mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"));
+
+    DataSegment movedSegment = mover.move(
+        sourceSegment,
+        ImmutableMap.<String, Object>of("baseKey", "targetBaseKey", "bucket", "archive")
+    );
+
+    Map<String, Object> targetLoadSpec = movedSegment.getLoadSpec();
+
+    Assert.assertEquals("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip", MapUtils.getString(targetLoadSpec, "key"));
+    Assert.assertEquals("archive", MapUtils.getString(targetLoadSpec, "bucket"));
+    Assert.assertFalse(mockS3Client.didMove());
+  }
+
+  @Test(expected = SegmentLoadingException.class)
+  public void testMoveException() throws Exception
+  {
+    MockStorageService mockS3Client = new MockStorageService();
+    S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client);
+
+    mover.move(
+        sourceSegment,
+        ImmutableMap.<String, Object>of("baseKey", "targetBaseKey", "bucket", "archive")
+    );
+  }
+
+  private class MockStorageService extends RestS3Service {
+    Map<String, Set<String>> storage = Maps.newHashMap();
+    boolean moved = false;
+
+    private MockStorageService()
throws S3ServiceException + { + super(null); + } + + public boolean didMove() { + return moved; + } + + @Override + public boolean isObjectInBucket(String bucketName, String objectKey) throws ServiceException + { + Set objects = storage.get(bucketName); + return (objects != null && objects.contains(objectKey)); + } + + @Override + public Map moveObject( + String sourceBucketName, + String sourceObjectKey, + String destinationBucketName, + StorageObject destinationObject, + boolean replaceMetadata + ) throws ServiceException + { + moved = true; + if(isObjectInBucket(sourceBucketName, sourceObjectKey)) { + this.putObject(destinationBucketName, new S3Object(destinationObject.getKey())); + storage.get(sourceBucketName).remove(sourceObjectKey); + } + return null; + } + + @Override + public S3Object putObject(String bucketName, S3Object object) throws S3ServiceException + { + if (!storage.containsKey(bucketName)) { + storage.put(bucketName, Sets.newHashSet()); + } + storage.get(bucketName).add(object.getKey()); + return object; + } + } +} diff --git a/server/pom.xml b/server/pom.xml index 85849f4958f..173512727a5 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT diff --git a/server/src/main/java/io/druid/segment/loading/OmniDataSegmentArchiver.java b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentArchiver.java new file mode 100644 index 00000000000..bf34bbe17bb --- /dev/null +++ b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentArchiver.java @@ -0,0 +1,57 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.loading; + +import com.google.inject.Inject; +import com.metamx.common.MapUtils; +import io.druid.timeline.DataSegment; + +import java.util.Map; + +public class OmniDataSegmentArchiver implements DataSegmentArchiver +{ + private final Map archivers; + + @Inject + public OmniDataSegmentArchiver( + Map archivers + ) + { + this.archivers = archivers; + } + + @Override + public DataSegment archive(DataSegment segment) throws SegmentLoadingException + { + return getArchiver(segment).archive(segment); + } + + private DataSegmentArchiver getArchiver(DataSegment segment) throws SegmentLoadingException + { + String type = MapUtils.getString(segment.getLoadSpec(), "type"); + DataSegmentArchiver archiver = archivers.get(type); + + if (archiver == null) { + throw new SegmentLoadingException("Unknown loader type[%s]. 
Known types are %s", type, archivers.keySet()); + } + + return archiver; + } +} diff --git a/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java new file mode 100644 index 00000000000..d585b0b7db9 --- /dev/null +++ b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java @@ -0,0 +1,57 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.loading; + +import com.google.inject.Inject; +import com.metamx.common.MapUtils; +import io.druid.timeline.DataSegment; + +import java.util.Map; + +public class OmniDataSegmentMover implements DataSegmentMover +{ + private final Map movers; + + @Inject + public OmniDataSegmentMover( + Map movers + ) + { + this.movers = movers; + } + + @Override + public DataSegment move(DataSegment segment, Map targetLoadSpec) throws SegmentLoadingException + { + return getMover(segment).move(segment, targetLoadSpec); + } + + private DataSegmentMover getMover(DataSegment segment) throws SegmentLoadingException + { + String type = MapUtils.getString(segment.getLoadSpec(), "type"); + DataSegmentMover mover = movers.get(type); + + if (mover == null) { + throw new SegmentLoadingException("Unknown loader type[%s]. 
Known types are %s", type, movers.keySet()); + } + + return mover; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index f78bc0ac390..df96aa45f5e 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.collect.Lists; +import com.metamx.common.ISE; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.impl.FileIteratingFirehose; @@ -78,21 +79,26 @@ public class LocalFirehoseFactory implements FirehoseFactory @Override public Firehose connect() throws IOException { - final LinkedList files = Lists.newLinkedList( - Arrays.asList( - baseDir.listFiles( - new FilenameFilter() - { - @Override - public boolean accept(File file, String name) - { - return name.contains(filter); - } - } - ) - ) + File[] foundFiles = baseDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File file, String name) + { + return name.contains(filter); + } + } ); + if (foundFiles == null || foundFiles.length == 0) { + throw new ISE("Found no files to ingest! Check your schema."); + } + + final LinkedList files = Lists.newLinkedList( + Arrays.asList(foundFiles) + ); + + return new FileIteratingFirehose( new Iterator() { diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 4cbe042350a..950be651e86 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -116,7 +116,13 @@ public class ServerManager implements QuerySegmentWalker return segmentLoader.isSegmentLoaded(segment); } - public void loadSegment(final DataSegment segment) throws SegmentLoadingException + /** + * Load a single segment. 
+ * @param segment segment to load + * @return true if the segment was newly loaded, false if it was already loaded + * @throws SegmentLoadingException if the segment cannot be loaded + */ + public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException { final Segment adapter; try { @@ -150,8 +156,8 @@ public class ServerManager implements QuerySegmentWalker segment.getVersion() ); if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) { - log.info("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier()); - throw new SegmentLoadingException("Segment already exists[%s]", segment.getIdentifier()); + log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier()); + return false; } loadedIntervals.add( @@ -165,6 +171,7 @@ public class ServerManager implements QuerySegmentWalker synchronized (dataSourceCounts) { dataSourceCounts.add(dataSource, 1L); } + return true; } } diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index a55341a75a1..246415f57d0 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -230,34 +230,37 @@ public class ZkCoordinator implements DataSegmentChangeHandler try { log.info("Loading segment %s", segment.getIdentifier()); + final boolean loaded; try { - serverManager.loadSegment(segment); + loaded = serverManager.loadSegment(segment); } catch (Exception e) { removeSegment(segment); throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); } - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.exists()) { + if (loaded) { + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.exists()) { + try { + jsonMapper.writeValue(segmentInfoCacheFile, segment); + } + catch (IOException e) { + removeSegment(segment); + throw new SegmentLoadingException( + e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile + ); + } + } + try { - jsonMapper.writeValue(segmentInfoCacheFile, segment); + announcer.announceSegment(segment); } catch (IOException e) { - removeSegment(segment); - throw new SegmentLoadingException( - e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile - ); + throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); } } - try { - announcer.announceSegment(segment); - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); - } - } catch (SegmentLoadingException e) { log.makeAlert(e, "Failed to load segment for dataSource") @@ -275,8 +278,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler for (DataSegment segment : segments) { log.info("Loading segment %s", segment.getIdentifier()); + final boolean loaded; try { - serverManager.loadSegment(segment); + loaded = serverManager.loadSegment(segment); } catch (Exception e) { log.error(e, "Exception loading segment[%s]", segment.getIdentifier()); @@ -285,20 +289,22 @@ public class ZkCoordinator implements DataSegmentChangeHandler continue; } - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.exists()) { - try { - 
jsonMapper.writeValue(segmentInfoCacheFile, segment); + if (loaded) { + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.exists()) { + try { + jsonMapper.writeValue(segmentInfoCacheFile, segment); + } + catch (IOException e) { + log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile); + removeSegment(segment); + segmentFailures.add(segment.getIdentifier()); + continue; + } } - catch (IOException e) { - log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile); - removeSegment(segment); - segmentFailures.add(segment.getIdentifier()); - continue; - } - } - validSegments.add(segment); + validSegments.add(segment); + } } try { diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 0ae18185015..71a4d0eb08c 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -46,10 +46,13 @@ import io.druid.client.ServerInventoryView; import io.druid.client.indexing.IndexingServiceClient; import io.druid.common.config.JacksonConfigManager; import io.druid.concurrent.Execs; +import io.druid.curator.discovery.ServiceAnnouncer; import io.druid.db.DatabaseRuleManager; import io.druid.db.DatabaseSegmentManager; import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Self; import io.druid.segment.IndexIO; +import io.druid.server.DruidNode; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; @@ -99,6 +102,8 @@ public class DruidCoordinator private final LoadQueueTaskMaster taskMaster; private final Map loadManagementPeons; private final AtomicReference leaderLatch; + private final ServiceAnnouncer serviceAnnouncer; + private final DruidNode self; @Inject public DruidCoordinator( @@ -112,7 +117,9 @@ public class DruidCoordinator ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, - LoadQueueTaskMaster taskMaster + LoadQueueTaskMaster taskMaster, + ServiceAnnouncer serviceAnnouncer, + @Self DruidNode self ) { this( @@ -127,6 +134,8 @@ public class DruidCoordinator scheduledExecutorFactory, indexingServiceClient, taskMaster, + serviceAnnouncer, + self, Maps.newConcurrentMap() ); } @@ -143,6 +152,8 @@ public class DruidCoordinator ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster, + ServiceAnnouncer serviceAnnouncer, + DruidNode self, ConcurrentMap loadQueuePeonMap ) { @@ -157,6 +168,8 @@ public class DruidCoordinator this.emitter = emitter; this.indexingServiceClient = indexingServiceClient; this.taskMaster = taskMaster; + this.serviceAnnouncer = serviceAnnouncer; + this.self = self; this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); @@ -474,6 +487,7 @@ public class DruidCoordinator databaseSegmentManager.start(); databaseRuleManager.start(); serverInventoryView.start(); + serviceAnnouncer.announce(self); final List> coordinatorRunnables = Lists.newArrayList(); dynamicConfigs = configManager.watch( @@ -554,8 +568,10 @@ public class DruidCoordinator } loadManagementPeons.clear(); - databaseSegmentManager.stop(); + serviceAnnouncer.unannounce(self); serverInventoryView.stop(); + databaseRuleManager.stop(); + 
databaseSegmentManager.stop(); leader = false; } catch (Exception e) { diff --git a/server/src/main/java/io/druid/server/http/BackwardsCompatiableInfoResource.java b/server/src/main/java/io/druid/server/http/BackwardsCompatibleInfoResource.java similarity index 93% rename from server/src/main/java/io/druid/server/http/BackwardsCompatiableInfoResource.java rename to server/src/main/java/io/druid/server/http/BackwardsCompatibleInfoResource.java index 11fe97da9be..ed1cf580887 100644 --- a/server/src/main/java/io/druid/server/http/BackwardsCompatiableInfoResource.java +++ b/server/src/main/java/io/druid/server/http/BackwardsCompatibleInfoResource.java @@ -32,10 +32,10 @@ import javax.ws.rs.Path; /** */ @Path("/static/info") -public class BackwardsCompatiableInfoResource extends InfoResource +public class BackwardsCompatibleInfoResource extends InfoResource { @Inject - public BackwardsCompatiableInfoResource( + public BackwardsCompatibleInfoResource( DruidCoordinator coordinator, InventoryView serverInventoryView, DatabaseSegmentManager databaseSegmentManager, diff --git a/server/src/main/resources/static/index.html b/server/src/main/resources/static/index.html index fb8831e29ed..cec3d620e88 100644 --- a/server/src/main/resources/static/index.html +++ b/server/src/main/resources/static/index.html @@ -32,9 +32,6 @@
[hunk body lost in extraction: the HTML markup was stripped, leaving only the context
line "Druid Version: ${pom.version} Druid API Version: ${druid.api.version}"; per the
hunk header (-32,9 +32,6), the change removes three lines of markup around that banner]
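Note on the ServerManager/ZkCoordinator hunks earlier in this patch: segment loading is
now idempotent. `loadSegment` returns false (with a warning) instead of throwing when the
segment is already present, and the caller only writes the info-cache file and announces
the segment on a fresh load. A minimal sketch of that pattern, under the assumption that
only a fresh load should trigger caching and announcement; the types below (Announcer,
IdempotentLoadSketch) are hypothetical stand-ins, not Druid classes:

``` java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentLoadSketch
{
  // Hypothetical stand-in for the announcement side effect.
  interface Announcer
  {
    void announce(String segmentId) throws Exception;
  }

  private final Set<String> loaded = ConcurrentHashMap.newKeySet();

  // Mirrors the new loadSegment contract: report "already loaded" with a
  // false return rather than an exception.
  public boolean loadSegment(String segmentId)
  {
    return loaded.add(segmentId);
  }

  // Mirrors the reworked coordination flow: cache-file writes and the
  // announcement happen only when the segment was newly loaded.
  public void addSegment(String segmentId, Announcer announcer) throws Exception
  {
    if (loadSegment(segmentId)) {
      // write the segment info cache file here, then announce
      announcer.announce(segmentId);
    }
  }
}
```

The design choice this encodes: a duplicate load request becomes a no-op instead of an
error, so a handler that is retried or replayed cannot fail or double-announce.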
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 8f55a93948c..58323faa863 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -23,8 +23,10 @@ import com.google.common.collect.MapMaker; import com.metamx.common.concurrent.ScheduledExecutorFactory; import io.druid.client.DruidServer; import io.druid.client.SingleServerInventoryView; +import io.druid.curator.discovery.NoopServiceAnnouncer; import io.druid.curator.inventory.InventoryManagerConfig; import io.druid.db.DatabaseSegmentManager; +import io.druid.server.DruidNode; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; @@ -111,6 +113,8 @@ public class DruidCoordinatorTest scheduledExecutorFactory, null, taskMaster, + new NoopServiceAnnouncer(), + new DruidNode("hey", "what", 1234), loadManagementPeons ); } diff --git a/services/pom.xml b/services/pom.xml index 768c1af3007..fa580b17910 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -27,7 +27,7 @@ io.druid druid - 0.6.34-SNAPSHOT + 0.6.42-SNAPSHOT @@ -68,7 +68,7 @@ org.apache.maven.plugins maven-shade-plugin - 1.6 + 2.2 package @@ -89,6 +89,9 @@ + + + diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 5e20ee4bf8a..3a7c88d4426 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -53,7 +53,7 @@ import java.util.List; */ @Command( name = "broker", - description = "Runs a broker node, see http://druid.io/docs/0.6.33/Broker.html for a description" + description = "Runs a broker node, see http://druid.io/docs/0.6.40/Broker.html for a description" ) public class CliBroker extends ServerRunnable { diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index dcf638ab391..486511b1104 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -28,7 +28,6 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.client.indexing.IndexingServiceClient; -import io.druid.curator.discovery.DiscoveryModule; import io.druid.db.DatabaseRuleManager; import io.druid.db.DatabaseRuleManagerConfig; import io.druid.db.DatabaseRuleManagerProvider; @@ -41,18 +40,16 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; -import io.druid.guice.annotations.Self; import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorConfig; import io.druid.server.coordinator.LoadQueueTaskMaster; -import io.druid.server.http.BackwardsCompatiableInfoResource; +import io.druid.server.http.BackwardsCompatibleInfoResource; import io.druid.server.http.CoordinatorDynamicConfigsResource; import io.druid.server.http.CoordinatorRedirectInfo; import io.druid.server.http.CoordinatorResource; import io.druid.server.http.InfoResource; import io.druid.server.http.RedirectFilter; import io.druid.server.http.RedirectInfo; -import io.druid.server.http.RedirectServlet; import 
io.druid.server.initialization.JettyServerInitializer; import org.apache.curator.framework.CuratorFramework; import org.eclipse.jetty.server.Server; @@ -63,7 +60,7 @@ import java.util.List; */ @Command( name = "coordinator", - description = "Runs the Coordinator, see http://druid.io/docs/0.6.33/Coordinator.html for a description." + description = "Runs the Coordinator, see http://druid.io/docs/0.6.40/Coordinator.html for a description." ) public class CliCoordinator extends ServerRunnable { @@ -88,8 +85,8 @@ public class CliCoordinator extends ServerRunnable JsonConfigProvider.bind(binder, "druid.manager.segments", DatabaseSegmentManagerConfig.class); JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class); - binder.bind(RedirectServlet.class).in(LazySingleton.class); binder.bind(RedirectFilter.class).in(LazySingleton.class); + binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class); binder.bind(DatabaseSegmentManager.class) .toProvider(DatabaseSegmentManagerProvider.class) @@ -101,15 +98,12 @@ public class CliCoordinator extends ServerRunnable binder.bind(IndexingServiceClient.class).in(LazySingleton.class); - binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class); - binder.bind(DruidCoordinator.class); LifecycleModule.register(binder, DruidCoordinator.class); - DiscoveryModule.register(binder, Self.class); binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer()); - Jerseys.addResource(binder, BackwardsCompatiableInfoResource.class); + Jerseys.addResource(binder, BackwardsCompatibleInfoResource.class); Jerseys.addResource(binder, InfoResource.class); Jerseys.addResource(binder, CoordinatorResource.class); Jerseys.addResource(binder, CoordinatorDynamicConfigsResource.class); diff --git a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java index a6609936fac..2cdc9706cef 100644 --- a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java @@ -41,7 +41,7 @@ import java.util.List; */ @Command( name = "hadoop", - description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.33/Batch-ingestion.html for a description." + description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.40/Batch-ingestion.html for a description." 
) public class CliHadoopIndexer implements Runnable { diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index 29d436ff7e4..e9c5ece9889 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -42,7 +42,7 @@ import java.util.List; */ @Command( name = "historical", - description = "Runs a Historical node, see http://druid.io/docs/0.6.33/Historical.html for a description" + description = "Runs a Historical node, see http://druid.io/docs/0.6.40/Historical.html for a description" ) public class CliHistorical extends ServerRunnable { diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 9b869ce875e..aa68c14c6bb 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -44,7 +44,7 @@ import io.druid.guice.PolyBind; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; -import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; @@ -79,9 +79,7 @@ import io.druid.tasklogs.TaskLogStreamer; import io.druid.tasklogs.TaskLogs; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerList; -import org.eclipse.jetty.server.handler.ResourceHandler; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -95,7 +93,7 @@ import java.util.List; */ @Command( name = "overlord", - description = "Runs an Overlord node, see http://druid.io/docs/0.6.33/Indexing-Service.html for a description" + description = "Runs an Overlord node, see http://druid.io/docs/0.6.40/Indexing-Service.html for a description" ) public class CliOverlord extends ServerRunnable { @@ -120,7 +118,11 @@ public class CliOverlord extends ServerRunnable binder.bind(TaskMaster.class).in(ManageLifecycle.class); binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class); - binder.bind(new TypeLiteral>(){}) + binder.bind( + new TypeLiteral>() + { + } + ) .toProvider( new ListProvider() .add(TaskRunnerTaskLogStreamer.class) @@ -154,10 +156,15 @@ public class CliOverlord extends ServerRunnable private void configureTaskStorage(Binder binder) { + JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class); + PolyBind.createChoice( binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class) ); - final MapBinder storageBinder = PolyBind.optionBinder(binder, Key.get(TaskStorage.class)); + final MapBinder storageBinder = PolyBind.optionBinder( + binder, + Key.get(TaskStorage.class) + ); storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class); binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class); @@ -176,7 +183,10 @@ public class CliOverlord extends ServerRunnable Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class) ); - final 
MapBinder biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class)); + final MapBinder biddy = PolyBind.optionBinder( + binder, + Key.get(TaskRunnerFactory.class) + ); IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder); biddy.addBinding("local").to(ForkingTaskRunnerFactory.class); @@ -189,7 +199,9 @@ public class CliOverlord extends ServerRunnable private void configureAutoscale(Binder binder) { JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class); - binder.bind(ResourceManagementStrategy.class).to(SimpleResourceManagementStrategy.class).in(LazySingleton.class); + binder.bind(ResourceManagementStrategy.class) + .to(SimpleResourceManagementStrategy.class) + .in(LazySingleton.class); JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null); @@ -218,14 +230,18 @@ public class CliOverlord extends ServerRunnable } /** - */ + */ private static class OverlordJettyServerInitializer implements JettyServerInitializer { @Override public void initialize(Server server, Injector injector) { - ResourceHandler resourceHandler = new ResourceHandler(); - resourceHandler.setBaseResource( + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + + ServletHolder holderPwd = new ServletHolder("default", DefaultServlet.class); + + root.addServlet(holderPwd, "/"); + root.setBaseResource( new ResourceCollection( new String[]{ TaskMaster.class.getClassLoader().getResource("static").toExternalForm(), @@ -233,18 +249,17 @@ public class CliOverlord extends ServerRunnable } ) ); + root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null); + root.addFilter(GzipFilter.class, "/*", null); - final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); - root.setContextPath("/"); + // Can't use /* here because of Guice and Jetty static content conflicts + root.addFilter(GuiceFilter.class, "/status/*", null); + root.addFilter(GuiceFilter.class, "/druid/*", null); HandlerList handlerList = new HandlerList(); - handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()}); - server.setHandler(handlerList); + handlerList.setHandlers(new Handler[]{root}); - root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - root.addFilter(GzipFilter.class, "/*", null); - root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null); - root.addFilter(GuiceFilter.class, "/*", null); + server.setHandler(handlerList); } } } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index e8a5b985bd4..7204e5fc63a 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -60,8 +60,12 @@ import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; import io.druid.query.QuerySegmentWalker; +import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; +import io.druid.segment.loading.DataSegmentMover; +import io.druid.segment.loading.OmniDataSegmentArchiver; import io.druid.segment.loading.OmniDataSegmentKiller; +import io.druid.segment.loading.OmniDataSegmentMover; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; import 
io.druid.server.QueryResource; @@ -129,6 +133,10 @@ public class CliPeon extends GuiceRunnable // Build it to make it bind even if nothing binds to it. Binders.dataSegmentKillerBinder(binder); binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentMoverBinder(binder); + binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class); + Binders.dataSegmentArchiverBinder(binder); + binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class); binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class); binder.bind(ExecutorLifecycleConfig.class).toInstance( diff --git a/services/src/main/java/io/druid/cli/CliRealtime.java b/services/src/main/java/io/druid/cli/CliRealtime.java index aa27e625538..70baabbc3c2 100644 --- a/services/src/main/java/io/druid/cli/CliRealtime.java +++ b/services/src/main/java/io/druid/cli/CliRealtime.java @@ -30,7 +30,7 @@ import java.util.List; */ @Command( name = "realtime", - description = "Runs a realtime node, see http://druid.io/docs/0.6.33/Realtime.html for a description" + description = "Runs a realtime node, see http://druid.io/docs/0.6.40/Realtime.html for a description" ) public class CliRealtime extends ServerRunnable { diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index 6a51fdcc0d7..f4b5c4ab5d8 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -42,7 +42,7 @@ import java.util.concurrent.Executor; */ @Command( name = "realtime", - description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.33/Realtime.html for a description" + description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.40/Realtime.html for a description" ) public class CliRealtimeExample extends ServerRunnable { diff --git a/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java b/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java index bf5fcd14cc8..be7a59ea2a8 100644 --- a/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java +++ b/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java @@ -26,9 +26,7 @@ import io.druid.server.http.RedirectFilter; import io.druid.server.initialization.JettyServerInitializer; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerList; -import org.eclipse.jetty.server.handler.ResourceHandler; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -36,25 +34,30 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlets.GzipFilter; /** -*/ + */ class CoordinatorJettyServerInitializer implements JettyServerInitializer { @Override public void initialize(Server server, Injector injector) { - ResourceHandler resourceHandler = new ResourceHandler(); - resourceHandler.setResourceBase(DruidCoordinator.class.getClassLoader().getResource("static").toExternalForm()); - final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); - root.setContextPath("/"); + + ServletHolder holderPwd = new ServletHolder("default", DefaultServlet.class); + + 
root.addServlet(holderPwd, "/");
+    root.setResourceBase(DruidCoordinator.class.getClassLoader().getResource("static").toExternalForm());
+
+    root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
+    root.addFilter(GzipFilter.class, "/*", null);
+
+    // Can't use '/*' here because of Guice and Jetty static content conflicts
+    // The coordinator really needs a standardized api path
+    root.addFilter(GuiceFilter.class, "/status/*", null);
+    root.addFilter(GuiceFilter.class, "/info/*", null);
+    root.addFilter(GuiceFilter.class, "/coordinator/*", null);

     HandlerList handlerList = new HandlerList();
-    handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()});
-    server.setHandler(handlerList);
+    handlerList.setHandlers(new Handler[]{root});

-    root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
-    root.addFilter(GzipFilter.class, "/*", null);
-    root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
-    root.addFilter(GuiceFilter.class, "/*", null);
+    server.setHandler(handlerList);
   }
 }
diff --git a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java
index 3cc867700d0..fdbf2c9f2f0 100644
--- a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java
+++ b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java
@@ -178,7 +178,7 @@ public class ConvertProperties implements Runnable
     }

     updatedProps.setProperty(
-        "druid.monitoring.monitors", "[\"io.druid.server.metrics.ServerMonitor\", \"com.metamx.metrics.SysMonitor\"]"
+        "druid.monitoring.monitors", "[\"com.metamx.metrics.SysMonitor\"]"
     );

     BufferedWriter out = null;
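
For a quick sanity check of that last hunk: after this change the property converter
emits only the SysMonitor by default; ServerMonitor is no longer injected. A minimal
sketch using plain java.util.Properties (not the Druid CLI itself) of the value the
converted config ends up with:

``` java
import java.util.Properties;

public class MonitorsConversionCheck
{
  public static void main(String[] args)
  {
    // Mirrors the updated ConvertProperties hunk above.
    final Properties updatedProps = new Properties();
    updatedProps.setProperty(
        "druid.monitoring.monitors", "[\"com.metamx.metrics.SysMonitor\"]"
    );
    // Prints: druid.monitoring.monitors=["com.metamx.metrics.SysMonitor"]
    System.out.println(
        "druid.monitoring.monitors=" + updatedProps.getProperty("druid.monitoring.monitors")
    );
  }
}
```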