diff --git a/build.sh b/build.sh index 4053d6c1cdc..e0e5f513761 100755 --- a/build.sh +++ b/build.sh @@ -30,4 +30,4 @@ echo "For examples, see: " echo " " ls -1 examples/*/*sh echo " " -echo "See also http://druid.io/docs/0.6.31" +echo "See also http://druid.io/docs/0.6.36" diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml index 66432acbfa3..d2c783a089d 100644 --- a/cassandra-storage/pom.xml +++ b/cassandra-storage/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/common/pom.xml b/common/pom.xml index efb65548359..943aec92a08 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/docs/content/Booting-a-production-cluster.md b/docs/content/Booting-a-production-cluster.md index aeea7c47988..db17223c8d5 100644 --- a/docs/content/Booting-a-production-cluster.md +++ b/docs/content/Booting-a-production-cluster.md @@ -3,7 +3,7 @@ layout: doc_page --- # Booting a Single Node Cluster # -[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.31-bin.tar.gz). +[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.36-bin.tar.gz). The [ec2 run script](https://github.com/metamx/druid/blob/master/examples/bin/run_ec2.sh), run_ec2.sh, is located at 'examples/bin' if you have checked out the code, or at the root of the project if you've downloaded a tarball. The scripts rely on the [Amazon EC2 API Tools](http://aws.amazon.com/developertools/351), and you will need to set three environment variables: diff --git a/docs/content/Examples.md b/docs/content/Examples.md index 4365a424619..fd18b24f1b9 100644 --- a/docs/content/Examples.md +++ b/docs/content/Examples.md @@ -19,13 +19,13 @@ Clone Druid and build it: git clone https://github.com/metamx/druid.git druid cd druid git fetch --tags -git checkout druid-0.6.31 +git checkout druid-0.6.36 ./build.sh ``` ### Downloading the DSK (Druid Standalone Kit) -[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.31-bin.tar.gz) a stand-alone tarball and run it: +[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz) a stand-alone tarball and run it: ``` bash tar -xzf druid-services-0.X.X-bin.tar.gz diff --git a/docs/content/Indexing-Service.md b/docs/content/Indexing-Service.md index 0d1cc38621e..2f15b200025 100644 --- a/docs/content/Indexing-Service.md +++ b/docs/content/Indexing-Service.md @@ -56,6 +56,7 @@ With the following JVM configuration: -Ddruid.db.connector.password=diurd -Ddruid.selectors.indexing.serviceName=overlord +-Ddruid.indexer.queue.startDelay=PT0M -Ddruid.indexer.runner.javaOpts="-server -Xmx1g" -Ddruid.indexer.runner.startPort=8081 -Ddruid.indexer.fork.property.druid.computation.buffer.size=268435456 @@ -116,6 +117,7 @@ In addition to the configuration of some of the default modules in [Configuratio |--------|-----------|-------| |`druid.indexer.runner.type`|Choices "local" or "remote". 
Indicates whether tasks should be run locally or in a distributed environment.|local| |`druid.indexer.storage.type`|Choices are "local" or "db". Indicates whether incoming tasks should be stored locally (in heap) or in a database. Storing incoming tasks in a database allows for tasks to be resumed if the overlord should fail.|local| +|`druid.indexer.storage.recentlyFinishedThreshold`|A duration of time to store task results.|PT24H| |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE| |`druid.indexer.queue.startDelay`|Sleep this long before starting overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M| |`druid.indexer.queue.restartDelay`|Sleep this long when overlord queue management throws an exception before trying again.|PT30S| diff --git a/docs/content/Realtime.md b/docs/content/Realtime.md index 13c56860344..71990ad57d9 100644 --- a/docs/content/Realtime.md +++ b/docs/content/Realtime.md @@ -27,7 +27,7 @@ druid.host=localhost druid.service=realtime druid.port=8083 -druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.31"] +druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.36"] druid.zk.service.host=localhost diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md index b276378fd5b..e3fe41c51c6 100644 --- a/docs/content/Tutorial:-A-First-Look-at-Druid.md +++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md @@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu ### Download a Tarball -We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.31-bin.tar.gz). Download this file to a directory of your choosing. +We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz). Download this file to a directory of your choosing. You can extract the awesomeness within by issuing: @@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz Not too lost so far right? That's great! If you cd into the directory: ``` -cd druid-services-0.6.31 +cd druid-services-0.6.36 ``` You should see a bunch of files: diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md index 8c77aba0d07..901420bc751 100644 --- a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md +++ b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md @@ -94,6 +94,7 @@ druid.db.connector.user=druid druid.db.connector.password=diurd druid.selectors.indexing.serviceName=overlord +druid.indexer.queue.startDelay=PT0M druid.indexer.runner.javaOpts="-server -Xmx1g" druid.indexer.runner.startPort=8088 druid.indexer.fork.property.druid.computation.buffer.size=268435456 @@ -246,17 +247,19 @@ Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) should yield: } ] ``` -Problems? ---------- +Console +-------- -If you decide to reuse the local firehose to ingest your own data and if you run into problems, you can read the individual task logs at: +The indexing service overlord has a console located at: ```bash -/log/.log - +localhost:8087/console.html ``` -One thing to note is that the log file will only exist once the task completes with either SUCCESS or FAILURE. +On this console, you can look at statuses and logs of recently submitted and completed tasks. 
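The same status and log information is exposed over HTTP. As a sketch (assuming the overlord from this tutorial is listening on localhost:8087, and using a hypothetical task id as a placeholder), you can pull a task's payload, status, and log directly with the endpoints this patch exposes:

```bash
# Hypothetical task id; substitute the id reported when you submitted your task.
TASK_ID="example_wikipedia_index_task"

curl "http://localhost:8087/druid/indexer/v1/task/${TASK_ID}"          # task payload
curl "http://localhost:8087/druid/indexer/v1/task/${TASK_ID}/status"   # task status
# Last 8KB of the task log; a negative offset reads back from the end of the file.
curl "http://localhost:8087/druid/indexer/v1/task/${TASK_ID}/log?offset=-8192"
```

These are the same links the console page builds for each task row.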
+ +If you decide to reuse the local firehose to ingest your own data and if you run into problems, you can use the console to read the individual task logs. + Task logs can be stored locally or uploaded to [Deep Storage](Deep-Storage.html). More information about how to configure this is [here](Configuration.html). Most common data ingestion problems are around timestamp formats and other malformed data issues. diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md index 2ceec4f209a..dc834ac5fd9 100644 --- a/docs/content/Tutorial:-Loading-Your-Data-Part-2.md +++ b/docs/content/Tutorial:-Loading-Your-Data-Part-2.md @@ -44,7 +44,7 @@ With real-world data, we recommend having a message bus such as [Apache Kafka](h #### Setting up Kafka -[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.31/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node. +[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.36/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node. Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html). diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md index d30c65a8a8b..3cba7a77582 100644 --- a/docs/content/Tutorial:-The-Druid-Cluster.md +++ b/docs/content/Tutorial:-The-Druid-Cluster.md @@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes as well as and exter If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first. 
-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.31-bin.tar.gz) +You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz) and untar the contents within by issuing: @@ -149,7 +149,7 @@ druid.port=8081 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.31"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.36"] # Dummy read only AWS account (used to download example data) druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b @@ -238,7 +238,7 @@ druid.port=8083 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.31","io.druid.extensions:druid-kafka-seven:0.6.31"] +druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.36","io.druid.extensions:druid-kafka-seven:0.6.36"] # Change this config to db to hand off to the rest of the Druid cluster druid.publish.type=noop @@ -253,5 +253,5 @@ druid.processing.buffer.sizeBytes=10000000 Next Steps ---------- -If you are intested in how data flows through the different Druid components, check out the [Druid data flow architecture](Design.html). Now that you have an understanding of what the Druid cluster looks like, why not load some of your own data? +If you are interested in how data flows through the different Druid components, check out the [Druid data flow architecture](Design.html). Now that you have an understanding of what the Druid cluster looks like, why not load some of your own data? Check out the next [tutorial](Tutorial%3A-Loading-Your-Data-Part-1.html) section for more info! diff --git a/docs/content/Tutorial:-Webstream.md b/docs/content/Tutorial:-Webstream.md index 007feaea850..bf201263c3a 100644 --- a/docs/content/Tutorial:-Webstream.md +++ b/docs/content/Tutorial:-Webstream.md @@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu h3. Download a Tarball -We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.31-bin.tar.gz) +We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz) Download this file to a directory of your choosing. You can extract the awesomeness within by issuing: @@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz Not too lost so far right? That's great! If you cd into the directory: ``` -cd druid-services-0.6.31 +cd druid-services-0.6.36 ``` You should see a bunch of files: diff --git a/docs/content/Twitter-Tutorial.textile b/docs/content/Twitter-Tutorial.textile index cf2282b354c..5a0ef90dcf7 100644 --- a/docs/content/Twitter-Tutorial.textile +++ b/docs/content/Twitter-Tutorial.textile @@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source. h3. Download a Tarball -We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.31-bin.tar.gz. +We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz. Download this bad boy to a directory of your choosing. 
You can extract the awesomeness within by issuing: diff --git a/examples/config/historical/runtime.properties b/examples/config/historical/runtime.properties index b4344b8e775..ce569f88f26 100644 --- a/examples/config/historical/runtime.properties +++ b/examples/config/historical/runtime.properties @@ -4,7 +4,7 @@ druid.port=8081 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.31"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.36"] # Dummy read only AWS account (used to download example data) druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b diff --git a/examples/config/overlord/runtime.properties b/examples/config/overlord/runtime.properties index c9c4478d4c4..f452a7d06d6 100644 --- a/examples/config/overlord/runtime.properties +++ b/examples/config/overlord/runtime.properties @@ -9,6 +9,7 @@ druid.db.connector.user=druid druid.db.connector.password=diurd druid.selectors.indexing.serviceName=overlord +druid.indexer.queue.startDelay=PT0M druid.indexer.runner.javaOpts="-server -Xmx1g" druid.indexer.runner.startPort=8088 druid.indexer.fork.property.druid.computation.buffer.size=268435456 diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties index 9303f37165a..9ba5d55acc4 100644 --- a/examples/config/realtime/runtime.properties +++ b/examples/config/realtime/runtime.properties @@ -4,7 +4,7 @@ druid.port=8083 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.31","io.druid.extensions:druid-kafka-seven:0.6.31","io.druid.extensions:druid-rabbitmq:0.6.31"] +druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.36","io.druid.extensions:druid-kafka-seven:0.6.36","io.druid.extensions:druid-rabbitmq:0.6.36"] # Change this config to db to hand off to the rest of the Druid cluster druid.publish.type=noop diff --git a/examples/pom.xml b/examples/pom.xml index 4473c3704cf..c0019b4c9cc 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml index e0769a73a7b..81247522e1d 100644 --- a/hdfs-storage/pom.xml +++ b/hdfs-storage/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index 20be3ac56e1..6a5e90cd0c6 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index 5c73a020c75..07974bc9ff0 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java new file mode 100644 index 00000000000..db40c7cb069 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskStorageConfig.java @@ -0,0 +1,19 @@ +package io.druid.indexing.common.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.validation.constraints.NotNull; + +public class TaskStorageConfig +{ + @JsonProperty + @NotNull + public Duration recentlyFinishedThreshold = new Period("PT24H").toStandardDuration(); + + 
public Duration getRecentlyFinishedThreshold() + { + return recentlyFinishedThreshold; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index 001e1520eaa..d45f66377b7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -20,7 +20,9 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskStatus; @@ -34,14 +36,34 @@ public class NoopTask extends AbstractTask { private static final Logger log = new Logger(NoopTask.class); private static int defaultRunTime = 2500; + private static int defaultIsReadyTime = 0; + private static IsReadyResult defaultIsReadyResult = IsReadyResult.YES; - private final int runTime; + enum IsReadyResult + { + YES, + NO, + EXCEPTION + } + + @JsonIgnore + private final long runTime; + + @JsonIgnore + private final long isReadyTime; + + @JsonIgnore + private final IsReadyResult isReadyResult; + + @JsonIgnore private final FirehoseFactory firehoseFactory; @JsonCreator public NoopTask( @JsonProperty("id") String id, - @JsonProperty("runTime") int runTime, + @JsonProperty("runTime") long runTime, + @JsonProperty("isReadyTime") long isReadyTime, + @JsonProperty("isReadyResult") String isReadyResult, @JsonProperty("firehose") FirehoseFactory firehoseFactory ) { @@ -51,6 +73,10 @@ public class NoopTask extends AbstractTask ); this.runTime = (runTime == 0) ? defaultRunTime : runTime; + this.isReadyTime = (isReadyTime == 0) ? defaultIsReadyTime : isReadyTime; + this.isReadyResult = (isReadyResult == null) + ? defaultIsReadyResult + : IsReadyResult.valueOf(isReadyResult.toUpperCase()); this.firehoseFactory = firehoseFactory; } @@ -60,12 +86,24 @@ public class NoopTask extends AbstractTask return "noop"; } - @JsonProperty("runTime") - public int getRunTime() + @JsonProperty + public long getRunTime() { return runTime; } + @JsonProperty + public long getIsReadyTime() + { + return isReadyTime; + } + + @JsonProperty + public IsReadyResult getIsReadyResult() + { + return isReadyResult; + } + @JsonProperty("firehose") public FirehoseFactory getFirehoseFactory() { @@ -75,7 +113,16 @@ public class NoopTask extends AbstractTask @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { - return true; + switch (isReadyResult) { + case YES: + return true; + case NO: + return false; + case EXCEPTION: + throw new ISE("Not ready. Never will be ready. 
Go away!"); + default: + throw new AssertionError("#notreached"); + } } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java index 7d3ad05512e..5dc6e1c6fff 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java @@ -34,15 +34,16 @@ import com.metamx.common.RetryUtils; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; -import com.mysql.jdbc.exceptions.MySQLTimeoutException; import com.mysql.jdbc.exceptions.MySQLTransientException; import io.druid.db.DbConnector; import io.druid.db.DbTablesConfig; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.Task; import org.joda.time.DateTime; +import org.joda.time.Period; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.exceptions.CallbackFailedException; @@ -64,16 +65,24 @@ public class DbTaskStorage implements TaskStorage private final DbConnector dbConnector; private final DbTablesConfig dbTables; private final IDBI dbi; + private final TaskStorageConfig config; private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class); @Inject - public DbTaskStorage(ObjectMapper jsonMapper, DbConnector dbConnector, DbTablesConfig dbTables, IDBI dbi) + public DbTaskStorage( + final ObjectMapper jsonMapper, + final DbConnector dbConnector, + final DbTablesConfig dbTables, + final IDBI dbi, + final TaskStorageConfig config + ) { this.jsonMapper = jsonMapper; this.dbConnector = dbConnector; this.dbTables = dbTables; this.dbi = dbi; + this.config = config; } @LifecycleStart @@ -271,6 +280,45 @@ public class DbTaskStorage implements TaskStorage ); } + @Override + public List getRecentlyFinishedTaskStatuses() + { + final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold()); + return retryingHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + final List> dbTasks = + handle.createQuery( + String.format( + "SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC", + dbTables.getTasksTable() + ) + ).bind("recent", recent.toString()).list(); + + final ImmutableList.Builder statuses = ImmutableList.builder(); + for (final Map row : dbTasks) { + final String id = row.get("id").toString(); + + try { + final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class); + if (status.isComplete()) { + statuses.add(status); + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit(); + } + } + + return statuses.build(); + } + } + ); + } + @Override public void addLock(final String taskid, final TaskLock taskLock) { @@ -407,7 +455,8 @@ public class DbTaskStorage implements TaskStorage for (final Map dbTaskLog : dbTaskLogs) { try { retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class)); - } catch (Exception e) { + } + catch (Exception e) { log.makeAlert(e, "Failed to deserialize TaskLog") .addData("task", taskid) .addData("logPayload", dbTaskLog) @@ -451,7 
+500,8 @@ public class DbTaskStorage implements TaskStorage /** * Retry SQL operations */ - private T retryingHandle(final HandleCallback callback) { + private T retryingHandle(final HandleCallback callback) + { final Callable call = new Callable() { @Override @@ -471,9 +521,11 @@ public class DbTaskStorage implements TaskStorage final int maxTries = 10; try { return RetryUtils.retry(call, shouldRetry, maxTries); - } catch (RuntimeException e) { + } + catch (RuntimeException e) { throw Throwables.propagate(e); - } catch (Exception e) { + } + catch (Exception e) { throw new CallbackFailedException(e); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index fce401c6641..1099bddbcc6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -391,7 +391,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer if (offset > 0) { raf.seek(offset); } else if (offset < 0 && offset < rafLength) { - raf.seek(rafLength + offset); + raf.seek(Math.max(0, rafLength + offset)); } return Channels.newInputStream(raf.getChannel()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java index ef23972ebe4..ef942e5c12f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -19,6 +19,7 @@ package io.druid.indexing.overlord; +import com.google.api.client.util.Lists; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -26,11 +27,15 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.Task; +import org.joda.time.DateTime; import java.util.List; import java.util.Map; @@ -42,6 +47,8 @@ import java.util.concurrent.locks.ReentrantLock; */ public class HeapMemoryTaskStorage implements TaskStorage { + private final TaskStorageConfig config; + private final ReentrantLock giant = new ReentrantLock(); private final Map tasks = Maps.newHashMap(); private final Multimap taskLocks = HashMultimap.create(); @@ -49,6 +56,12 @@ public class HeapMemoryTaskStorage implements TaskStorage private static final Logger log = new Logger(HeapMemoryTaskStorage.class); + @Inject + public HeapMemoryTaskStorage(TaskStorageConfig config) + { + this.config = config; + } + @Override public void insert(Task task, TaskStatus status) { @@ -69,7 +82,7 @@ public class HeapMemoryTaskStorage implements TaskStorage } log.info("Inserting task %s with status: %s", task.getId(), status); - tasks.put(task.getId(), new TaskStuff(task, status)); + tasks.put(task.getId(), new TaskStuff(task, status, new DateTime())); } finally { giant.unlock(); } @@ -139,13 +152,39 @@ 
public class HeapMemoryTaskStorage implements TaskStorage listBuilder.add(taskStuff.getTask()); } } - return listBuilder.build(); } finally { giant.unlock(); } } + @Override + public List getRecentlyFinishedTaskStatuses() + { + giant.lock(); + + try { + final List returns = Lists.newArrayList(); + final long recent = System.currentTimeMillis() - config.getRecentlyFinishedThreshold().getMillis(); + final Ordering createdDateDesc = new Ordering() + { + @Override + public int compare(TaskStuff a, TaskStuff b) + { + return a.getCreatedDate().compareTo(b.getCreatedDate()); + } + }.reverse(); + for(final TaskStuff taskStuff : createdDateDesc.sortedCopy(tasks.values())) { + if(taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().getMillis() > recent) { + returns.add(taskStuff.getStatus()); + } + } + return returns; + } finally { + giant.unlock(); + } + } + @Override public void addLock(final String taskid, final TaskLock taskLock) { @@ -212,8 +251,9 @@ public class HeapMemoryTaskStorage implements TaskStorage { final Task task; final TaskStatus status; + final DateTime createdDate; - private TaskStuff(Task task, TaskStatus status) + private TaskStuff(Task task, TaskStatus status, DateTime createdDate) { Preconditions.checkNotNull(task); Preconditions.checkNotNull(status); @@ -221,6 +261,7 @@ public class HeapMemoryTaskStorage implements TaskStorage this.task = task; this.status = status; + this.createdDate = Preconditions.checkNotNull(createdDate, "createdDate"); } public Task getTask() @@ -233,9 +274,14 @@ public class HeapMemoryTaskStorage implements TaskStorage return status; } + public DateTime getCreatedDate() + { + return createdDate; + } + private TaskStuff withStatus(TaskStatus _status) { - return new TaskStuff(task, _status); + return new TaskStuff(task, _status, createdDate); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java index 77e4b26e372..693a504542c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java @@ -24,6 +24,7 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -224,7 +225,8 @@ public class TaskQueue runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); } // Attain futures for all active tasks (assuming they are ready to run). - for (final Task task : tasks) { + // Copy tasks list, as notifyStatus may modify it. 
+ for (final Task task : ImmutableList.copyOf(tasks)) { if (!taskFutures.containsKey(task.getId())) { final ListenableFuture runnerTaskFuture; if (runnerTaskFutures.containsKey(task.getId())) { @@ -236,7 +238,7 @@ public class TaskQueue taskIsReady = task.isReady(taskActionClientFactory.create(task)); } catch (Exception e) { - log.makeAlert(e, "Exception thrown during isReady").addData("task", task.getId()).emit(); + log.warn(e, "Exception thrown during isReady for task: %s", task.getId()); notifyStatus(task, TaskStatus.failure(task.getId())); continue; } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java index a78faa24d03..2963c875257 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java @@ -19,6 +19,8 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ComparisonChain; import com.google.common.util.concurrent.ListenableFuture; import io.druid.indexing.common.TaskStatus; @@ -56,21 +58,25 @@ public class TaskRunnerWorkItem implements Comparable this.queueInsertionTime = queueInsertionTime; } + @JsonProperty public String getTaskId() { return taskId; } + @JsonIgnore public ListenableFuture getResult() { return result; } + @JsonProperty public DateTime getCreatedTime() { return createdTime; } + @JsonProperty public DateTime getQueueInsertionTime() { return queueInsertionTime; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java index 3a2145627df..fb289459256 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java @@ -82,6 +82,13 @@ public interface TaskStorage */ public List getActiveTasks(); + /** + * Returns a list of recently finished task statuses as stored in the storage facility. No particular order + * is guaranteed, but implementations are encouraged to return tasks in descending order of creation. No particular + * standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing. + */ + public List getRecentlyFinishedTaskStatuses(); + /** * Returns a list of locks for a particular task. 
*/ diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java index e9a2a8d5d7c..67ea11dcf33 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -19,14 +19,19 @@ package io.druid.indexing.overlord; +import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Inject; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskAction; +import io.druid.indexing.common.task.Task; import io.druid.timeline.DataSegment; +import javax.annotation.Nullable; +import java.util.List; import java.util.Set; /** @@ -42,6 +47,21 @@ public class TaskStorageQueryAdapter this.storage = storage; } + public List getActiveTasks() + { + return storage.getActiveTasks(); + } + + public List getRecentlyFinishedTaskStatuses() + { + return storage.getRecentlyFinishedTaskStatuses(); + } + + public Optional getTask(final String taskid) + { + return storage.getTask(taskid); + } + public Optional getStatus(final String taskid) { return storage.getStatus(taskid); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index d3d58bef03d..f161cb3c278 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -19,15 +19,20 @@ package io.druid.indexing.overlord.http; +import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.io.InputSupplier; +import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.common.config.JacksonConfigManager; +import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; @@ -40,6 +45,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler; import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.tasklogs.TaskLogStreamer; import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -52,6 +58,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -63,20 +71,6 @@ public class OverlordResource { private static final Logger log = new Logger(OverlordResource.class); - private static Function> simplifyTaskFn = - new Function>() - { - @Override - public Map 
apply(TaskRunnerWorkItem input) - { - return new ImmutableMap.Builder() - .put("id", input.getTaskId()) - .put("createdTime", input.getCreatedTime()) - .put("queueInsertionTime", input.getQueueInsertionTime()) - .build(); - } - }; - private final TaskMaster taskMaster; private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskLogStreamer taskLogStreamer; @@ -139,6 +133,14 @@ public class OverlordResource ); } + @GET + @Path("/task/{taskid}") + @Produces("application/json") + public Response getTaskPayload(@PathParam("taskid") String taskid) + { + return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid)); + } + @GET @Path("/task/{taskid}/status") @Produces("application/json") @@ -238,39 +240,64 @@ public class OverlordResource } @GET - @Path("/pendingTasks") + @Path("/waitingTasks") @Produces("application/json") - public Response getPendingTasks( - @QueryParam("full") String full - ) + public Response getWaitingTasks() { - if (full != null) { - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() - { - @Override - public Response apply(TaskRunner taskRunner) - { - return Response.ok(taskRunner.getPendingTasks()).build(); - } - } - ); - } - - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() + return workItemsResponse( + new Function>() { @Override - public Response apply(TaskRunner taskRunner) + public Collection apply(TaskRunner taskRunner) { - return Response.ok( - Collections2.transform( - taskRunner.getPendingTasks(), - simplifyTaskFn + // A bit roundabout, but works as a way of figuring out what tasks haven't been handed + // off to the runner yet: + final List activeTasks = taskStorageQueryAdapter.getActiveTasks(); + final Set runnersKnownTasks = Sets.newHashSet( + Iterables.transform( + taskRunner.getKnownTasks(), + new Function() + { + @Override + public String apply(final TaskRunnerWorkItem workItem) + { + return workItem.getTaskId(); + } + } ) - ).build(); + ); + final List waitingTasks = Lists.newArrayList(); + for (final Task task : activeTasks) { + if (!runnersKnownTasks.contains(task.getId())) { + waitingTasks.add( + // Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it. 
+ new TaskRunnerWorkItem( + task.getId(), + SettableFuture.create(), + new DateTime(0), + new DateTime(0) + ) + ); + } + } + return waitingTasks; + } + } + ); + } + + @GET + @Path("/pendingTasks") + @Produces("application/json") + public Response getPendingTasks() + { + return workItemsResponse( + new Function>() + { + @Override + public Collection apply(TaskRunner taskRunner) + { + return taskRunner.getPendingTasks(); } } ); @@ -279,42 +306,45 @@ public class OverlordResource @GET @Path("/runningTasks") @Produces("application/json") - public Response getRunningTasks( - @QueryParam("full") String full - ) + public Response getRunningTasks() { - if (full != null) { - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() - { - @Override - public Response apply(TaskRunner taskRunner) - { - return Response.ok(taskRunner.getRunningTasks()).build(); - } - } - ); - } - - return asLeaderWith( - taskMaster.getTaskRunner(), - new Function() + return workItemsResponse( + new Function>() { @Override - public Response apply(TaskRunner taskRunner) + public Collection apply(TaskRunner taskRunner) { - return Response.ok( - Collections2.transform( - taskRunner.getRunningTasks(), - simplifyTaskFn - ) - ).build(); + return taskRunner.getRunningTasks(); } } ); } + @GET + @Path("/completeTasks") + @Produces("application/json") + public Response getCompleteTasks() + { + final List completeTasks = Lists.transform( + taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(), + new Function() + { + @Override + public TaskResponseObject apply(TaskStatus taskStatus) + { + // Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it. + return new TaskResponseObject( + taskStatus.getId(), + new DateTime(0), + new DateTime(0), + Optional.of(taskStatus) + ); + } + } + ); + return Response.ok(completeTasks).build(); + } + @GET @Path("/workers") @Produces("application/json") @@ -338,17 +368,13 @@ public class OverlordResource @Produces("application/json") public Response getScalingState() { - return asLeaderWith( - taskMaster.getResourceManagementScheduler(), - new Function() - { - @Override - public Response apply(ResourceManagementScheduler resourceManagementScheduler) - { - return Response.ok(resourceManagementScheduler.getStats()).build(); - } - } - ); + // Don't use asLeaderWith, since we want to return 200 instead of 503 when missing an autoscaler. 
+ final Optional rms = taskMaster.getResourceManagementScheduler(); + if (rms.isPresent()) { + return Response.ok(rms.get().getStats()).build(); + } else { + return Response.ok().build(); + } } @GET @@ -373,7 +399,39 @@ public class OverlordResource } } - public Response optionalTaskResponse(String taskid, String objectType, Optional x) + private Response workItemsResponse(final Function> fn) + { + return asLeaderWith( + taskMaster.getTaskRunner(), + new Function() + { + @Override + public Response apply(TaskRunner taskRunner) + { + return Response.ok( + Lists.transform( + Lists.newArrayList(fn.apply(taskRunner)), + new Function() + { + @Override + public TaskResponseObject apply(TaskRunnerWorkItem workItem) + { + return new TaskResponseObject( + workItem.getTaskId(), + workItem.getCreatedTime(), + workItem.getQueueInsertionTime(), + Optional.absent() + ); + } + } + ) + ).build(); + } + } + ); + } + + private Response optionalTaskResponse(String taskid, String objectType, Optional x) { final Map results = Maps.newHashMap(); results.put("task", taskid); @@ -385,7 +443,7 @@ public class OverlordResource } } - public Response asLeaderWith(Optional x, Function f) + private Response asLeaderWith(Optional x, Function f) { if (x.isPresent()) { return f.apply(x.get()); @@ -394,4 +452,62 @@ public class OverlordResource return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); } } + + private static class TaskResponseObject + { + private final String id; + private final DateTime createdTime; + private final DateTime queueInsertionTime; + private final Optional status; + + private TaskResponseObject( + String id, + DateTime createdTime, + DateTime queueInsertionTime, + Optional status + ) + { + this.id = id; + this.createdTime = createdTime; + this.queueInsertionTime = queueInsertionTime; + this.status = status; + } + + public String getId() + { + return id; + } + + public DateTime getCreatedTime() + { + return createdTime; + } + + public DateTime getQueueInsertionTime() + { + return queueInsertionTime; + } + + public Optional getStatus() + { + return status; + } + + @JsonValue + public Map toJson() + { + final Map data = Maps.newLinkedHashMap(); + data.put("id", id); + if (createdTime.getMillis() > 0) { + data.put("createdTime", createdTime); + } + if (queueInsertionTime.getMillis() > 0) { + data.put("queueInsertionTime", queueInsertionTime); + } + if (status.isPresent()) { + data.put("statusCode", status.get().getStatusCode().toString()); + } + return data; + } + } } diff --git a/indexing-service/src/main/resources/indexer_static/console.html b/indexing-service/src/main/resources/indexer_static/console.html index 6223eee6a1f..f51383c72c0 100644 --- a/indexing-service/src/main/resources/indexer_static/console.html +++ b/indexing-service/src/main/resources/indexer_static/console.html @@ -43,16 +43,24 @@
 <div class="running_loading">Loading Running Tasks... this may take a few minutes</div>
 <table id="runningTable"></table>
 
-<h2>Pending Tasks</h2>
+<h2>Pending Tasks - Tasks waiting to be assigned to a worker</h2>
 <div class="pending_loading">Loading Pending Tasks... this may take a few minutes</div>
 <table id="pendingTable"></table>
 
-<h2>Workers</h2>
+<h2>Waiting Tasks - Tasks waiting on locks</h2>
+<div class="waiting_loading">Loading Waiting Tasks... this may take a few minutes</div>
+<table id="waitingTable"></table>
+
+<h2>Complete Tasks</h2>
+<div class="complete_loading">Loading Complete Tasks... this may take a few minutes</div>
+<table id="completeTable"></table>
+
+<h2>Remote Workers</h2>
 <div class="workers_loading">Loading Workers... this may take a few minutes</div>
 <table id="workerTable"></table>
 
-<h2>Event Log</h2>
-<div class="events_loading">Loading Event Log... this may take a few minutes</div>
+<h2>Autoscaling Activity</h2>
+<div class="events_loading">Loading Autoscaling Activities... this may take a few minutes</div>
 <table id="eventTable"></table>
 
 </div>
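The console sections above are populated from the four overlord task-list endpoints added or reworked in this patch. As a rough sanity check against a running overlord (the localhost:8087 address is an assumption carried over from the tutorial configuration, and the noop payload mirrors the one used in TaskLifecycleTest), something like the following should show a task move through the lists:

```bash
# Submit a no-op task to exercise the queue (payload shape taken from the tests in this patch).
curl -X POST -H 'Content-Type: application/json' \
  -d '{"type": "noop", "runTime": "100"}' \
  http://localhost:8087/druid/indexer/v1/task

# The four task lists backing the console's sections:
curl http://localhost:8087/druid/indexer/v1/waitingTasks   # active, but not yet handed to the runner (waiting on locks)
curl http://localhost:8087/druid/indexer/v1/pendingTasks   # waiting to be assigned to a worker
curl http://localhost:8087/druid/indexer/v1/runningTasks
curl http://localhost:8087/druid/indexer/v1/completeTasks  # retention bounded by druid.indexer.storage.recentlyFinishedThreshold
```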
diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js index e3ce86c85c9..adaa1fba83f 100644 --- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js +++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js @@ -3,14 +3,39 @@ var oTable = []; $(document).ready(function() { + var augment = function(data) { + for (i = 0 ; i < data.length ; i++) { + var taskId = encodeURIComponent(data[i].id) + data[i].more = + 'payload' + + 'status' + + 'log (all)' + + 'log (last 8kb)' + } + } + $.get('/druid/indexer/v1/runningTasks', function(data) { $('.running_loading').hide(); - buildTable(data, $('#runningTable'), ["segments"]); + augment(data); + buildTable(data, $('#runningTable')); }); $.get('/druid/indexer/v1/pendingTasks', function(data) { $('.pending_loading').hide(); - buildTable(data, $('#pendingTable'), ["segments"]); + augment(data); + buildTable(data, $('#pendingTable')); + }); + + $.get('/druid/indexer/v1/waitingTasks', function(data) { + $('.waiting_loading').hide(); + augment(data); + buildTable(data, $('#waitingTable')); + }); + + $.get('/druid/indexer/v1/completeTasks', function(data) { + $('.complete_loading').hide(); + augment(data); + buildTable(data, $('#completeTable')); }); $.get('/druid/indexer/v1/workers', function(data) { @@ -22,4 +47,4 @@ $(document).ready(function() { $('.events_loading').hide(); buildTable(data, $('#eventTable')); }); -}); \ No newline at end of file +}); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index cc69067d23c..178cae10513 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -32,7 +32,7 @@ import io.druid.timeline.partition.NoneShardSpec; /** */ @JsonTypeName("test_realtime") -public class TestRealtimeTask extends RealtimeIndexTask implements TestTask +public class TestRealtimeTask extends RealtimeIndexTask { private final TaskStatus status; @@ -64,13 +64,6 @@ public class TestRealtimeTask extends RealtimeIndexTask implements TestTask return "test_realtime"; } - @Override - @JsonProperty - public TaskStatus getStatus() - { - return status; - } - @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index ac90c1e12f0..632384ceb3d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -19,6 +19,7 @@ package io.druid.indexing.overlord; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -53,6 +54,7 @@ import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.AbstractFixedIntervalTask; import 
io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.KillTask; @@ -74,7 +76,9 @@ import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; import org.easymock.EasyMock; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Interval; +import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -115,8 +119,15 @@ public class TaskLifecycleTest tmp = Files.createTempDir(); - final TaskQueueConfig tqc = new DefaultObjectMapper().readValue("{\"startDelay\":\"PT0S\"}", TaskQueueConfig.class); - ts = new HeapMemoryTaskStorage(); + final TaskQueueConfig tqc = new DefaultObjectMapper().readValue( + "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}", + TaskQueueConfig.class + ); + ts = new HeapMemoryTaskStorage( + new TaskStorageConfig() + { + } + ); tsqa = new TaskStorageQueryAdapter(ts); tl = new TaskLockbox(ts); mdc = newMockMDC(); @@ -289,6 +300,34 @@ public class TaskLifecycleTest Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); } + @Test + public void testNoopTask() throws Exception + { + final Task noopTask = new DefaultObjectMapper().readValue( + "{\"type\":\"noop\", \"runTime\":\"100\"}\"", + Task.class + ); + final TaskStatus status = runTask(noopTask); + + Assert.assertEquals("statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode()); + Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); + Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); + } + + @Test + public void testNeverReadyTask() throws Exception + { + final Task neverReadyTask = new DefaultObjectMapper().readValue( + "{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"", + Task.class + ); + final TaskStatus status = runTask(neverReadyTask); + + Assert.assertEquals("statusCode", TaskStatus.Status.FAILED, status.getStatusCode()); + Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); + Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); + } + @Test public void testSimple() throws Exception { @@ -400,28 +439,41 @@ public class TaskLifecycleTest Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); } - private TaskStatus runTask(Task task) + private TaskStatus runTask(final Task task) throws Exception { + final Task dummyTask = new DefaultObjectMapper().readValue( + "{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"", + Task.class + ); final long startTime = System.currentTimeMillis(); + Preconditions.checkArgument(!task.getId().equals(dummyTask.getId())); + + tq.add(dummyTask); tq.add(task); - TaskStatus status; + TaskStatus retVal = null; - try { - while ((status = tsqa.getStatus(task.getId()).get()).isRunnable()) { - if (System.currentTimeMillis() > startTime + 10 * 1000) { - throw new ISE("Where did the task go?!: %s", task.getId()); + for (final String taskId : ImmutableList.of(dummyTask.getId(), task.getId())) { + try { + TaskStatus status; + while ((status = tsqa.getStatus(taskId).get()).isRunnable()) { + if (System.currentTimeMillis() > startTime + 10 * 1000) { + throw new ISE("Where did the task go?!: %s", task.getId()); + } + + Thread.sleep(100); } - - Thread.sleep(100); + if (taskId.equals(task.getId())) { + retVal = status; + } + } + catch (Exception e) { + throw Throwables.propagate(e); } } - catch (Exception e) { - throw Throwables.propagate(e); - } - return status; + return retVal; } private static class MockIndexerDBCoordinator extends IndexerDBCoordinator diff --git 
a/kafka-eight/pom.xml b/kafka-eight/pom.xml index f26fbc2a42b..ebeba81d6d5 100644 --- a/kafka-eight/pom.xml +++ b/kafka-eight/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml index 750f2bb9b4e..c7b08ebc463 100644 --- a/kafka-seven/pom.xml +++ b/kafka-seven/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/pom.xml b/pom.xml index c391e4a43e2..f970d49dea5 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ io.druid druid pom - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT druid druid @@ -41,7 +41,7 @@ UTF-8 0.25.1 2.1.0-incubating - 0.1.5 + 0.1.6 diff --git a/processing/pom.xml b/processing/pom.xml index 1ffb41c5e2f..9cf11cd9a6d 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 8710b627ece..316c8d8675e 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -84,6 +84,11 @@ public class ChainedExecutionQueryRunner implements QueryRunner { final int priority = Integer.parseInt(query.getContextValue("priority", "0")); + if (Iterables.isEmpty(queryables)) { + log.warn("No queryables found."); + return Sequences.empty(); + } + return new BaseSequence>( new BaseSequence.IteratorMaker>() { diff --git a/rabbitmq/pom.xml b/rabbitmq/pom.xml index 14e0d426797..29031970b84 100644 --- a/rabbitmq/pom.xml +++ b/rabbitmq/pom.xml @@ -9,7 +9,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/s3-extensions/pom.xml b/s3-extensions/pom.xml index bcc304c2405..a5b859c7ea7 100644 --- a/s3-extensions/pom.xml +++ b/s3-extensions/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index 619ad737cfe..3ae7088d88f 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -21,7 +21,7 @@ package io.druid.storage.s3; import com.google.common.base.Predicate; import com.metamx.common.RetryUtils; -import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Bucket; import org.jets3t.service.model.S3Object; @@ -61,9 +61,9 @@ public class S3Utils { if (e instanceof IOException) { return true; - } else if (e instanceof S3ServiceException) { + } else if (e instanceof ServiceException) { final boolean isIOException = e.getCause() instanceof IOException; - final boolean isTimeout = "RequestTimeout".equals(((S3ServiceException) e).getS3ErrorCode()); + final boolean isTimeout = "RequestTimeout".equals(((ServiceException) e).getErrorCode()); return isIOException || isTimeout; } else { return false; @@ -75,18 +75,18 @@ public class S3Utils } public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey) - throws S3ServiceException + throws ServiceException { try { s3Client.getObjectDetails(new S3Bucket(bucketName), objectKey); } - catch (S3ServiceException e) { + catch (ServiceException e) { if (404 == e.getResponseCode() - || 
"NoSuchKey".equals(e.getS3ErrorCode()) - || "NoSuchBucket".equals(e.getS3ErrorCode())) { + || "NoSuchKey".equals(e.getErrorCode()) + || "NoSuchBucket".equals(e.getErrorCode())) { return false; } - if ("AccessDenied".equals(e.getS3ErrorCode())) { + if ("AccessDenied".equals(e.getErrorCode())) { // Object is inaccessible to current user, but does exist. return true; } diff --git a/server/pom.xml b/server/pom.xml index 41eb69b15a8..bf05da8aec9 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.32-SNAPSHOT + 0.6.37-SNAPSHOT diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index f78bc0ac390..df96aa45f5e 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.collect.Lists; +import com.metamx.common.ISE; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.impl.FileIteratingFirehose; @@ -78,21 +79,26 @@ public class LocalFirehoseFactory implements FirehoseFactory @Override public Firehose connect() throws IOException { - final LinkedList files = Lists.newLinkedList( - Arrays.asList( - baseDir.listFiles( - new FilenameFilter() - { - @Override - public boolean accept(File file, String name) - { - return name.contains(filter); - } - } - ) - ) + File[] foundFiles = baseDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File file, String name) + { + return name.contains(filter); + } + } ); + if (foundFiles == null || foundFiles.length == 0) { + throw new ISE("Found no files to ingest! Check your schema."); + } + + final LinkedList files = Lists.newLinkedList( + Arrays.asList(foundFiles) + ); + + return new FileIteratingFirehose( new Iterator() { diff --git a/server/src/main/java/io/druid/server/StatusResource.java b/server/src/main/java/io/druid/server/StatusResource.java index f2c01050496..d6e5371f0c3 100644 --- a/server/src/main/java/io/druid/server/StatusResource.java +++ b/server/src/main/java/io/druid/server/StatusResource.java @@ -20,16 +20,11 @@ package io.druid.server; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.inject.Injector; -import io.druid.initialization.DruidModule; import io.druid.initialization.Initialization; -import io.druid.server.initialization.ExtensionsConfig; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; -import java.util.ArrayList; -import java.util.List; /** */ @@ -47,52 +42,20 @@ public class StatusResource { return new Status( Initialization.class.getPackage().getImplementationVersion(), - getExtensionVersions(), new Memory(Runtime.getRuntime()) ); } - /** - * Load the unique extensions and return their implementation-versions - * - * @return map of extensions loaded with their respective implementation versions. 
diff --git a/server/pom.xml b/server/pom.xml
index 41eb69b15a8..bf05da8aec9 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
     io.druid
     druid
-    0.6.32-SNAPSHOT
+    0.6.37-SNAPSHOT
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java
index f78bc0ac390..df96aa45f5e 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.api.client.repackaged.com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
 import io.druid.data.input.impl.FileIteratingFirehose;
@@ -78,21 +79,26 @@ public class LocalFirehoseFactory implements FirehoseFactory
   @Override
   public Firehose connect() throws IOException
   {
-    final LinkedList files = Lists.newLinkedList(
-        Arrays.asList(
-            baseDir.listFiles(
-                new FilenameFilter()
-                {
-                  @Override
-                  public boolean accept(File file, String name)
-                  {
-                    return name.contains(filter);
-                  }
-                }
-            )
-        )
+    File[] foundFiles = baseDir.listFiles(
+        new FilenameFilter()
+        {
+          @Override
+          public boolean accept(File file, String name)
+          {
+            return name.contains(filter);
+          }
+        }
     );
+
+    if (foundFiles == null || foundFiles.length == 0) {
+      throw new ISE("Found no files to ingest! Check your schema.");
+    }
+
+    final LinkedList files = Lists.newLinkedList(
+        Arrays.asList(foundFiles)
+    );
+
     return new FileIteratingFirehose(
         new Iterator()
         {
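The LocalFirehoseFactory rewrite matters because File.listFiles() returns null, not an empty array, when the base directory is missing or unreadable; the old code would fail with an opaque NullPointerException inside Arrays.asList. A small, self-contained illustration of the contract and the guard (IllegalStateException stands in for Druid's ISE):

```java
import java.io.File;
import java.io.FilenameFilter;

class ListFilesGuard
{
  static File[] listMatching(File baseDir, final String filter)
  {
    // listFiles() returns null when baseDir does not exist, is not a
    // directory, or cannot be read; that is not the same as "no matches".
    File[] found = baseDir.listFiles(
        new FilenameFilter()
        {
          @Override
          public boolean accept(File dir, String name)
          {
            return name.contains(filter);
          }
        }
    );
    if (found == null || found.length == 0) {
      // Fail loudly with an actionable message, as the diff does with
      // Druid's ISE: "Found no files to ingest! Check your schema."
      throw new IllegalStateException("Found no files to ingest in " + baseDir);
    }
    return found;
  }
}
```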
diff --git a/server/src/main/java/io/druid/server/StatusResource.java b/server/src/main/java/io/druid/server/StatusResource.java
index f2c01050496..d6e5371f0c3 100644
--- a/server/src/main/java/io/druid/server/StatusResource.java
+++ b/server/src/main/java/io/druid/server/StatusResource.java
@@ -20,16 +20,11 @@ package io.druid.server;

 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.inject.Injector;
-import io.druid.initialization.DruidModule;
 import io.druid.initialization.Initialization;
-import io.druid.server.initialization.ExtensionsConfig;

 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
-import java.util.ArrayList;
-import java.util.List;

 /**
 */
@@ -47,52 +42,20 @@ public class StatusResource
 {
     return new Status(
         Initialization.class.getPackage().getImplementationVersion(),
-        getExtensionVersions(),
         new Memory(Runtime.getRuntime())
     );
   }

-  /**
-   * Load the unique extensions and return their implementation-versions
-   *
-   * @return map of extensions loaded with their respective implementation versions.
-   */
-  private static List getExtensionVersions()
-  {
-    final Injector injector = Initialization.makeStartupInjector();
-    final ExtensionsConfig config = injector.getInstance(ExtensionsConfig.class);
-    final List druidModules = Initialization.getFromExtensions(config, DruidModule.class);
-
-    List moduleVersions = new ArrayList<>();
-    for (DruidModule module : druidModules) {
-
-      String artifact = module.getClass().getPackage().getImplementationTitle();
-      String version = module.getClass().getPackage().getImplementationVersion();
-
-      ModuleVersion moduleVersion;
-      if (artifact != null) {
-        moduleVersion = new ModuleVersion(module.getClass().getCanonicalName(), artifact, version);
-      } else {
-        moduleVersion = new ModuleVersion(module.getClass().getCanonicalName());
-      }
-
-      moduleVersions.add(moduleVersion);
-    }
-    return moduleVersions;
-  }
-
   public static class Status
   {
     final String version;
-    final List modules;
     final Memory memory;

     public Status(
-        String version, List modules, Memory memory
+        String version, Memory memory
     )
     {
       this.version = version;
-      this.modules = modules;
       this.memory = memory;
     }

@@ -102,12 +65,6 @@ public class StatusResource
       return version;
     }

-    @JsonProperty
-    public List getModules()
-    {
-      return modules;
-    }
-
     @JsonProperty
     public Memory getMemory()
     {
@@ -117,20 +74,8 @@ public class StatusResource
     @Override
     public String toString()
     {
-      final String NL = "\n";
-      StringBuilder output = new StringBuilder();
-      output.append(String.format("Druid version - %s", version)).append(NL).append(NL);
-
-      if (modules.size() > 0) {
-        output.append("Registered Druid Modules").append(NL);
-      } else {
-        output.append("No Druid Modules loaded !");
-      }
-
-      for (ModuleVersion moduleVersion : modules) {
-        output.append(moduleVersion).append(NL);
-      }
-      return output.toString();
+      final String NL = System.getProperty("line.separator");
+      return String.format("Druid version - %s", version) + NL;
     }
   }
diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index 4cbe042350a..950be651e86 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -116,7 +116,13 @@ public class ServerManager implements QuerySegmentWalker
     return segmentLoader.isSegmentLoaded(segment);
   }

-  public void loadSegment(final DataSegment segment) throws SegmentLoadingException
+  /**
+   * Load a single segment.
+   * @param segment segment to load
+   * @return true if the segment was newly loaded, false if it was already loaded
+   * @throws SegmentLoadingException if the segment cannot be loaded
+   */
+  public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
   {
     final Segment adapter;
     try {
@@ -150,8 +156,8 @@ public class ServerManager implements QuerySegmentWalker
           segment.getVersion()
       );
       if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
-        log.info("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
-        throw new SegmentLoadingException("Segment already exists[%s]", segment.getIdentifier());
+        log.warn("Told to load an adapter for a segment[%s] that already exists", segment.getIdentifier());
+        return false;
       }

       loadedIntervals.add(
@@ -165,6 +171,7 @@ public class ServerManager implements QuerySegmentWalker
       synchronized (dataSourceCounts) {
         dataSourceCounts.add(dataSource, 1L);
       }
+      return true;
     }
   }
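Returning boolean from loadSegment turns "segment already loaded" from an exception into an outcome the caller can branch on. The shape of that contract, sketched independently of Druid's types with a plain concurrent map:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SegmentRegistrySketch
{
  private final Map<String, Object> loaded = new ConcurrentHashMap<>();

  /**
   * @return true if the segment was newly loaded, false if it was already loaded
   */
  boolean load(String segmentId, Object segment)
  {
    // putIfAbsent returns the previous value, so a non-null result means the
    // segment was already registered; report "not newly loaded" instead of
    // throwing, and let the caller decide whether that is an error.
    return loaded.putIfAbsent(segmentId, segment) == null;
  }
}
```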
diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
index a55341a75a1..246415f57d0 100644
--- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
@@ -230,34 +230,37 @@ public class ZkCoordinator implements DataSegmentChangeHandler
     try {
       log.info("Loading segment %s", segment.getIdentifier());

+      final boolean loaded;
       try {
-        serverManager.loadSegment(segment);
+        loaded = serverManager.loadSegment(segment);
       }
       catch (Exception e) {
         removeSegment(segment);
         throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
       }

-      File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
-      if (!segmentInfoCacheFile.exists()) {
+      if (loaded) {
+        File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
+        if (!segmentInfoCacheFile.exists()) {
+          try {
+            jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          }
+          catch (IOException e) {
+            removeSegment(segment);
+            throw new SegmentLoadingException(
+                e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
+            );
+          }
+        }
+
         try {
-          jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          announcer.announceSegment(segment);
         }
         catch (IOException e) {
-          removeSegment(segment);
-          throw new SegmentLoadingException(
-              e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
-          );
+          throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
         }
       }
-
-      try {
-        announcer.announceSegment(segment);
-      }
-      catch (IOException e) {
-        throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
-      }
-
     }
     catch (SegmentLoadingException e) {
       log.makeAlert(e, "Failed to load segment for dataSource")
@@ -275,8 +278,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
     for (DataSegment segment : segments) {
       log.info("Loading segment %s", segment.getIdentifier());

+      final boolean loaded;
       try {
-        serverManager.loadSegment(segment);
+        loaded = serverManager.loadSegment(segment);
       }
       catch (Exception e) {
         log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
@@ -285,20 +289,22 @@ public class ZkCoordinator implements DataSegmentChangeHandler
         continue;
       }

-      File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
-      if (!segmentInfoCacheFile.exists()) {
-        try {
-          jsonMapper.writeValue(segmentInfoCacheFile, segment);
+      if (loaded) {
+        File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
+        if (!segmentInfoCacheFile.exists()) {
+          try {
+            jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          }
+          catch (IOException e) {
+            log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
+            removeSegment(segment);
+            segmentFailures.add(segment.getIdentifier());
+            continue;
+          }
         }
-        catch (IOException e) {
-          log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
-          removeSegment(segment);
-          segmentFailures.add(segment.getIdentifier());
-          continue;
-        }
-      }

-      validSegments.add(segment);
+        validSegments.add(segment);
+      }
     }

     try {
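With that boolean available, ZkCoordinator now writes the segment-info cache file and announces the segment only when the load was new, so a redelivered load request becomes a no-op rather than an alert-worthy failure. The control flow in miniature (interfaces invented for the sketch):

```java
class IdempotentLoadSketch
{
  interface Loader
  {
    boolean load(String segmentId); // true only on a genuinely new load
  }

  interface SideEffects
  {
    void writeCacheFile(String segmentId);
    void announce(String segmentId);
  }

  static void handleLoadRequest(String segmentId, Loader loader, SideEffects effects)
  {
    final boolean loaded = loader.load(segmentId);
    if (loaded) {
      // Side effects only for a new load; a duplicate request (e.g. a
      // replayed ZooKeeper watch) falls through without re-announcing.
      effects.writeCacheFile(segmentId);
      effects.announce(segmentId);
    }
  }
}
```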
diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
index 993843a0a27..26e736ee3bf 100644
--- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
@@ -46,16 +46,18 @@ import io.druid.client.ServerInventoryView;
 import io.druid.client.indexing.IndexingServiceClient;
 import io.druid.common.config.JacksonConfigManager;
 import io.druid.concurrent.Execs;
+import io.druid.curator.discovery.ServiceAnnouncer;
 import io.druid.db.DatabaseRuleManager;
 import io.druid.db.DatabaseSegmentManager;
 import io.druid.guice.ManageLifecycle;
+import io.druid.guice.annotations.Self;
 import io.druid.segment.IndexIO;
+import io.druid.server.DruidNode;
 import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
 import io.druid.server.coordinator.helper.DruidCoordinatorCleanup;
 import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
 import io.druid.server.coordinator.helper.DruidCoordinatorLogger;
 import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
-import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader;
 import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
 import io.druid.server.initialization.ZkPathsConfig;
 import io.druid.timeline.DataSegment;
@@ -106,6 +108,8 @@ public class DruidCoordinator
   private final LoadQueueTaskMaster taskMaster;
   private final Map loadManagementPeons;
   private final AtomicReference leaderLatch;
+  private final ServiceAnnouncer serviceAnnouncer;
+  private final DruidNode self;

   @Inject
   public DruidCoordinator(
@@ -119,7 +123,9 @@ public class DruidCoordinator
       ServiceEmitter emitter,
       ScheduledExecutorFactory scheduledExecutorFactory,
       IndexingServiceClient indexingServiceClient,
-      LoadQueueTaskMaster taskMaster
+      LoadQueueTaskMaster taskMaster,
+      ServiceAnnouncer serviceAnnouncer,
+      @Self DruidNode self
   )
   {
     this(
@@ -134,6 +140,8 @@ public class DruidCoordinator
         scheduledExecutorFactory,
         indexingServiceClient,
         taskMaster,
+        serviceAnnouncer,
+        self,
         Maps.newConcurrentMap()
     );
   }
@@ -150,6 +158,8 @@ public class DruidCoordinator
       ScheduledExecutorFactory scheduledExecutorFactory,
       IndexingServiceClient indexingServiceClient,
       LoadQueueTaskMaster taskMaster,
+      ServiceAnnouncer serviceAnnouncer,
+      DruidNode self,
       ConcurrentMap loadQueuePeonMap
   )
   {
@@ -164,6 +174,8 @@ public class DruidCoordinator
     this.emitter = emitter;
     this.indexingServiceClient = indexingServiceClient;
     this.taskMaster = taskMaster;
+    this.serviceAnnouncer = serviceAnnouncer;
+    this.self = self;

     this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");

@@ -481,6 +493,7 @@ public class DruidCoordinator
           databaseSegmentManager.start();
           databaseRuleManager.start();
           serverInventoryView.start();
+          serviceAnnouncer.announce(self);

           final List> coordinatorRunnables = Lists.newArrayList();
           dynamicConfigs = configManager.watch(
@@ -561,8 +574,10 @@ public class DruidCoordinator
           }
           loadManagementPeons.clear();

-          databaseSegmentManager.stop();
+          serviceAnnouncer.unannounce(self);
           serverInventoryView.stop();
+          databaseRuleManager.stop();
+          databaseSegmentManager.stop();

           leader = false;
         }
         catch (Exception e) {
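Two lifecycle changes land here: the coordinator announces itself in service discovery when it becomes leader, and on losing leadership it unannounces first and then stops its dependencies in roughly the reverse of start order (inventory view, then rule manager, then segment manager). Reverse-order teardown in general can be expressed with a stack of started components (a generic sketch, not Druid's lifecycle code):

```java
import java.util.ArrayDeque;
import java.util.Deque;

class LifecycleOrderSketch
{
  interface Managed
  {
    void start();
    void stop();
  }

  private final Deque<Managed> started = new ArrayDeque<>();

  void startAll(Iterable<Managed> parts)
  {
    for (Managed part : parts) {
      part.start();
      started.push(part); // remember start order
    }
  }

  void stopAll()
  {
    // Stop in reverse: the last component started is the first stopped, so
    // nothing is torn down while something that depends on it still runs.
    while (!started.isEmpty()) {
      started.pop().stop();
    }
  }
}
```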
diff --git a/server/src/main/java/io/druid/server/http/BackwardsCompatiableInfoResource.java b/server/src/main/java/io/druid/server/http/BackwardsCompatibleInfoResource.java
similarity index 93%
rename from server/src/main/java/io/druid/server/http/BackwardsCompatiableInfoResource.java
rename to server/src/main/java/io/druid/server/http/BackwardsCompatibleInfoResource.java
index 11fe97da9be..ed1cf580887 100644
--- a/server/src/main/java/io/druid/server/http/BackwardsCompatiableInfoResource.java
+++ b/server/src/main/java/io/druid/server/http/BackwardsCompatibleInfoResource.java
@@ -32,10 +32,10 @@ import javax.ws.rs.Path;

 /**
 */
 @Path("/static/info")
-public class BackwardsCompatiableInfoResource extends InfoResource
+public class BackwardsCompatibleInfoResource extends InfoResource
 {
   @Inject
-  public BackwardsCompatiableInfoResource(
+  public BackwardsCompatibleInfoResource(
       DruidCoordinator coordinator,
       InventoryView serverInventoryView,
       DatabaseSegmentManager databaseSegmentManager,
diff --git a/server/src/main/resources/static/index.html b/server/src/main/resources/static/index.html
index fb8831e29ed..cec3d620e88 100644
--- a/server/src/main/resources/static/index.html
+++ b/server/src/main/resources/static/index.html
@@ -32,9 +32,6 @@
-
-
 
 Druid Version: ${pom.version} Druid API Version: ${druid.api.version}
 
-
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java
index 8f55a93948c..58323faa863 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java
@@ -23,8 +23,10 @@ import com.google.common.collect.MapMaker;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import io.druid.client.DruidServer;
 import io.druid.client.SingleServerInventoryView;
+import io.druid.curator.discovery.NoopServiceAnnouncer;
 import io.druid.curator.inventory.InventoryManagerConfig;
 import io.druid.db.DatabaseSegmentManager;
+import io.druid.server.DruidNode;
 import io.druid.server.initialization.ZkPathsConfig;
 import io.druid.server.metrics.NoopServiceEmitter;
 import io.druid.timeline.DataSegment;
@@ -111,6 +113,8 @@ public class DruidCoordinatorTest
         scheduledExecutorFactory,
         null,
         taskMaster,
+        new NoopServiceAnnouncer(),
+        new DruidNode("hey", "what", 1234),
         loadManagementPeons
     );
   }
diff --git a/services/pom.xml b/services/pom.xml
index 9ba368a0b17..306803a9ac7 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -27,7 +27,7 @@
     io.druid
     druid
-    0.6.32-SNAPSHOT
+    0.6.37-SNAPSHOT
diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java
index b5fc025af24..b099dfa69cf 100644
--- a/services/src/main/java/io/druid/cli/CliBroker.java
+++ b/services/src/main/java/io/druid/cli/CliBroker.java
@@ -53,7 +53,7 @@ import java.util.List;
 */
 @Command(
     name = "broker",
-    description = "Runs a broker node, see http://druid.io/docs/0.6.31/Broker.html for a description"
+    description = "Runs a broker node, see http://druid.io/docs/0.6.36/Broker.html for a description"
 )
 public class CliBroker extends ServerRunnable
 {
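The test gains two cheap stand-ins for the coordinator's new dependencies: a NoopServiceAnnouncer and a throwaway DruidNode. The no-op test-double pattern itself, with an announcer interface assumed purely for illustration (not Druid's actual ServiceAnnouncer signature):

```java
// Assumed, simplified announcer interface; Druid's real ServiceAnnouncer
// takes a DruidNode and talks to Curator/ZooKeeper.
interface AnnouncerSketch
{
  void announce(Object node);
  void unannounce(Object node);
}

// A no-op implementation lets unit tests satisfy the dependency without
// standing up ZooKeeper, in the spirit of NoopServiceAnnouncer above.
final class NoopAnnouncerSketch implements AnnouncerSketch
{
  @Override public void announce(Object node) { /* intentionally empty */ }
  @Override public void unannounce(Object node) { /* intentionally empty */ }
}
```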
diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java
index f12d1f893e8..47240c691ee 100644
--- a/services/src/main/java/io/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/io/druid/cli/CliCoordinator.java
@@ -28,7 +28,6 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.logger.Logger;
 import io.airlift.command.Command;
 import io.druid.client.indexing.IndexingServiceClient;
-import io.druid.curator.discovery.DiscoveryModule;
 import io.druid.db.DatabaseRuleManager;
 import io.druid.db.DatabaseRuleManagerConfig;
 import io.druid.db.DatabaseRuleManagerProvider;
@@ -41,18 +40,16 @@ import io.druid.guice.JsonConfigProvider;
 import io.druid.guice.LazySingleton;
 import io.druid.guice.LifecycleModule;
 import io.druid.guice.ManageLifecycle;
-import io.druid.guice.annotations.Self;
 import io.druid.server.coordinator.DruidCoordinator;
 import io.druid.server.coordinator.DruidCoordinatorConfig;
 import io.druid.server.coordinator.LoadQueueTaskMaster;
-import io.druid.server.http.BackwardsCompatiableInfoResource;
+import io.druid.server.http.BackwardsCompatibleInfoResource;
 import io.druid.server.http.CoordinatorDynamicConfigsResource;
 import io.druid.server.http.CoordinatorRedirectInfo;
 import io.druid.server.http.CoordinatorResource;
 import io.druid.server.http.InfoResource;
 import io.druid.server.http.RedirectFilter;
 import io.druid.server.http.RedirectInfo;
-import io.druid.server.http.RedirectServlet;
 import io.druid.server.initialization.JettyServerInitializer;
 import org.apache.curator.framework.CuratorFramework;
 import org.eclipse.jetty.server.Server;
@@ -63,7 +60,7 @@ import java.util.List;
 */
 @Command(
     name = "coordinator",
-    description = "Runs the Coordinator, see http://druid.io/docs/0.6.31/Coordinator.html for a description."
+    description = "Runs the Coordinator, see http://druid.io/docs/0.6.36/Coordinator.html for a description."
 )
 public class CliCoordinator extends ServerRunnable
 {
@@ -88,8 +85,8 @@ public class CliCoordinator extends ServerRunnable
         JsonConfigProvider.bind(binder, "druid.manager.segments", DatabaseSegmentManagerConfig.class);
         JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class);

-        binder.bind(RedirectServlet.class).in(LazySingleton.class);
         binder.bind(RedirectFilter.class).in(LazySingleton.class);
+        binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);

         binder.bind(DatabaseSegmentManager.class)
               .toProvider(DatabaseSegmentManagerProvider.class)
@@ -101,15 +98,12 @@ public class CliCoordinator extends ServerRunnable

         binder.bind(IndexingServiceClient.class).in(LazySingleton.class);

-        binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);
-
         binder.bind(DruidCoordinator.class);
         LifecycleModule.register(binder, DruidCoordinator.class);
-        DiscoveryModule.register(binder, Self.class);

         binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer());

-        Jerseys.addResource(binder, BackwardsCompatiableInfoResource.class);
+        Jerseys.addResource(binder, BackwardsCompatibleInfoResource.class);
         Jerseys.addResource(binder, InfoResource.class);
         Jerseys.addResource(binder, CoordinatorResource.class);
         Jerseys.addResource(binder, CoordinatorDynamicConfigsResource.class);
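CliCoordinator now binds RedirectInfo to CoordinatorRedirectInfo right next to the RedirectFilter that consumes it, and drops both the unused RedirectServlet binding and the explicit DiscoveryModule registration (the coordinator announces itself at leader election instead, as seen above). The binding idiom in plain Guice, with stand-in types:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Singleton;

class RedirectBindingSketch
{
  // Illustrative stand-ins for Druid's RedirectInfo / CoordinatorRedirectInfo.
  interface RedirectInfoLike {}
  static class CoordinatorRedirectInfoLike implements RedirectInfoLike {}

  static class Module extends AbstractModule
  {
    @Override
    protected void configure()
    {
      // One binding, placed beside the consumer that depends on it.
      bind(RedirectInfoLike.class).to(CoordinatorRedirectInfoLike.class).in(Singleton.class);
    }
  }

  public static void main(String[] args)
  {
    RedirectInfoLike info = Guice.createInjector(new Module()).getInstance(RedirectInfoLike.class);
    System.out.println(info.getClass().getSimpleName()); // CoordinatorRedirectInfoLike
  }
}
```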
diff --git a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java
index bb0214ca26f..e873f26c70c 100644
--- a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java
+++ b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java
@@ -41,7 +41,7 @@ import java.util.List;
 */
 @Command(
     name = "hadoop",
-    description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.31/Batch-ingestion.html for a description."
+    description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.36/Batch-ingestion.html for a description."
 )
 public class CliHadoopIndexer implements Runnable
 {
diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java
index 0adf699140e..ca5449100d5 100644
--- a/services/src/main/java/io/druid/cli/CliHistorical.java
+++ b/services/src/main/java/io/druid/cli/CliHistorical.java
@@ -42,7 +42,7 @@ import java.util.List;
 */
 @Command(
     name = "historical",
-    description = "Runs a Historical node, see http://druid.io/docs/0.6.31/Historical.html for a description"
+    description = "Runs a Historical node, see http://druid.io/docs/0.6.36/Historical.html for a description"
 )
 public class CliHistorical extends ServerRunnable
 {
diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java
index b4b2fa4071a..2c258625e4b 100644
--- a/services/src/main/java/io/druid/cli/CliOverlord.java
+++ b/services/src/main/java/io/druid/cli/CliOverlord.java
@@ -44,7 +44,7 @@ import io.druid.guice.PolyBind;
 import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.actions.TaskActionToolbox;
-import io.druid.indexing.common.config.TaskConfig;
+import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.index.ChatHandlerProvider;
 import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
 import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
@@ -95,7 +95,7 @@ import java.util.List;
 */
 @Command(
     name = "overlord",
-    description = "Runs an Overlord node, see http://druid.io/docs/0.6.31/Indexing-Service.html for a description"
+    description = "Runs an Overlord node, see http://druid.io/docs/0.6.36/Indexing-Service.html for a description"
 )
 public class CliOverlord extends ServerRunnable
 {
@@ -120,7 +120,11 @@ public class CliOverlord extends ServerRunnable
             binder.bind(TaskMaster.class).in(ManageLifecycle.class);

             binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
-            binder.bind(new TypeLiteral>(){})
+            binder.bind(
+                new TypeLiteral>()
+                {
+                }
+            )
                   .toProvider(
                       new ListProvider()
                           .add(TaskRunnerTaskLogStreamer.class)
@@ -154,10 +158,15 @@ public class CliOverlord extends ServerRunnable

         private void configureTaskStorage(Binder binder)
         {
+          JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
+
           PolyBind.createChoice(
               binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class)
           );

-          final MapBinder storageBinder = PolyBind.optionBinder(binder, Key.get(TaskStorage.class));
+          final MapBinder storageBinder = PolyBind.optionBinder(
+              binder,
+              Key.get(TaskStorage.class)
+          );

           storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class);
           binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class);
@@ -176,7 +185,10 @@ public class CliOverlord extends ServerRunnable
               Key.get(TaskRunnerFactory.class),
               Key.get(ForkingTaskRunnerFactory.class)
           );
-          final MapBinder biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class));
+          final MapBinder biddy = PolyBind.optionBinder(
+              binder,
+              Key.get(TaskRunnerFactory.class)
+          );

           IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
           biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
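configureTaskStorage additionally binds the druid.indexer.storage.* properties to the new TaskStorageConfig, alongside the existing PolyBind choice between the heap and database task stores. The select-implementation-by-property idea, approximated with a plain Guice MapBinder (PolyBind itself is Druid's utility; the types below are invented):

```java
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.MapBinder;

class TaskStorageChoiceSketch
{
  // Illustrative stand-ins for TaskStorage and its two implementations.
  interface TaskStore {}
  static class HeapTaskStore implements TaskStore {}
  static class DbTaskStore implements TaskStore {}

  static class Module extends AbstractModule
  {
    @Override
    protected void configure()
    {
      MapBinder<String, TaskStore> stores =
          MapBinder.newMapBinder(binder(), String.class, TaskStore.class);
      stores.addBinding("local").to(HeapTaskStore.class);
      stores.addBinding("db").to(DbTaskStore.class);
      // PolyBind's extra step (not shown) reads a runtime property such as
      // druid.indexer.storage.type and exposes the matching map entry as the
      // single TaskStore binding the rest of the application injects.
    }
  }
}
```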
@@ -189,7 +201,9 @@ public class CliOverlord extends ServerRunnable
         private void configureAutoscale(Binder binder)
         {
           JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class);
-          binder.bind(ResourceManagementStrategy.class).to(SimpleResourceManagementStrategy.class).in(LazySingleton.class);
+          binder.bind(ResourceManagementStrategy.class)
+                .to(SimpleResourceManagementStrategy.class)
+                .in(LazySingleton.class);

           JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);

@@ -218,13 +232,17 @@ public class CliOverlord extends ServerRunnable
   }

   /**
-  */
+   */
   private static class OverlordJettyServerInitializer implements JettyServerInitializer
   {
     @Override
     public void initialize(Server server, Injector injector)
     {
-      ResourceHandler resourceHandler = new ResourceHandler();
+      final ServletContextHandler redirect = new ServletContextHandler(ServletContextHandler.SESSIONS);
+      redirect.setContextPath("/");
+      redirect.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
+
+      final ResourceHandler resourceHandler = new ResourceHandler();
       resourceHandler.setBaseResource(
           new ResourceCollection(
               new String[]{
@@ -238,12 +256,11 @@ public class CliOverlord extends ServerRunnable
       root.setContextPath("/");

       HandlerList handlerList = new HandlerList();
-      handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()});
+      handlerList.setHandlers(new Handler[]{redirect, resourceHandler, root, new DefaultHandler()});
       server.setHandler(handlerList);

       root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
       root.addFilter(GzipFilter.class, "/*", null);
-      root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
       root.addFilter(GuiceFilter.class, "/*", null);
     }
   }
diff --git a/services/src/main/java/io/druid/cli/CliRealtime.java b/services/src/main/java/io/druid/cli/CliRealtime.java
index 89452ed5c23..065db03dba5 100644
--- a/services/src/main/java/io/druid/cli/CliRealtime.java
+++ b/services/src/main/java/io/druid/cli/CliRealtime.java
@@ -30,7 +30,7 @@ import java.util.List;
 */
 @Command(
     name = "realtime",
-    description = "Runs a realtime node, see http://druid.io/docs/0.6.31/Realtime.html for a description"
+    description = "Runs a realtime node, see http://druid.io/docs/0.6.36/Realtime.html for a description"
 )
 public class CliRealtime extends ServerRunnable
 {
diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java
index dc7725f575b..819debc339a 100644
--- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java
+++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java
@@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
 */
 @Command(
     name = "realtime",
-    description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.31/Realtime.html for a description"
+    description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.36/Realtime.html for a description"
 )
 public class CliRealtimeExample extends ServerRunnable
 {
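Both Jetty initializers get the same fix: the redirect filter moves into its own ServletContextHandler placed ahead of the static ResourceHandler, so follower-to-leader redirects win over static console content. The ordering rule, using standard Jetty handler classes (the wiring itself is illustrative):

```java
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;

class HandlerOrderSketch
{
  static void wire(Server server, ServletContextHandler redirect, ServletContextHandler root)
  {
    ResourceHandler resources = new ResourceHandler();
    HandlerList handlers = new HandlerList();
    // HandlerList tries each handler in order until one marks the request
    // handled, so the redirect context must come before static resources.
    handlers.setHandlers(new Handler[]{redirect, resources, root, new DefaultHandler()});
    server.setHandler(handlers);
  }
}
```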
diff --git a/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java b/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java
index bf5fcd14cc8..12cf906b3cc 100644
--- a/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java
+++ b/services/src/main/java/io/druid/cli/CoordinatorJettyServerInitializer.java
@@ -36,25 +36,28 @@ import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlets.GzipFilter;

 /**
-*/
+ */
 class CoordinatorJettyServerInitializer implements JettyServerInitializer
 {
   @Override
   public void initialize(Server server, Injector injector)
   {
-    ResourceHandler resourceHandler = new ResourceHandler();
+    final ServletContextHandler redirect = new ServletContextHandler(ServletContextHandler.SESSIONS);
+    redirect.setContextPath("/");
+    redirect.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
+
+    final ResourceHandler resourceHandler = new ResourceHandler();
     resourceHandler.setResourceBase(DruidCoordinator.class.getClassLoader().getResource("static").toExternalForm());

     final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
     root.setContextPath("/");

     HandlerList handlerList = new HandlerList();
-    handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()});
+    handlerList.setHandlers(new Handler[]{redirect, resourceHandler, root, new DefaultHandler()});
     server.setHandler(handlerList);

     root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
     root.addFilter(GzipFilter.class, "/*", null);
-    root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
     root.addFilter(GuiceFilter.class, "/*", null);
   }
 }
diff --git a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java
index 3cc867700d0..fdbf2c9f2f0 100644
--- a/services/src/main/java/io/druid/cli/convert/ConvertProperties.java
+++ b/services/src/main/java/io/druid/cli/convert/ConvertProperties.java
@@ -178,7 +178,7 @@ public class ConvertProperties implements Runnable
     }

     updatedProps.setProperty(
-        "druid.monitoring.monitors", "[\"io.druid.server.metrics.ServerMonitor\", \"com.metamx.metrics.SysMonitor\"]"
+        "druid.monitoring.monitors", "[\"com.metamx.metrics.SysMonitor\"]"
     );

     BufferedWriter out = null;