Merge branch 'master' into az

Conflicts:
	server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
This commit is contained in:
fjy 2013-12-16 14:57:54 -08:00
commit e309f48357
61 changed files with 713 additions and 315 deletions

View File

@ -30,4 +30,4 @@ echo "For examples, see: "
echo " " echo " "
ls -1 examples/*/*sh ls -1 examples/*/*sh
echo " " echo " "
echo "See also http://druid.io/docs/0.6.31" echo "See also http://druid.io/docs/0.6.36"

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -3,7 +3,7 @@ layout: doc_page
--- ---
# Booting a Single Node Cluster # # Booting a Single Node Cluster #
[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.31-bin.tar.gz). [Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.36-bin.tar.gz).
The [ec2 run script](https://github.com/metamx/druid/blob/master/examples/bin/run_ec2.sh), run_ec2.sh, is located at 'examples/bin' if you have checked out the code, or at the root of the project if you've downloaded a tarball. The scripts rely on the [Amazon EC2 API Tools](http://aws.amazon.com/developertools/351), and you will need to set three environment variables: The [ec2 run script](https://github.com/metamx/druid/blob/master/examples/bin/run_ec2.sh), run_ec2.sh, is located at 'examples/bin' if you have checked out the code, or at the root of the project if you've downloaded a tarball. The scripts rely on the [Amazon EC2 API Tools](http://aws.amazon.com/developertools/351), and you will need to set three environment variables:

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid git clone https://github.com/metamx/druid.git druid
cd druid cd druid
git fetch --tags git fetch --tags
git checkout druid-0.6.31 git checkout druid-0.6.36
./build.sh ./build.sh
``` ```
### Downloading the DSK (Druid Standalone Kit) ### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.31-bin.tar.gz) a stand-alone tarball and run it: [Download](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz) a stand-alone tarball and run it:
``` bash ``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -56,6 +56,7 @@ With the following JVM configuration:
-Ddruid.db.connector.password=diurd -Ddruid.db.connector.password=diurd
-Ddruid.selectors.indexing.serviceName=overlord -Ddruid.selectors.indexing.serviceName=overlord
-Ddruid.indexer.queue.startDelay=PT0M
-Ddruid.indexer.runner.javaOpts="-server -Xmx1g" -Ddruid.indexer.runner.javaOpts="-server -Xmx1g"
-Ddruid.indexer.runner.startPort=8081 -Ddruid.indexer.runner.startPort=8081
-Ddruid.indexer.fork.property.druid.computation.buffer.size=268435456 -Ddruid.indexer.fork.property.druid.computation.buffer.size=268435456
@ -116,6 +117,7 @@ In addition to the configuration of some of the default modules in [Configuratio
|--------|-----------|-------| |--------|-----------|-------|
|`druid.indexer.runner.type`|Choices "local" or "remote". Indicates whether tasks should be run locally or in a distributed environment.|local| |`druid.indexer.runner.type`|Choices "local" or "remote". Indicates whether tasks should be run locally or in a distributed environment.|local|
|`druid.indexer.storage.type`|Choices are "local" or "db". Indicates whether incoming tasks should be stored locally (in heap) or in a database. Storing incoming tasks in a database allows for tasks to be resumed if the overlord should fail.|local| |`druid.indexer.storage.type`|Choices are "local" or "db". Indicates whether incoming tasks should be stored locally (in heap) or in a database. Storing incoming tasks in a database allows for tasks to be resumed if the overlord should fail.|local|
|`druid.indexer.storage.recentlyFinishedThreshold`|A duration of time to store task results.|PT24H|
|`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE| |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
|`druid.indexer.queue.startDelay`|Sleep this long before starting overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M| |`druid.indexer.queue.startDelay`|Sleep this long before starting overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|
|`druid.indexer.queue.restartDelay`|Sleep this long when overlord queue management throws an exception before trying again.|PT30S| |`druid.indexer.queue.restartDelay`|Sleep this long when overlord queue management throws an exception before trying again.|PT30S|

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime druid.service=realtime
druid.port=8083 druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.31"] druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.36"]
druid.zk.service.host=localhost druid.zk.service.host=localhost

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball ### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.31-bin.tar.gz). Download this file to a directory of your choosing. We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory: Not too lost so far right? That's great! If you cd into the directory:
``` ```
cd druid-services-0.6.31 cd druid-services-0.6.36
``` ```
You should see a bunch of files: You should see a bunch of files:

View File

@ -94,6 +94,7 @@ druid.db.connector.user=druid
druid.db.connector.password=diurd druid.db.connector.password=diurd
druid.selectors.indexing.serviceName=overlord druid.selectors.indexing.serviceName=overlord
druid.indexer.queue.startDelay=PT0M
druid.indexer.runner.javaOpts="-server -Xmx1g" druid.indexer.runner.javaOpts="-server -Xmx1g"
druid.indexer.runner.startPort=8088 druid.indexer.runner.startPort=8088
druid.indexer.fork.property.druid.computation.buffer.size=268435456 druid.indexer.fork.property.druid.computation.buffer.size=268435456
@ -246,17 +247,19 @@ Issuing a [TimeBoundaryQuery](TimeBoundaryQuery.html) should yield:
} ] } ]
``` ```
Problems? Console
--------- --------
If you decide to reuse the local firehose to ingest your own data and if you run into problems, you can read the individual task logs at: The indexing service overlord has a console located at:
```bash ```bash
<Current working directory>/log/<task_id>.log localhost:8087/console.html
``` ```
One thing to note is that the log file will only exist once the task completes with either SUCCESS or FAILURE. On this console, you can look at statuses and logs of recently submitted and completed tasks.
If you decide to reuse the local firehose to ingest your own data and if you run into problems, you can use the console to read the individual task logs.
Task logs can be stored locally or uploaded to [Deep Storage](Deep-Storage.html). More information about how to configure this is [here](Configuration.html). Task logs can be stored locally or uploaded to [Deep Storage](Deep-Storage.html). More information about how to configure this is [here](Configuration.html).
Most common data ingestion problems are around timestamp formats and other malformed data issues. Most common data ingestion problems are around timestamp formats and other malformed data issues.

View File

@ -44,7 +44,7 @@ With real-world data, we recommend having a message bus such as [Apache Kafka](h
#### Setting up Kafka #### Setting up Kafka
[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.31/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node. [KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.36/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html). Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html).

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes as well as and exter
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first. If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.31-bin.tar.gz) You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz)
and untar the contents within by issuing: and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.31"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.36"]
# Dummy read only AWS account (used to download example data) # Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -238,7 +238,7 @@ druid.port=8083
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.31","io.druid.extensions:druid-kafka-seven:0.6.31"] druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.36","io.druid.extensions:druid-kafka-seven:0.6.36"]
# Change this config to db to hand off to the rest of the Druid cluster # Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop druid.publish.type=noop
@ -253,5 +253,5 @@ druid.processing.buffer.sizeBytes=10000000
Next Steps Next Steps
---------- ----------
If you are intested in how data flows through the different Druid components, check out the [Druid data flow architecture](Design.html). Now that you have an understanding of what the Druid cluster looks like, why not load some of your own data? If you are interested in how data flows through the different Druid components, check out the [Druid data flow architecture](Design.html). Now that you have an understanding of what the Druid cluster looks like, why not load some of your own data?
Check out the next [tutorial](Tutorial%3A-Loading-Your-Data-Part-1.html) section for more info! Check out the next [tutorial](Tutorial%3A-Loading-Your-Data-Part-1.html) section for more info!

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.31-bin.tar.gz) We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz)
Download this file to a directory of your choosing. Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory: Not too lost so far right? That's great! If you cd into the directory:
``` ```
cd druid-services-0.6.31 cd druid-services-0.6.36
``` ```
You should see a bunch of files: You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.31-bin.tar.gz. We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz.
Download this bad boy to a directory of your choosing. Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing: You can extract the awesomeness within by issuing:

View File

@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.31"] druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.36"]
# Dummy read only AWS account (used to download example data) # Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@ -9,6 +9,7 @@ druid.db.connector.user=druid
druid.db.connector.password=diurd druid.db.connector.password=diurd
druid.selectors.indexing.serviceName=overlord druid.selectors.indexing.serviceName=overlord
druid.indexer.queue.startDelay=PT0M
druid.indexer.runner.javaOpts="-server -Xmx1g" druid.indexer.runner.javaOpts="-server -Xmx1g"
druid.indexer.runner.startPort=8088 druid.indexer.runner.startPort=8088
druid.indexer.fork.property.druid.computation.buffer.size=268435456 druid.indexer.fork.property.druid.computation.buffer.size=268435456

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.31","io.druid.extensions:druid-kafka-seven:0.6.31","io.druid.extensions:druid-rabbitmq:0.6.31"] druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.36","io.druid.extensions:druid-kafka-seven:0.6.36","io.druid.extensions:druid-rabbitmq:0.6.36"]
# Change this config to db to hand off to the rest of the Druid cluster # Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop druid.publish.type=noop

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -0,0 +1,19 @@
package io.druid.indexing.common.config;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
import org.joda.time.Period;
import javax.validation.constraints.NotNull;
public class TaskStorageConfig
{
@JsonProperty
@NotNull
public Duration recentlyFinishedThreshold = new Period("PT24H").toStandardDuration();
public Duration getRecentlyFinishedThreshold()
{
return recentlyFinishedThreshold;
}
}

View File

@ -20,7 +20,9 @@
package io.druid.indexing.common.task; package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
@ -34,14 +36,34 @@ public class NoopTask extends AbstractTask
{ {
private static final Logger log = new Logger(NoopTask.class); private static final Logger log = new Logger(NoopTask.class);
private static int defaultRunTime = 2500; private static int defaultRunTime = 2500;
private static int defaultIsReadyTime = 0;
private static IsReadyResult defaultIsReadyResult = IsReadyResult.YES;
private final int runTime; enum IsReadyResult
{
YES,
NO,
EXCEPTION
}
@JsonIgnore
private final long runTime;
@JsonIgnore
private final long isReadyTime;
@JsonIgnore
private final IsReadyResult isReadyResult;
@JsonIgnore
private final FirehoseFactory firehoseFactory; private final FirehoseFactory firehoseFactory;
@JsonCreator @JsonCreator
public NoopTask( public NoopTask(
@JsonProperty("id") String id, @JsonProperty("id") String id,
@JsonProperty("runTime") int runTime, @JsonProperty("runTime") long runTime,
@JsonProperty("isReadyTime") long isReadyTime,
@JsonProperty("isReadyResult") String isReadyResult,
@JsonProperty("firehose") FirehoseFactory firehoseFactory @JsonProperty("firehose") FirehoseFactory firehoseFactory
) )
{ {
@ -51,6 +73,10 @@ public class NoopTask extends AbstractTask
); );
this.runTime = (runTime == 0) ? defaultRunTime : runTime; this.runTime = (runTime == 0) ? defaultRunTime : runTime;
this.isReadyTime = (isReadyTime == 0) ? defaultIsReadyTime : isReadyTime;
this.isReadyResult = (isReadyResult == null)
? defaultIsReadyResult
: IsReadyResult.valueOf(isReadyResult.toUpperCase());
this.firehoseFactory = firehoseFactory; this.firehoseFactory = firehoseFactory;
} }
@ -60,12 +86,24 @@ public class NoopTask extends AbstractTask
return "noop"; return "noop";
} }
@JsonProperty("runTime") @JsonProperty
public int getRunTime() public long getRunTime()
{ {
return runTime; return runTime;
} }
@JsonProperty
public long getIsReadyTime()
{
return isReadyTime;
}
@JsonProperty
public IsReadyResult getIsReadyResult()
{
return isReadyResult;
}
@JsonProperty("firehose") @JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory() public FirehoseFactory getFirehoseFactory()
{ {
@ -75,7 +113,16 @@ public class NoopTask extends AbstractTask
@Override @Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception public boolean isReady(TaskActionClient taskActionClient) throws Exception
{ {
return true; switch (isReadyResult) {
case YES:
return true;
case NO:
return false;
case EXCEPTION:
throw new ISE("Not ready. Never will be ready. Go away!");
default:
throw new AssertionError("#notreached");
}
} }
@Override @Override

View File

@ -34,15 +34,16 @@ import com.metamx.common.RetryUtils;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.mysql.jdbc.exceptions.MySQLTimeoutException;
import com.mysql.jdbc.exceptions.MySQLTransientException; import com.mysql.jdbc.exceptions.MySQLTransientException;
import io.druid.db.DbConnector; import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig; import io.druid.db.DbTablesConfig;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Period;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.exceptions.CallbackFailedException;
@ -64,16 +65,24 @@ public class DbTaskStorage implements TaskStorage
private final DbConnector dbConnector; private final DbConnector dbConnector;
private final DbTablesConfig dbTables; private final DbTablesConfig dbTables;
private final IDBI dbi; private final IDBI dbi;
private final TaskStorageConfig config;
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class); private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
@Inject @Inject
public DbTaskStorage(ObjectMapper jsonMapper, DbConnector dbConnector, DbTablesConfig dbTables, IDBI dbi) public DbTaskStorage(
final ObjectMapper jsonMapper,
final DbConnector dbConnector,
final DbTablesConfig dbTables,
final IDBI dbi,
final TaskStorageConfig config
)
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.dbConnector = dbConnector; this.dbConnector = dbConnector;
this.dbTables = dbTables; this.dbTables = dbTables;
this.dbi = dbi; this.dbi = dbi;
this.config = config;
} }
@LifecycleStart @LifecycleStart
@ -271,6 +280,45 @@ public class DbTaskStorage implements TaskStorage
); );
} }
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold());
return retryingHandle(
new HandleCallback<List<TaskStatus>>()
{
@Override
public List<TaskStatus> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC",
dbTables.getTasksTable()
)
).bind("recent", recent.toString()).list();
final ImmutableList.Builder<TaskStatus> statuses = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isComplete()) {
statuses.add(status);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit();
}
}
return statuses.build();
}
}
);
}
@Override @Override
public void addLock(final String taskid, final TaskLock taskLock) public void addLock(final String taskid, final TaskLock taskLock)
{ {
@ -407,7 +455,8 @@ public class DbTaskStorage implements TaskStorage
for (final Map<String, Object> dbTaskLog : dbTaskLogs) { for (final Map<String, Object> dbTaskLog : dbTaskLogs) {
try { try {
retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class)); retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class));
} catch (Exception e) { }
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLog") log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskid) .addData("task", taskid)
.addData("logPayload", dbTaskLog) .addData("logPayload", dbTaskLog)
@ -451,7 +500,8 @@ public class DbTaskStorage implements TaskStorage
/** /**
* Retry SQL operations * Retry SQL operations
*/ */
private <T> T retryingHandle(final HandleCallback<T> callback) { private <T> T retryingHandle(final HandleCallback<T> callback)
{
final Callable<T> call = new Callable<T>() final Callable<T> call = new Callable<T>()
{ {
@Override @Override
@ -471,9 +521,11 @@ public class DbTaskStorage implements TaskStorage
final int maxTries = 10; final int maxTries = 10;
try { try {
return RetryUtils.retry(call, shouldRetry, maxTries); return RetryUtils.retry(call, shouldRetry, maxTries);
} catch (RuntimeException e) { }
catch (RuntimeException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} catch (Exception e) { }
catch (Exception e) {
throw new CallbackFailedException(e); throw new CallbackFailedException(e);
} }
} }

View File

@ -391,7 +391,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
if (offset > 0) { if (offset > 0) {
raf.seek(offset); raf.seek(offset);
} else if (offset < 0 && offset < rafLength) { } else if (offset < 0 && offset < rafLength) {
raf.seek(rafLength + offset); raf.seek(Math.max(0, rafLength + offset));
} }
return Channels.newInputStream(raf.getChannel()); return Channels.newInputStream(raf.getChannel());
} }

View File

@ -19,6 +19,7 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
import com.google.api.client.util.Lists;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
@ -26,11 +27,15 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -42,6 +47,8 @@ import java.util.concurrent.locks.ReentrantLock;
*/ */
public class HeapMemoryTaskStorage implements TaskStorage public class HeapMemoryTaskStorage implements TaskStorage
{ {
private final TaskStorageConfig config;
private final ReentrantLock giant = new ReentrantLock(); private final ReentrantLock giant = new ReentrantLock();
private final Map<String, TaskStuff> tasks = Maps.newHashMap(); private final Map<String, TaskStuff> tasks = Maps.newHashMap();
private final Multimap<String, TaskLock> taskLocks = HashMultimap.create(); private final Multimap<String, TaskLock> taskLocks = HashMultimap.create();
@ -49,6 +56,12 @@ public class HeapMemoryTaskStorage implements TaskStorage
private static final Logger log = new Logger(HeapMemoryTaskStorage.class); private static final Logger log = new Logger(HeapMemoryTaskStorage.class);
@Inject
public HeapMemoryTaskStorage(TaskStorageConfig config)
{
this.config = config;
}
@Override @Override
public void insert(Task task, TaskStatus status) public void insert(Task task, TaskStatus status)
{ {
@ -69,7 +82,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
} }
log.info("Inserting task %s with status: %s", task.getId(), status); log.info("Inserting task %s with status: %s", task.getId(), status);
tasks.put(task.getId(), new TaskStuff(task, status)); tasks.put(task.getId(), new TaskStuff(task, status, new DateTime()));
} finally { } finally {
giant.unlock(); giant.unlock();
} }
@ -139,13 +152,39 @@ public class HeapMemoryTaskStorage implements TaskStorage
listBuilder.add(taskStuff.getTask()); listBuilder.add(taskStuff.getTask());
} }
} }
return listBuilder.build(); return listBuilder.build();
} finally { } finally {
giant.unlock(); giant.unlock();
} }
} }
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
giant.lock();
try {
final List<TaskStatus> returns = Lists.newArrayList();
final long recent = System.currentTimeMillis() - config.getRecentlyFinishedThreshold().getMillis();
final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
{
@Override
public int compare(TaskStuff a, TaskStuff b)
{
return a.getCreatedDate().compareTo(b.getCreatedDate());
}
}.reverse();
for(final TaskStuff taskStuff : createdDateDesc.sortedCopy(tasks.values())) {
if(taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().getMillis() > recent) {
returns.add(taskStuff.getStatus());
}
}
return returns;
} finally {
giant.unlock();
}
}
@Override @Override
public void addLock(final String taskid, final TaskLock taskLock) public void addLock(final String taskid, final TaskLock taskLock)
{ {
@ -212,8 +251,9 @@ public class HeapMemoryTaskStorage implements TaskStorage
{ {
final Task task; final Task task;
final TaskStatus status; final TaskStatus status;
final DateTime createdDate;
private TaskStuff(Task task, TaskStatus status) private TaskStuff(Task task, TaskStatus status, DateTime createdDate)
{ {
Preconditions.checkNotNull(task); Preconditions.checkNotNull(task);
Preconditions.checkNotNull(status); Preconditions.checkNotNull(status);
@ -221,6 +261,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
this.task = task; this.task = task;
this.status = status; this.status = status;
this.createdDate = Preconditions.checkNotNull(createdDate, "createdDate");
} }
public Task getTask() public Task getTask()
@ -233,9 +274,14 @@ public class HeapMemoryTaskStorage implements TaskStorage
return status; return status;
} }
public DateTime getCreatedDate()
{
return createdDate;
}
private TaskStuff withStatus(TaskStatus _status) private TaskStuff withStatus(TaskStatus _status)
{ {
return new TaskStuff(task, _status); return new TaskStuff(task, _status, createdDate);
} }
} }
} }

View File

@ -24,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -224,7 +225,8 @@ public class TaskQueue
runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
} }
// Attain futures for all active tasks (assuming they are ready to run). // Attain futures for all active tasks (assuming they are ready to run).
for (final Task task : tasks) { // Copy tasks list, as notifyStatus may modify it.
for (final Task task : ImmutableList.copyOf(tasks)) {
if (!taskFutures.containsKey(task.getId())) { if (!taskFutures.containsKey(task.getId())) {
final ListenableFuture<TaskStatus> runnerTaskFuture; final ListenableFuture<TaskStatus> runnerTaskFuture;
if (runnerTaskFutures.containsKey(task.getId())) { if (runnerTaskFutures.containsKey(task.getId())) {
@ -236,7 +238,7 @@ public class TaskQueue
taskIsReady = task.isReady(taskActionClientFactory.create(task)); taskIsReady = task.isReady(taskActionClientFactory.create(task));
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Exception thrown during isReady").addData("task", task.getId()).emit(); log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
notifyStatus(task, TaskStatus.failure(task.getId())); notifyStatus(task, TaskStatus.failure(task.getId()));
continue; continue;
} }

View File

@ -19,6 +19,8 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain; import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
@ -56,21 +58,25 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
this.queueInsertionTime = queueInsertionTime; this.queueInsertionTime = queueInsertionTime;
} }
@JsonProperty
public String getTaskId() public String getTaskId()
{ {
return taskId; return taskId;
} }
@JsonIgnore
public ListenableFuture<TaskStatus> getResult() public ListenableFuture<TaskStatus> getResult()
{ {
return result; return result;
} }
@JsonProperty
public DateTime getCreatedTime() public DateTime getCreatedTime()
{ {
return createdTime; return createdTime;
} }
@JsonProperty
public DateTime getQueueInsertionTime() public DateTime getQueueInsertionTime()
{ {
return queueInsertionTime; return queueInsertionTime;

View File

@ -82,6 +82,13 @@ public interface TaskStorage
*/ */
public List<Task> getActiveTasks(); public List<Task> getActiveTasks();
/**
* Returns a list of recently finished task statuses as stored in the storage facility. No particular order
* is guaranteed, but implementations are encouraged to return tasks in descending order of creation. No particular
* standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing.
*/
public List<TaskStatus> getRecentlyFinishedTaskStatuses();
/** /**
* Returns a list of locks for a particular task. * Returns a list of locks for a particular task.
*/ */

View File

@ -19,14 +19,19 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
import com.google.common.base.Function;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
@ -42,6 +47,21 @@ public class TaskStorageQueryAdapter
this.storage = storage; this.storage = storage;
} }
public List<Task> getActiveTasks()
{
return storage.getActiveTasks();
}
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
return storage.getRecentlyFinishedTaskStatuses();
}
public Optional<Task> getTask(final String taskid)
{
return storage.getTask(taskid);
}
public Optional<TaskStatus> getStatus(final String taskid) public Optional<TaskStatus> getStatus(final String taskid)
{ {
return storage.getStatus(taskid); return storage.getStatus(taskid);

View File

@ -19,15 +19,20 @@
package io.druid.indexing.overlord.http; package io.druid.indexing.overlord.http;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.InputSupplier; import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.common.config.JacksonConfigManager; import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
@ -40,6 +45,7 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerSetupData; import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.tasklogs.TaskLogStreamer; import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue; import javax.ws.rs.DefaultValue;
@ -52,6 +58,8 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -63,20 +71,6 @@ public class OverlordResource
{ {
private static final Logger log = new Logger(OverlordResource.class); private static final Logger log = new Logger(OverlordResource.class);
private static Function<TaskRunnerWorkItem, Map<String, Object>> simplifyTaskFn =
new Function<TaskRunnerWorkItem, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(TaskRunnerWorkItem input)
{
return new ImmutableMap.Builder<String, Object>()
.put("id", input.getTaskId())
.put("createdTime", input.getCreatedTime())
.put("queueInsertionTime", input.getQueueInsertionTime())
.build();
}
};
private final TaskMaster taskMaster; private final TaskMaster taskMaster;
private final TaskStorageQueryAdapter taskStorageQueryAdapter; private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogStreamer taskLogStreamer; private final TaskLogStreamer taskLogStreamer;
@ -139,6 +133,14 @@ public class OverlordResource
); );
} }
@GET
@Path("/task/{taskid}")
@Produces("application/json")
public Response getTaskPayload(@PathParam("taskid") String taskid)
{
return optionalTaskResponse(taskid, "payload", taskStorageQueryAdapter.getTask(taskid));
}
@GET @GET
@Path("/task/{taskid}/status") @Path("/task/{taskid}/status")
@Produces("application/json") @Produces("application/json")
@ -238,39 +240,64 @@ public class OverlordResource
} }
@GET @GET
@Path("/pendingTasks") @Path("/waitingTasks")
@Produces("application/json") @Produces("application/json")
public Response getPendingTasks( public Response getWaitingTasks()
@QueryParam("full") String full
)
{ {
if (full != null) { return workItemsResponse(
return asLeaderWith( new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>()
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(taskRunner.getPendingTasks()).build();
}
}
);
}
return asLeaderWith(
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{ {
@Override @Override
public Response apply(TaskRunner taskRunner) public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
{ {
return Response.ok( // A bit roundabout, but works as a way of figuring out what tasks haven't been handed
Collections2.transform( // off to the runner yet:
taskRunner.getPendingTasks(), final List<Task> activeTasks = taskStorageQueryAdapter.getActiveTasks();
simplifyTaskFn final Set<String> runnersKnownTasks = Sets.newHashSet(
Iterables.transform(
taskRunner.getKnownTasks(),
new Function<TaskRunnerWorkItem, String>()
{
@Override
public String apply(final TaskRunnerWorkItem workItem)
{
return workItem.getTaskId();
}
}
) )
).build(); );
final List<TaskRunnerWorkItem> waitingTasks = Lists.newArrayList();
for (final Task task : activeTasks) {
if (!runnersKnownTasks.contains(task.getId())) {
waitingTasks.add(
// Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it.
new TaskRunnerWorkItem(
task.getId(),
SettableFuture.<TaskStatus>create(),
new DateTime(0),
new DateTime(0)
)
);
}
}
return waitingTasks;
}
}
);
}
@GET
@Path("/pendingTasks")
@Produces("application/json")
public Response getPendingTasks()
{
return workItemsResponse(
new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>()
{
@Override
public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
{
return taskRunner.getPendingTasks();
} }
} }
); );
@ -279,42 +306,45 @@ public class OverlordResource
@GET @GET
@Path("/runningTasks") @Path("/runningTasks")
@Produces("application/json") @Produces("application/json")
public Response getRunningTasks( public Response getRunningTasks()
@QueryParam("full") String full
)
{ {
if (full != null) { return workItemsResponse(
return asLeaderWith( new Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>>()
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(taskRunner.getRunningTasks()).build();
}
}
);
}
return asLeaderWith(
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{ {
@Override @Override
public Response apply(TaskRunner taskRunner) public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
{ {
return Response.ok( return taskRunner.getRunningTasks();
Collections2.transform(
taskRunner.getRunningTasks(),
simplifyTaskFn
)
).build();
} }
} }
); );
} }
@GET
@Path("/completeTasks")
@Produces("application/json")
public Response getCompleteTasks()
{
final List<TaskResponseObject> completeTasks = Lists.transform(
taskStorageQueryAdapter.getRecentlyFinishedTaskStatuses(),
new Function<TaskStatus, TaskResponseObject>()
{
@Override
public TaskResponseObject apply(TaskStatus taskStatus)
{
// Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it.
return new TaskResponseObject(
taskStatus.getId(),
new DateTime(0),
new DateTime(0),
Optional.of(taskStatus)
);
}
}
);
return Response.ok(completeTasks).build();
}
@GET @GET
@Path("/workers") @Path("/workers")
@Produces("application/json") @Produces("application/json")
@ -338,17 +368,13 @@ public class OverlordResource
@Produces("application/json") @Produces("application/json")
public Response getScalingState() public Response getScalingState()
{ {
return asLeaderWith( // Don't use asLeaderWith, since we want to return 200 instead of 503 when missing an autoscaler.
taskMaster.getResourceManagementScheduler(), final Optional<ResourceManagementScheduler> rms = taskMaster.getResourceManagementScheduler();
new Function<ResourceManagementScheduler, Response>() if (rms.isPresent()) {
{ return Response.ok(rms.get().getStats()).build();
@Override } else {
public Response apply(ResourceManagementScheduler resourceManagementScheduler) return Response.ok().build();
{ }
return Response.ok(resourceManagementScheduler.getStats()).build();
}
}
);
} }
@GET @GET
@ -373,7 +399,39 @@ public class OverlordResource
} }
} }
public <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x) private Response workItemsResponse(final Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>> fn)
{
return asLeaderWith(
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
{
return Response.ok(
Lists.transform(
Lists.newArrayList(fn.apply(taskRunner)),
new Function<TaskRunnerWorkItem, TaskResponseObject>()
{
@Override
public TaskResponseObject apply(TaskRunnerWorkItem workItem)
{
return new TaskResponseObject(
workItem.getTaskId(),
workItem.getCreatedTime(),
workItem.getQueueInsertionTime(),
Optional.<TaskStatus>absent()
);
}
}
)
).build();
}
}
);
}
private <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x)
{ {
final Map<String, Object> results = Maps.newHashMap(); final Map<String, Object> results = Maps.newHashMap();
results.put("task", taskid); results.put("task", taskid);
@ -385,7 +443,7 @@ public class OverlordResource
} }
} }
public <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f) private <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
{ {
if (x.isPresent()) { if (x.isPresent()) {
return f.apply(x.get()); return f.apply(x.get());
@ -394,4 +452,62 @@ public class OverlordResource
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
} }
} }
private static class TaskResponseObject
{
private final String id;
private final DateTime createdTime;
private final DateTime queueInsertionTime;
private final Optional<TaskStatus> status;
private TaskResponseObject(
String id,
DateTime createdTime,
DateTime queueInsertionTime,
Optional<TaskStatus> status
)
{
this.id = id;
this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
this.status = status;
}
public String getId()
{
return id;
}
public DateTime getCreatedTime()
{
return createdTime;
}
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;
}
public Optional<TaskStatus> getStatus()
{
return status;
}
@JsonValue
public Map<String, Object> toJson()
{
final Map<String, Object> data = Maps.newLinkedHashMap();
data.put("id", id);
if (createdTime.getMillis() > 0) {
data.put("createdTime", createdTime);
}
if (queueInsertionTime.getMillis() > 0) {
data.put("queueInsertionTime", queueInsertionTime);
}
if (status.isPresent()) {
data.put("statusCode", status.get().getStatusCode().toString());
}
return data;
}
}
} }

View File

@ -43,16 +43,24 @@
<div class="running_loading">Loading Running Tasks... this may take a few minutes</div> <div class="running_loading">Loading Running Tasks... this may take a few minutes</div>
<table id="runningTable"></table> <table id="runningTable"></table>
<h2>Pending Tasks</h2> <h2>Pending Tasks - Tasks waiting to be assigned to a worker</h2>
<div class="pending_loading">Loading Pending Tasks... this may take a few minutes</div> <div class="pending_loading">Loading Pending Tasks... this may take a few minutes</div>
<table id="pendingTable"></table> <table id="pendingTable"></table>
<h2>Workers</h2> <h2>Waiting Tasks - Tasks waiting on locks</h2>
<div class="waiting_loading">Loading Waiting Tasks... this may take a few minutes</div>
<table id="waitingTable"></table>
<h2>Complete Tasks</h2>
<div class="complete_loading">Loading Complete Tasks... this may take a few minutes</div>
<table id="completeTable"></table>
<h2>Remote Workers</h2>
<div class="workers_loading">Loading Workers... this may take a few minutes</div> <div class="workers_loading">Loading Workers... this may take a few minutes</div>
<table id="workerTable"></table> <table id="workerTable"></table>
<h2>Event Log</h2> <h2>Autoscaling Activity</h2>
<div class="events_loading">Loading Event Log... this may take a few minutes</div> <div class="events_loading">Loading Autoscaling Activities... this may take a few minutes</div>
<table id="eventTable"></table> <table id="eventTable"></table>
</div> </div>
</body> </body>

View File

@ -3,14 +3,39 @@
var oTable = []; var oTable = [];
$(document).ready(function() { $(document).ready(function() {
var augment = function(data) {
for (i = 0 ; i < data.length ; i++) {
var taskId = encodeURIComponent(data[i].id)
data[i].more =
'<a href="/druid/indexer/v1/task/' + taskId + '">payload</a>' +
'<a href="/druid/indexer/v1/task/' + taskId + '/status">status</a>' +
'<a href="/druid/indexer/v1/task/' + taskId + '/log">log (all)</a>' +
'<a href="/druid/indexer/v1/task/' + taskId + '/log?offset=-8192">log (last 8kb)</a>'
}
}
$.get('/druid/indexer/v1/runningTasks', function(data) { $.get('/druid/indexer/v1/runningTasks', function(data) {
$('.running_loading').hide(); $('.running_loading').hide();
buildTable(data, $('#runningTable'), ["segments"]); augment(data);
buildTable(data, $('#runningTable'));
}); });
$.get('/druid/indexer/v1/pendingTasks', function(data) { $.get('/druid/indexer/v1/pendingTasks', function(data) {
$('.pending_loading').hide(); $('.pending_loading').hide();
buildTable(data, $('#pendingTable'), ["segments"]); augment(data);
buildTable(data, $('#pendingTable'));
});
$.get('/druid/indexer/v1/waitingTasks', function(data) {
$('.waiting_loading').hide();
augment(data);
buildTable(data, $('#waitingTable'));
});
$.get('/druid/indexer/v1/completeTasks', function(data) {
$('.complete_loading').hide();
augment(data);
buildTable(data, $('#completeTable'));
}); });
$.get('/druid/indexer/v1/workers', function(data) { $.get('/druid/indexer/v1/workers', function(data) {

View File

@ -32,7 +32,7 @@ import io.druid.timeline.partition.NoneShardSpec;
/** /**
*/ */
@JsonTypeName("test_realtime") @JsonTypeName("test_realtime")
public class TestRealtimeTask extends RealtimeIndexTask implements TestTask public class TestRealtimeTask extends RealtimeIndexTask
{ {
private final TaskStatus status; private final TaskStatus status;
@ -64,13 +64,6 @@ public class TestRealtimeTask extends RealtimeIndexTask implements TestTask
return "test_realtime"; return "test_realtime";
} }
@Override
@JsonProperty
public TaskStatus getStatus()
{
return status;
}
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {

View File

@ -19,6 +19,7 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@ -53,6 +54,7 @@ import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.AbstractFixedIntervalTask; import io.druid.indexing.common.task.AbstractFixedIntervalTask;
import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.KillTask; import io.druid.indexing.common.task.KillTask;
@ -74,7 +76,9 @@ import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -115,8 +119,15 @@ public class TaskLifecycleTest
tmp = Files.createTempDir(); tmp = Files.createTempDir();
final TaskQueueConfig tqc = new DefaultObjectMapper().readValue("{\"startDelay\":\"PT0S\"}", TaskQueueConfig.class); final TaskQueueConfig tqc = new DefaultObjectMapper().readValue(
ts = new HeapMemoryTaskStorage(); "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}",
TaskQueueConfig.class
);
ts = new HeapMemoryTaskStorage(
new TaskStorageConfig()
{
}
);
tsqa = new TaskStorageQueryAdapter(ts); tsqa = new TaskStorageQueryAdapter(ts);
tl = new TaskLockbox(ts); tl = new TaskLockbox(ts);
mdc = newMockMDC(); mdc = newMockMDC();
@ -289,6 +300,34 @@ public class TaskLifecycleTest
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
} }
@Test
public void testNoopTask() throws Exception
{
final Task noopTask = new DefaultObjectMapper().readValue(
"{\"type\":\"noop\", \"runTime\":\"100\"}\"",
Task.class
);
final TaskStatus status = runTask(noopTask);
Assert.assertEquals("statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
@Test
public void testNeverReadyTask() throws Exception
{
final Task neverReadyTask = new DefaultObjectMapper().readValue(
"{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"",
Task.class
);
final TaskStatus status = runTask(neverReadyTask);
Assert.assertEquals("statusCode", TaskStatus.Status.FAILED, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
@Test @Test
public void testSimple() throws Exception public void testSimple() throws Exception
{ {
@ -400,28 +439,41 @@ public class TaskLifecycleTest
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
} }
private TaskStatus runTask(Task task) private TaskStatus runTask(final Task task) throws Exception
{ {
final Task dummyTask = new DefaultObjectMapper().readValue(
"{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"",
Task.class
);
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
Preconditions.checkArgument(!task.getId().equals(dummyTask.getId()));
tq.add(dummyTask);
tq.add(task); tq.add(task);
TaskStatus status; TaskStatus retVal = null;
try { for (final String taskId : ImmutableList.of(dummyTask.getId(), task.getId())) {
while ((status = tsqa.getStatus(task.getId()).get()).isRunnable()) { try {
if (System.currentTimeMillis() > startTime + 10 * 1000) { TaskStatus status;
throw new ISE("Where did the task go?!: %s", task.getId()); while ((status = tsqa.getStatus(taskId).get()).isRunnable()) {
if (System.currentTimeMillis() > startTime + 10 * 1000) {
throw new ISE("Where did the task go?!: %s", task.getId());
}
Thread.sleep(100);
} }
if (taskId.equals(task.getId())) {
Thread.sleep(100); retVal = status;
}
}
catch (Exception e) {
throw Throwables.propagate(e);
} }
} }
catch (Exception e) {
throw Throwables.propagate(e);
}
return status; return retVal;
} }
private static class MockIndexerDBCoordinator extends IndexerDBCoordinator private static class MockIndexerDBCoordinator extends IndexerDBCoordinator

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -23,7 +23,7 @@
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
<name>druid</name> <name>druid</name>
<description>druid</description> <description>druid</description>
<scm> <scm>
@ -41,7 +41,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.1</metamx.java-util.version> <metamx.java-util.version>0.25.1</metamx.java-util.version>
<apache.curator.version>2.1.0-incubating</apache.curator.version> <apache.curator.version>2.1.0-incubating</apache.curator.version>
<druid.api.version>0.1.5</druid.api.version> <druid.api.version>0.1.6</druid.api.version>
</properties> </properties>
<modules> <modules>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -84,6 +84,11 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
{ {
final int priority = Integer.parseInt(query.getContextValue("priority", "0")); final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
return Sequences.empty();
}
return new BaseSequence<T, Iterator<T>>( return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>() new BaseSequence.IteratorMaker<T, Iterator<T>>()
{ {

View File

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -21,7 +21,7 @@ package io.druid.storage.s3;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.metamx.common.RetryUtils; import com.metamx.common.RetryUtils;
import org.jets3t.service.S3ServiceException; import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket; import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object; import org.jets3t.service.model.S3Object;
@ -61,9 +61,9 @@ public class S3Utils
{ {
if (e instanceof IOException) { if (e instanceof IOException) {
return true; return true;
} else if (e instanceof S3ServiceException) { } else if (e instanceof ServiceException) {
final boolean isIOException = e.getCause() instanceof IOException; final boolean isIOException = e.getCause() instanceof IOException;
final boolean isTimeout = "RequestTimeout".equals(((S3ServiceException) e).getS3ErrorCode()); final boolean isTimeout = "RequestTimeout".equals(((ServiceException) e).getErrorCode());
return isIOException || isTimeout; return isIOException || isTimeout;
} else { } else {
return false; return false;
@ -75,18 +75,18 @@ public class S3Utils
} }
public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey) public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey)
throws S3ServiceException throws ServiceException
{ {
try { try {
s3Client.getObjectDetails(new S3Bucket(bucketName), objectKey); s3Client.getObjectDetails(new S3Bucket(bucketName), objectKey);
} }
catch (S3ServiceException e) { catch (ServiceException e) {
if (404 == e.getResponseCode() if (404 == e.getResponseCode()
|| "NoSuchKey".equals(e.getS3ErrorCode()) || "NoSuchKey".equals(e.getErrorCode())
|| "NoSuchBucket".equals(e.getS3ErrorCode())) { || "NoSuchBucket".equals(e.getErrorCode())) {
return false; return false;
} }
if ("AccessDenied".equals(e.getS3ErrorCode())) { if ("AccessDenied".equals(e.getErrorCode())) {
// Object is inaccessible to current user, but does exist. // Object is inaccessible to current user, but does exist.
return true; return true;
} }

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.FileIteratingFirehose; import io.druid.data.input.impl.FileIteratingFirehose;
@ -78,21 +79,26 @@ public class LocalFirehoseFactory implements FirehoseFactory
@Override @Override
public Firehose connect() throws IOException public Firehose connect() throws IOException
{ {
final LinkedList<File> files = Lists.<File>newLinkedList( File[] foundFiles = baseDir.listFiles(
Arrays.<File>asList( new FilenameFilter()
baseDir.listFiles( {
new FilenameFilter() @Override
{ public boolean accept(File file, String name)
@Override {
public boolean accept(File file, String name) return name.contains(filter);
{ }
return name.contains(filter); }
}
}
)
)
); );
if (foundFiles == null || foundFiles.length == 0) {
throw new ISE("Found no files to ingest! Check your schema.");
}
final LinkedList<File> files = Lists.<File>newLinkedList(
Arrays.asList(foundFiles)
);
return new FileIteratingFirehose( return new FileIteratingFirehose(
new Iterator<LineIterator>() new Iterator<LineIterator>()
{ {

View File

@ -20,16 +20,11 @@
package io.druid.server; package io.druid.server;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.inject.Injector;
import io.druid.initialization.DruidModule;
import io.druid.initialization.Initialization; import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import java.util.ArrayList;
import java.util.List;
/** /**
*/ */
@ -47,52 +42,20 @@ public class StatusResource
{ {
return new Status( return new Status(
Initialization.class.getPackage().getImplementationVersion(), Initialization.class.getPackage().getImplementationVersion(),
getExtensionVersions(),
new Memory(Runtime.getRuntime()) new Memory(Runtime.getRuntime())
); );
} }
/**
* Load the unique extensions and return their implementation-versions
*
* @return map of extensions loaded with their respective implementation versions.
*/
private static List<ModuleVersion> getExtensionVersions()
{
final Injector injector = Initialization.makeStartupInjector();
final ExtensionsConfig config = injector.getInstance(ExtensionsConfig.class);
final List<DruidModule> druidModules = Initialization.getFromExtensions(config, DruidModule.class);
List<ModuleVersion> moduleVersions = new ArrayList<>();
for (DruidModule module : druidModules) {
String artifact = module.getClass().getPackage().getImplementationTitle();
String version = module.getClass().getPackage().getImplementationVersion();
ModuleVersion moduleVersion;
if (artifact != null) {
moduleVersion = new ModuleVersion(module.getClass().getCanonicalName(), artifact, version);
} else {
moduleVersion = new ModuleVersion(module.getClass().getCanonicalName());
}
moduleVersions.add(moduleVersion);
}
return moduleVersions;
}
public static class Status public static class Status
{ {
final String version; final String version;
final List<ModuleVersion> modules;
final Memory memory; final Memory memory;
public Status( public Status(
String version, List<ModuleVersion> modules, Memory memory String version, Memory memory
) )
{ {
this.version = version; this.version = version;
this.modules = modules;
this.memory = memory; this.memory = memory;
} }
@ -102,12 +65,6 @@ public class StatusResource
return version; return version;
} }
@JsonProperty
public List<ModuleVersion> getModules()
{
return modules;
}
@JsonProperty @JsonProperty
public Memory getMemory() public Memory getMemory()
{ {
@ -117,20 +74,8 @@ public class StatusResource
@Override @Override
public String toString() public String toString()
{ {
final String NL = "\n"; final String NL = System.getProperty("line.separator");
StringBuilder output = new StringBuilder(); return String.format("Druid version - %s", version) + NL;
output.append(String.format("Druid version - %s", version)).append(NL).append(NL);
if (modules.size() > 0) {
output.append("Registered Druid Modules").append(NL);
} else {
output.append("No Druid Modules loaded !");
}
for (ModuleVersion moduleVersion : modules) {
output.append(moduleVersion).append(NL);
}
return output.toString();
} }
} }

View File

@ -116,7 +116,13 @@ public class ServerManager implements QuerySegmentWalker
return segmentLoader.isSegmentLoaded(segment); return segmentLoader.isSegmentLoaded(segment);
} }
public void loadSegment(final DataSegment segment) throws SegmentLoadingException /**
* Load a single segment.
* @param segment segment to load
* @return true if the segment was newly loaded, false if it was already loaded
* @throws SegmentLoadingException if the segment cannot be loaded
*/
public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
{ {
final Segment adapter; final Segment adapter;
try { try {
@ -150,8 +156,8 @@ public class ServerManager implements QuerySegmentWalker
segment.getVersion() segment.getVersion()
); );
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) { if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
log.info("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier()); log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
throw new SegmentLoadingException("Segment already exists[%s]", segment.getIdentifier()); return false;
} }
loadedIntervals.add( loadedIntervals.add(
@ -165,6 +171,7 @@ public class ServerManager implements QuerySegmentWalker
synchronized (dataSourceCounts) { synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, 1L); dataSourceCounts.add(dataSource, 1L);
} }
return true;
} }
} }

View File

@ -230,34 +230,37 @@ public class ZkCoordinator implements DataSegmentChangeHandler
try { try {
log.info("Loading segment %s", segment.getIdentifier()); log.info("Loading segment %s", segment.getIdentifier());
final boolean loaded;
try { try {
serverManager.loadSegment(segment); loaded = serverManager.loadSegment(segment);
} }
catch (Exception e) { catch (Exception e) {
removeSegment(segment); removeSegment(segment);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
} }
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); if (loaded) {
if (!segmentInfoCacheFile.exists()) { File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
removeSegment(segment);
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
}
try { try {
jsonMapper.writeValue(segmentInfoCacheFile, segment); announcer.announceSegment(segment);
} }
catch (IOException e) { catch (IOException e) {
removeSegment(segment); throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
} }
} }
try {
announcer.announceSegment(segment);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
}
} }
catch (SegmentLoadingException e) { catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment for dataSource") log.makeAlert(e, "Failed to load segment for dataSource")
@ -275,8 +278,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
for (DataSegment segment : segments) { for (DataSegment segment : segments) {
log.info("Loading segment %s", segment.getIdentifier()); log.info("Loading segment %s", segment.getIdentifier());
final boolean loaded;
try { try {
serverManager.loadSegment(segment); loaded = serverManager.loadSegment(segment);
} }
catch (Exception e) { catch (Exception e) {
log.error(e, "Exception loading segment[%s]", segment.getIdentifier()); log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
@ -285,20 +289,22 @@ public class ZkCoordinator implements DataSegmentChangeHandler
continue; continue;
} }
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); if (loaded) {
if (!segmentInfoCacheFile.exists()) { File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
try { if (!segmentInfoCacheFile.exists()) {
jsonMapper.writeValue(segmentInfoCacheFile, segment); try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
removeSegment(segment);
segmentFailures.add(segment.getIdentifier());
continue;
}
} }
catch (IOException e) {
log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
removeSegment(segment);
segmentFailures.add(segment.getIdentifier());
continue;
}
}
validSegments.add(segment); validSegments.add(segment);
}
} }
try { try {

View File

@ -46,16 +46,18 @@ import io.druid.client.ServerInventoryView;
import io.druid.client.indexing.IndexingServiceClient; import io.druid.client.indexing.IndexingServiceClient;
import io.druid.common.config.JacksonConfigManager; import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.db.DatabaseRuleManager; import io.druid.db.DatabaseRuleManager;
import io.druid.db.DatabaseSegmentManager; import io.druid.db.DatabaseSegmentManager;
import io.druid.guice.ManageLifecycle; import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.segment.IndexIO; import io.druid.segment.IndexIO;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.helper.DruidCoordinatorBalancer; import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
import io.druid.server.coordinator.helper.DruidCoordinatorCleanup; import io.druid.server.coordinator.helper.DruidCoordinatorCleanup;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper; import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.server.coordinator.helper.DruidCoordinatorLogger; import io.druid.server.coordinator.helper.DruidCoordinatorLogger;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
@ -106,6 +108,8 @@ public class DruidCoordinator
private final LoadQueueTaskMaster taskMaster; private final LoadQueueTaskMaster taskMaster;
private final Map<String, LoadQueuePeon> loadManagementPeons; private final Map<String, LoadQueuePeon> loadManagementPeons;
private final AtomicReference<LeaderLatch> leaderLatch; private final AtomicReference<LeaderLatch> leaderLatch;
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
@Inject @Inject
public DruidCoordinator( public DruidCoordinator(
@ -119,7 +123,9 @@ public class DruidCoordinator
ServiceEmitter emitter, ServiceEmitter emitter,
ScheduledExecutorFactory scheduledExecutorFactory, ScheduledExecutorFactory scheduledExecutorFactory,
IndexingServiceClient indexingServiceClient, IndexingServiceClient indexingServiceClient,
LoadQueueTaskMaster taskMaster LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
@Self DruidNode self
) )
{ {
this( this(
@ -134,6 +140,8 @@ public class DruidCoordinator
scheduledExecutorFactory, scheduledExecutorFactory,
indexingServiceClient, indexingServiceClient,
taskMaster, taskMaster,
serviceAnnouncer,
self,
Maps.<String, LoadQueuePeon>newConcurrentMap() Maps.<String, LoadQueuePeon>newConcurrentMap()
); );
} }
@ -150,6 +158,8 @@ public class DruidCoordinator
ScheduledExecutorFactory scheduledExecutorFactory, ScheduledExecutorFactory scheduledExecutorFactory,
IndexingServiceClient indexingServiceClient, IndexingServiceClient indexingServiceClient,
LoadQueueTaskMaster taskMaster, LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
DruidNode self,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap
) )
{ {
@ -164,6 +174,8 @@ public class DruidCoordinator
this.emitter = emitter; this.emitter = emitter;
this.indexingServiceClient = indexingServiceClient; this.indexingServiceClient = indexingServiceClient;
this.taskMaster = taskMaster; this.taskMaster = taskMaster;
this.serviceAnnouncer = serviceAnnouncer;
this.self = self;
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
@ -481,6 +493,7 @@ public class DruidCoordinator
databaseSegmentManager.start(); databaseSegmentManager.start();
databaseRuleManager.start(); databaseRuleManager.start();
serverInventoryView.start(); serverInventoryView.start();
serviceAnnouncer.announce(self);
final List<Pair<? extends CoordinatorRunnable, Duration>> coordinatorRunnables = Lists.newArrayList(); final List<Pair<? extends CoordinatorRunnable, Duration>> coordinatorRunnables = Lists.newArrayList();
dynamicConfigs = configManager.watch( dynamicConfigs = configManager.watch(
@ -561,8 +574,10 @@ public class DruidCoordinator
} }
loadManagementPeons.clear(); loadManagementPeons.clear();
databaseSegmentManager.stop(); serviceAnnouncer.unannounce(self);
serverInventoryView.stop(); serverInventoryView.stop();
databaseRuleManager.stop();
databaseSegmentManager.stop();
leader = false; leader = false;
} }
catch (Exception e) { catch (Exception e) {

View File

@ -32,10 +32,10 @@ import javax.ws.rs.Path;
/** /**
*/ */
@Path("/static/info") @Path("/static/info")
public class BackwardsCompatiableInfoResource extends InfoResource public class BackwardsCompatibleInfoResource extends InfoResource
{ {
@Inject @Inject
public BackwardsCompatiableInfoResource( public BackwardsCompatibleInfoResource(
DruidCoordinator coordinator, DruidCoordinator coordinator,
InventoryView serverInventoryView, InventoryView serverInventoryView,
DatabaseSegmentManager databaseSegmentManager, DatabaseSegmentManager databaseSegmentManager,

View File

@ -32,9 +32,6 @@
<body> <body>
<div class="container"> <div class="container">
<div>
<h2>Druid Version: ${pom.version} Druid API Version: ${druid.api.version}</h2>
</div>
<div> <div>
<a href="view.html">View Information about the Cluster</a> <a href="view.html">View Information about the Cluster</a>
</div> </div>

View File

@ -23,8 +23,10 @@ import com.google.common.collect.MapMaker;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.client.DruidServer; import io.druid.client.DruidServer;
import io.druid.client.SingleServerInventoryView; import io.druid.client.SingleServerInventoryView;
import io.druid.curator.discovery.NoopServiceAnnouncer;
import io.druid.curator.inventory.InventoryManagerConfig; import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.db.DatabaseSegmentManager; import io.druid.db.DatabaseSegmentManager;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
@ -111,6 +113,8 @@ public class DruidCoordinatorTest
scheduledExecutorFactory, scheduledExecutorFactory,
null, null,
taskMaster, taskMaster,
new NoopServiceAnnouncer(),
new DruidNode("hey", "what", 1234),
loadManagementPeons loadManagementPeons
); );
} }

View File

@ -27,7 +27,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.32-SNAPSHOT</version> <version>0.6.37-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -53,7 +53,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "broker", name = "broker",
description = "Runs a broker node, see http://druid.io/docs/0.6.31/Broker.html for a description" description = "Runs a broker node, see http://druid.io/docs/0.6.36/Broker.html for a description"
) )
public class CliBroker extends ServerRunnable public class CliBroker extends ServerRunnable
{ {

View File

@ -28,7 +28,6 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.client.indexing.IndexingServiceClient; import io.druid.client.indexing.IndexingServiceClient;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.db.DatabaseRuleManager; import io.druid.db.DatabaseRuleManager;
import io.druid.db.DatabaseRuleManagerConfig; import io.druid.db.DatabaseRuleManagerConfig;
import io.druid.db.DatabaseRuleManagerProvider; import io.druid.db.DatabaseRuleManagerProvider;
@ -41,18 +40,16 @@ import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton; import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule; import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle; import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorConfig; import io.druid.server.coordinator.DruidCoordinatorConfig;
import io.druid.server.coordinator.LoadQueueTaskMaster; import io.druid.server.coordinator.LoadQueueTaskMaster;
import io.druid.server.http.BackwardsCompatiableInfoResource; import io.druid.server.http.BackwardsCompatibleInfoResource;
import io.druid.server.http.CoordinatorDynamicConfigsResource; import io.druid.server.http.CoordinatorDynamicConfigsResource;
import io.druid.server.http.CoordinatorRedirectInfo; import io.druid.server.http.CoordinatorRedirectInfo;
import io.druid.server.http.CoordinatorResource; import io.druid.server.http.CoordinatorResource;
import io.druid.server.http.InfoResource; import io.druid.server.http.InfoResource;
import io.druid.server.http.RedirectFilter; import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo; import io.druid.server.http.RedirectInfo;
import io.druid.server.http.RedirectServlet;
import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.initialization.JettyServerInitializer;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -63,7 +60,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "coordinator", name = "coordinator",
description = "Runs the Coordinator, see http://druid.io/docs/0.6.31/Coordinator.html for a description." description = "Runs the Coordinator, see http://druid.io/docs/0.6.36/Coordinator.html for a description."
) )
public class CliCoordinator extends ServerRunnable public class CliCoordinator extends ServerRunnable
{ {
@ -88,8 +85,8 @@ public class CliCoordinator extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.manager.segments", DatabaseSegmentManagerConfig.class); JsonConfigProvider.bind(binder, "druid.manager.segments", DatabaseSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class); JsonConfigProvider.bind(binder, "druid.manager.rules", DatabaseRuleManagerConfig.class);
binder.bind(RedirectServlet.class).in(LazySingleton.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class); binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);
binder.bind(DatabaseSegmentManager.class) binder.bind(DatabaseSegmentManager.class)
.toProvider(DatabaseSegmentManagerProvider.class) .toProvider(DatabaseSegmentManagerProvider.class)
@ -101,15 +98,12 @@ public class CliCoordinator extends ServerRunnable
binder.bind(IndexingServiceClient.class).in(LazySingleton.class); binder.bind(IndexingServiceClient.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);
binder.bind(DruidCoordinator.class); binder.bind(DruidCoordinator.class);
LifecycleModule.register(binder, DruidCoordinator.class); LifecycleModule.register(binder, DruidCoordinator.class);
DiscoveryModule.register(binder, Self.class);
binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer()); binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer());
Jerseys.addResource(binder, BackwardsCompatiableInfoResource.class); Jerseys.addResource(binder, BackwardsCompatibleInfoResource.class);
Jerseys.addResource(binder, InfoResource.class); Jerseys.addResource(binder, InfoResource.class);
Jerseys.addResource(binder, CoordinatorResource.class); Jerseys.addResource(binder, CoordinatorResource.class);
Jerseys.addResource(binder, CoordinatorDynamicConfigsResource.class); Jerseys.addResource(binder, CoordinatorDynamicConfigsResource.class);

View File

@ -41,7 +41,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "hadoop", name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.31/Batch-ingestion.html for a description." description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.36/Batch-ingestion.html for a description."
) )
public class CliHadoopIndexer implements Runnable public class CliHadoopIndexer implements Runnable
{ {

View File

@ -42,7 +42,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "historical", name = "historical",
description = "Runs a Historical node, see http://druid.io/docs/0.6.31/Historical.html for a description" description = "Runs a Historical node, see http://druid.io/docs/0.6.36/Historical.html for a description"
) )
public class CliHistorical extends ServerRunnable public class CliHistorical extends ServerRunnable
{ {

View File

@ -44,7 +44,7 @@ import io.druid.guice.PolyBind;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
@ -95,7 +95,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "overlord", name = "overlord",
description = "Runs an Overlord node, see http://druid.io/docs/0.6.31/Indexing-Service.html for a description" description = "Runs an Overlord node, see http://druid.io/docs/0.6.36/Indexing-Service.html for a description"
) )
public class CliOverlord extends ServerRunnable public class CliOverlord extends ServerRunnable
{ {
@ -120,7 +120,11 @@ public class CliOverlord extends ServerRunnable
binder.bind(TaskMaster.class).in(ManageLifecycle.class); binder.bind(TaskMaster.class).in(ManageLifecycle.class);
binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class); binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<List<TaskLogStreamer>>(){}) binder.bind(
new TypeLiteral<List<TaskLogStreamer>>()
{
}
)
.toProvider( .toProvider(
new ListProvider<TaskLogStreamer>() new ListProvider<TaskLogStreamer>()
.add(TaskRunnerTaskLogStreamer.class) .add(TaskRunnerTaskLogStreamer.class)
@ -154,10 +158,15 @@ public class CliOverlord extends ServerRunnable
private void configureTaskStorage(Binder binder) private void configureTaskStorage(Binder binder)
{ {
JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
PolyBind.createChoice( PolyBind.createChoice(
binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class) binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class)
); );
final MapBinder<String, TaskStorage> storageBinder = PolyBind.optionBinder(binder, Key.get(TaskStorage.class)); final MapBinder<String, TaskStorage> storageBinder = PolyBind.optionBinder(
binder,
Key.get(TaskStorage.class)
);
storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class); storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class);
binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class); binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class);
@ -176,7 +185,10 @@ public class CliOverlord extends ServerRunnable
Key.get(TaskRunnerFactory.class), Key.get(TaskRunnerFactory.class),
Key.get(ForkingTaskRunnerFactory.class) Key.get(ForkingTaskRunnerFactory.class)
); );
final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class)); final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(
binder,
Key.get(TaskRunnerFactory.class)
);
IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder); IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
biddy.addBinding("local").to(ForkingTaskRunnerFactory.class); biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
@ -189,7 +201,9 @@ public class CliOverlord extends ServerRunnable
private void configureAutoscale(Binder binder) private void configureAutoscale(Binder binder)
{ {
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class);
binder.bind(ResourceManagementStrategy.class).to(SimpleResourceManagementStrategy.class).in(LazySingleton.class); binder.bind(ResourceManagementStrategy.class)
.to(SimpleResourceManagementStrategy.class)
.in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null); JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);
@ -218,13 +232,17 @@ public class CliOverlord extends ServerRunnable
} }
/** /**
*/ */
private static class OverlordJettyServerInitializer implements JettyServerInitializer private static class OverlordJettyServerInitializer implements JettyServerInitializer
{ {
@Override @Override
public void initialize(Server server, Injector injector) public void initialize(Server server, Injector injector)
{ {
ResourceHandler resourceHandler = new ResourceHandler(); final ServletContextHandler redirect = new ServletContextHandler(ServletContextHandler.SESSIONS);
redirect.setContextPath("/");
redirect.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
final ResourceHandler resourceHandler = new ResourceHandler();
resourceHandler.setBaseResource( resourceHandler.setBaseResource(
new ResourceCollection( new ResourceCollection(
new String[]{ new String[]{
@ -238,12 +256,11 @@ public class CliOverlord extends ServerRunnable
root.setContextPath("/"); root.setContextPath("/");
HandlerList handlerList = new HandlerList(); HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()}); handlerList.setHandlers(new Handler[]{redirect, resourceHandler, root, new DefaultHandler()});
server.setHandler(handlerList); server.setHandler(handlerList);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null); root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
root.addFilter(GuiceFilter.class, "/*", null); root.addFilter(GuiceFilter.class, "/*", null);
} }
} }

View File

@ -30,7 +30,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "realtime", name = "realtime",
description = "Runs a realtime node, see http://druid.io/docs/0.6.31/Realtime.html for a description" description = "Runs a realtime node, see http://druid.io/docs/0.6.36/Realtime.html for a description"
) )
public class CliRealtime extends ServerRunnable public class CliRealtime extends ServerRunnable
{ {

View File

@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
*/ */
@Command( @Command(
name = "realtime", name = "realtime",
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.31/Realtime.html for a description" description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.36/Realtime.html for a description"
) )
public class CliRealtimeExample extends ServerRunnable public class CliRealtimeExample extends ServerRunnable
{ {

View File

@ -36,25 +36,28 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter; import org.eclipse.jetty.servlets.GzipFilter;
/** /**
*/ */
class CoordinatorJettyServerInitializer implements JettyServerInitializer class CoordinatorJettyServerInitializer implements JettyServerInitializer
{ {
@Override @Override
public void initialize(Server server, Injector injector) public void initialize(Server server, Injector injector)
{ {
ResourceHandler resourceHandler = new ResourceHandler(); final ServletContextHandler redirect = new ServletContextHandler(ServletContextHandler.SESSIONS);
redirect.setContextPath("/");
redirect.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
final ResourceHandler resourceHandler = new ResourceHandler();
resourceHandler.setResourceBase(DruidCoordinator.class.getClassLoader().getResource("static").toExternalForm()); resourceHandler.setResourceBase(DruidCoordinator.class.getClassLoader().getResource("static").toExternalForm());
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.setContextPath("/"); root.setContextPath("/");
HandlerList handlerList = new HandlerList(); HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()}); handlerList.setHandlers(new Handler[]{redirect, resourceHandler, root, new DefaultHandler()});
server.setHandler(handlerList); server.setHandler(handlerList);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null); root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
root.addFilter(GuiceFilter.class, "/*", null); root.addFilter(GuiceFilter.class, "/*", null);
} }
} }

View File

@ -178,7 +178,7 @@ public class ConvertProperties implements Runnable
} }
updatedProps.setProperty( updatedProps.setProperty(
"druid.monitoring.monitors", "[\"io.druid.server.metrics.ServerMonitor\", \"com.metamx.metrics.SysMonitor\"]" "druid.monitoring.monitors", "[\"com.metamx.metrics.SysMonitor\"]"
); );
BufferedWriter out = null; BufferedWriter out = null;