Merge branch 'master' into az

fjy 2013-12-18 15:44:43 -08:00
commit 9b8134c73e
61 changed files with 1119 additions and 96 deletions

View File

@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
echo "See also http://druid.io/docs/0.6.36"
echo "See also http://druid.io/docs/0.6.40"

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -3,7 +3,7 @@ layout: doc_page
---
# Booting a Single Node Cluster #
[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small Druid cluster on localhost. Here we will boot a small cluster on EC2. You can check out the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.36-bin.tar.gz).
[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small Druid cluster on localhost. Here we will boot a small cluster on EC2. You can check out the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.40-bin.tar.gz).
The [ec2 run script](https://github.com/metamx/druid/blob/master/examples/bin/run_ec2.sh), run_ec2.sh, is located at 'examples/bin' if you have checked out the code, or at the root of the project if you've downloaded a tarball. The script relies on the [Amazon EC2 API Tools](http://aws.amazon.com/developertools/351), and you will need to set three environment variables:

View File

@ -285,8 +285,10 @@ This deep storage is used to interface with Amazon's S3.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.storage.bucket`|S3 bucket name.|none|
|`druid.storage.basekey`|S3 base key.|none|
|`druid.storage.baseKey`|S3 object key prefix for storage.|none|
|`druid.storage.disableAcl`|Boolean flag for disabling ACLs on pushed segment files.|false|
|`druid.storage.archiveBucket`|S3 bucket name for archiving when running the indexing-service *archive task*.|none|
|`druid.storage.archiveBaseKey`|S3 object key prefix for archiving.|none|
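Putting these properties together, a minimal sketch of the S3 storage block in a node's runtime.properties might look as follows (the bucket names and key prefixes are placeholders, and `druid.storage.type=s3` is assumed to select the S3 pusher):
```
druid.storage.type=s3
druid.storage.bucket=my-druid-bucket
druid.storage.baseKey=prod/segments
druid.storage.disableAcl=false
druid.storage.archiveBucket=my-druid-archive
druid.storage.archiveBaseKey=archived/segments
```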
#### HDFS Deep Storage

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.36
git checkout druid-0.6.40
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.36"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.40"]
druid.zk.service.host=localhost

View File

@ -49,7 +49,7 @@ There are two ways to set up Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far, right? That's great! If you cd into the directory:
```
cd druid-services-0.6.36
cd druid-services-0.6.40
```
You should see a bunch of files:

View File

@ -44,7 +44,7 @@ With real-world data, we recommend having a message bus such as [Apache Kafka](h
#### Setting up Kafka
[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.36/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how Druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.40/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how Druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html).
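Once Zookeeper and Kafka are running, the firehose section of the realtime spec is what wires them to Druid. A rough sketch follows; the `kafka-0.7.2` type name and field names follow the bundled examples of this era, and the feed name and Zookeeper address are placeholders, so treat this as illustrative rather than authoritative:
```json
"firehose": {
  "type": "kafka-0.7.2",
  "consumerProps": {
    "zk.connect": "localhost:2181",
    "groupid": "druid-example"
  },
  "feed": "wikipedia"
}
```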

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes as well as exter
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of Druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz)
You can download the latest version of Druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz)
and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.36"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.40"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -238,7 +238,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.36","io.druid.extensions:druid-kafka-seven:0.6.36"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.40","io.druid.extensions:druid-kafka-seven:0.6.40"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@ -37,7 +37,7 @@ There are two ways to set up Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far, right? That's great! If you cd into the directory:
```
cd druid-services-0.6.36
cd druid-services-0.6.40
```
You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to set up Druid: download a tarball, or build it from source.
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.36-bin.tar.gz.
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:

View File

@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.36"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.40"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.36","io.druid.extensions:druid-kafka-seven:0.6.36","io.druid.extensions:druid-rabbitmq:0.6.36"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.40","io.druid.extensions:druid-kafka-seven:0.6.40","io.druid.extensions:druid-rabbitmq:0.6.40"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -29,7 +29,9 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
@ -52,6 +54,8 @@ public class TaskToolbox
private final ServiceEmitter emitter;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentMover dataSegmentMover;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@ -68,6 +72,8 @@ public class TaskToolbox
ServiceEmitter emitter,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@ -84,6 +90,8 @@ public class TaskToolbox
this.emitter = emitter;
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@ -119,6 +127,16 @@ public class TaskToolbox
return dataSegmentKiller;
}
public DataSegmentMover getDataSegmentMover()
{
return dataSegmentMover;
}
public DataSegmentArchiver getDataSegmentArchiver()
{
return dataSegmentArchiver;
}
public DataSegmentAnnouncer getSegmentAnnouncer()
{
return segmentAnnouncer;

View File

@ -24,13 +24,14 @@ import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.ServerView;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Processing;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.server.coordination.DataSegmentAnnouncer;
@ -47,6 +48,8 @@ public class TaskToolboxFactory
private final ServiceEmitter emitter;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final DataSegmentMover dataSegmentMover;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@ -62,6 +65,8 @@ public class TaskToolboxFactory
ServiceEmitter emitter,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@ -76,6 +81,8 @@ public class TaskToolboxFactory
this.emitter = emitter;
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@ -96,6 +103,8 @@ public class TaskToolboxFactory
emitter,
segmentPusher,
dataSegmentKiller,
dataSegmentMover,
dataSegmentArchiver,
segmentAnnouncer,
newSegmentServerView,
queryRunnerFactoryConglomerate,

View File

@ -0,0 +1,77 @@
package io.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.Set;
public class SegmentMetadataUpdateAction implements TaskAction<Void>
{
@JsonIgnore
private final Set<DataSegment> segments;
@JsonCreator
public SegmentMetadataUpdateAction(
@JsonProperty("segments") Set<DataSegment> segments
)
{
this.segments = ImmutableSet.copyOf(segments);
}
@JsonProperty
public Set<DataSegment> getSegments()
{
return segments;
}
public TypeReference<Void> getReturnTypeReference()
{
return new TypeReference<Void>() {};
}
@Override
public Void perform(
Task task, TaskActionToolbox toolbox
) throws IOException
{
if(!toolbox.taskLockCoversSegments(task, segments, true)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
.setUser2(task.getDataSource())
.setUser4(task.getType());
for (DataSegment segment : segments) {
metricBuilder.setUser5(segment.getInterval().toString());
toolbox.getEmitter().emit(metricBuilder.build("indexer/segmentMoved/bytes", segment.getSize()));
}
return null;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{
return "SegmentMetadataUpdateAction{" +
"segments=" + segments +
'}';
}
}

View File

@ -35,7 +35,8 @@ import java.io.IOException;
@JsonSubTypes.Type(name = "segmentInsertion", value = SegmentInsertAction.class),
@JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class),
@JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class)
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
@JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class)
})
public interface TaskAction<RetType>
{

View File

@ -0,0 +1,110 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.List;
public class ArchiveTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(ArchiveTask.class);
public ArchiveTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
TaskUtils.makeId(id, "archive", dataSource, interval),
dataSource,
interval
);
}
@Override
public String getType()
{
return "archive";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
if (!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
}
if (!myLock.getInterval().equals(getInterval())) {
throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
}
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
throw new ISE(
"WTF?! Unused segment[%s] has version[%s] > task version[%s]",
unusedSegment.getIdentifier(),
unusedSegment.getVersion(),
myLock.getVersion()
);
}
log.info("OK to archive segment: %s", unusedSegment.getIdentifier());
}
List<DataSegment> archivedSegments = Lists.newLinkedList();
// Move segments
for (DataSegment segment : unusedSegments) {
archivedSegments.add(toolbox.getDataSegmentArchiver().archive(segment));
}
// Update metadata for moved segments
toolbox.getTaskActionClient().submit(
new SegmentMetadataUpdateAction(
ImmutableSet.copyOf(archivedSegments)
)
);
return TaskStatus.success(getId());
}
}

View File

@ -0,0 +1,121 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
public class MoveTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(MoveTask.class);
private final Map<String, Object> targetLoadSpec;
@JsonCreator
public MoveTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("target") Map<String, Object> targetLoadSpec
)
{
super(
TaskUtils.makeId(id, "move", dataSource, interval),
dataSource,
interval
);
this.targetLoadSpec = targetLoadSpec;
}
@Override
public String getType()
{
return "move";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
if(!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
}
if(!myLock.getInterval().equals(getInterval())) {
throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
}
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
for(final DataSegment unusedSegment : unusedSegments) {
if(unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
throw new ISE(
"WTF?! Unused segment[%s] has version[%s] > task version[%s]",
unusedSegment.getIdentifier(),
unusedSegment.getVersion(),
myLock.getVersion()
);
}
log.info("OK to move segment: %s", unusedSegment.getIdentifier());
}
List<DataSegment> movedSegments = Lists.newLinkedList();
// Move segments
for (DataSegment segment : unusedSegments) {
movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec));
}
// Update metadata for moved segments
toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(
ImmutableSet.copyOf(movedSegments)
));
return TaskStatus.success(getId());
}
@JsonProperty
public Map<String, Object> getTargetLoadSpec()
{
return targetLoadSpec;
}
}

View File

@ -45,6 +45,8 @@ import io.druid.query.QueryRunner;
@JsonSubTypes.Type(name = "merge", value = MergeTask.class),
@JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "move", value = MoveTask.class),
@JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),

View File

@ -28,8 +28,6 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbConnectorConfig;
import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
@ -213,6 +211,24 @@ public class IndexerDBCoordinator
return true;
}
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
{
dbi.inTransaction(
new TransactionCallback<Void>()
{
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
for(final DataSegment segment : segments) {
updatePayload(handle, segment);
}
return null;
}
}
);
}
public void deleteSegments(final Set<DataSegment> segments) throws IOException
{
dbi.inTransaction(
@ -235,10 +251,27 @@ public class IndexerDBCoordinator
{
handle.createStatement(
String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable())
).bind("id", segment.getIdentifier())
)
.bind("id", segment.getIdentifier())
.execute();
}
private void updatePayload(final Handle handle, final DataSegment segment) throws IOException
{
try {
handle.createStatement(
String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable())
)
.bind("id", segment.getIdentifier())
.bind("payload", jsonMapper.writeValueAsString(segment))
.execute();
}
catch (IOException e) {
log.error(e, "Exception inserting into DB");
throw e;
}
}
public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{
List<DataSegment> matchingSegments = dbi.withHandle(

View File

@ -51,7 +51,7 @@
<div class="waiting_loading">Loading Waiting Tasks... this may take a few minutes</div>
<table id="waitingTable"></table>
<h2>Complete Tasks</h2>
<h2>Complete Tasks - Tasks recently completed</h2>
<div class="complete_loading">Loading Complete Tasks... this may take a few minutes</div>
<table id="completeTable"></table>

View File

@ -294,7 +294,58 @@ public class TaskSerdeTest
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertEquals(task.getSegments(), ((AppendTask) task2).getSegments());
Assert.assertEquals(task.getSegments(), task2.getSegments());
}
@Test
public void testArchiveTaskSerde() throws Exception
{
final ArchiveTask task = new ArchiveTask(
null,
"foo",
new Interval("2010-01-01/P1D")
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final ArchiveTask task2 = (ArchiveTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
}
@Test
public void testMoveTaskSerde() throws Exception
{
final MoveTask task = new MoveTask(
null,
"foo",
new Interval("2010-01-01/P1D"),
ImmutableMap.<String, Object>of("bucket", "hey", "baseKey", "what")
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final MoveTask task2 = (MoveTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(ImmutableMap.<String, Object>of("bucket", "hey", "baseKey", "what"), task.getTargetLoadSpec());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertEquals(task.getTargetLoadSpec(), task2.getTargetLoadSpec());
}
@Test

View File

@ -64,7 +64,9 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPuller;
@ -76,9 +78,7 @@ import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -88,6 +88,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class TaskLifecycleTest
@ -158,6 +159,22 @@ public class TaskLifecycleTest
}
},
new DataSegmentMover()
{
@Override
public DataSegment move(DataSegment dataSegment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
{
return dataSegment;
}
},
new DataSegmentArchiver()
{
@Override
public DataSegment archive(DataSegment segment) throws SegmentLoadingException
{
return segment;
}
},
null, // segment announcer
null, // new segment server view
null, // query runner factory conglomerate corporation unionized collective

View File

@ -122,7 +122,7 @@ public class WorkerTaskMonitorTest
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0),
null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.<String, DataSegmentPuller>of(
"local",
@ -209,4 +209,4 @@ public class WorkerTaskMonitorTest
Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId());
Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode());
}
}
}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -107,7 +107,15 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory
return null;
}
return parser.parse(ByteBuffer.wrap(message));
try {
return parser.parse(ByteBuffer.wrap(message));
}
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Error parsing[%s], got [%s]", ByteBuffer.wrap(message), e.toString()))
.build();
}
}
@Override

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -120,7 +120,15 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory
public InputRow parseMessage(Message message) throws FormattedException
{
return parser.parse(message.payload());
try {
return parser.parse(message.payload());
}
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Error parsing[%s], got [%s]", message.payload(), e.toString()))
.build();
}
}
@Override

View File

@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
@ -41,7 +41,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.1</metamx.java-util.version>
<apache.curator.version>2.1.0-incubating</apache.curator.version>
<druid.api.version>0.1.6</druid.api.version>
<druid.api.version>0.1.7</druid.api.version>
</properties>
<modules>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -265,9 +265,11 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
final Object[] args = new Object[size + 1];
args[0] = current;
int i = 0;
while (i < size) {
args[i + 1] = selectorList[i++].get();
for (int i = 0 ; i < size ; i++) {
final ObjectColumnSelector selector = selectorList[i];
if (selector != null) {
args[i + 1] = selector.get();
}
}
final Object res = fnAggregate.call(cx, scope, scope, args);

View File

@ -27,6 +27,7 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
public class JavaScriptAggregatorTest
@ -141,6 +142,39 @@ public class JavaScriptAggregatorTest
Assert.assertEquals(val, agg.get(buf, position));
}
@Test
public void testAggregateMissingColumn()
{
Map<String, String> script = scriptDoubleSum;
JavaScriptAggregator agg = new JavaScriptAggregator(
"billy",
Collections.<ObjectColumnSelector>singletonList(null),
JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"),
script.get("fnReset"),
script.get("fnCombine"))
);
final double val = 0;
Assert.assertEquals("billy", agg.getName());
agg.reset();
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
agg.aggregate();
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
agg.aggregate();
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
Assert.assertEquals(val, agg.get());
}
public static void main(String... args) throws Exception {
final LoopingFloatColumnSelector selector = new LoopingFloatColumnSelector(new float[]{42.12f, 9f});

View File

@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -0,0 +1,58 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.s3;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSegmentArchiver
{
private final S3DataSegmentArchiverConfig config;
@Inject
public S3DataSegmentArchiver(
RestS3Service s3Client,
S3DataSegmentArchiverConfig config
)
{
super(s3Client);
this.config = config;
}
@Override
public DataSegment archive(DataSegment segment) throws SegmentLoadingException
{
String targetS3Bucket = config.getArchiveBucket();
String targetS3BaseKey = config.getArchiveBaseKey();
return move(
segment,
ImmutableMap.<String, Object>of(
"bucket", targetS3Bucket,
"baseKey", targetS3BaseKey
)
);
}
}

View File

@ -0,0 +1,41 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.s3;
import com.fasterxml.jackson.annotation.JsonProperty;
public class S3DataSegmentArchiverConfig
{
@JsonProperty
public String archiveBucket = "";
@JsonProperty
public String archiveBaseKey = "";
public String getArchiveBucket()
{
return archiveBucket;
}
public String getArchiveBaseKey()
{
return archiveBaseKey;
}
}

View File

@ -53,7 +53,7 @@ public class S3DataSegmentKiller implements DataSegmentKiller
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);

View File

@ -0,0 +1,130 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.s3;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.util.Map;
public class S3DataSegmentMover implements DataSegmentMover
{
private static final Logger log = new Logger(S3DataSegmentMover.class);
private final RestS3Service s3Client;
@Inject
public S3DataSegmentMover(
RestS3Service s3Client
)
{
this.s3Client = s3Client;
}
@Override
public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
{
try {
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket");
final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey");
final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, segment);
String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path);
if (targetS3Bucket.isEmpty()) {
throw new SegmentLoadingException("Target S3 bucket is not specified");
}
if (targetS3Path.isEmpty()) {
throw new SegmentLoadingException("Target S3 baseKey is not specified");
}
safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
safeMove(s3Bucket, s3DescriptorPath, targetS3Bucket, targetS3DescriptorPath);
return segment.withLoadSpec(
ImmutableMap.<String, Object>builder()
.putAll(
Maps.filterKeys(
loadSpec, new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return !(input.equals("bucket") || input.equals("key"));
}
}
)
)
.put("bucket", targetS3Bucket)
.put("key", targetS3Path)
.build()
);
}
catch (ServiceException e) {
throw new SegmentLoadingException(e, "Unable to move segment[%s]", segment.getIdentifier());
}
}
private void safeMove(String s3Bucket, String s3Path, String targetS3Bucket, String targetS3Path)
throws ServiceException, SegmentLoadingException
{
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
log.info(
"Moving file[s3://%s/%s] to [s3://%s/%s]",
s3Bucket,
s3Path,
targetS3Bucket,
targetS3Path
);
s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, new S3Object(targetS3Path), false);
} else {
// ensure object exists in target location
if(s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) {
log.info(
"Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
s3Bucket, s3Path,
targetS3Bucket, targetS3Path
);
}
else {
throw new SegmentLoadingException(
"Unable to move file [s3://%s/%s] to [s3://%s/%s], not present in either source or target location",
s3Bucket, s3Path,
targetS3Bucket, targetS3Path
);
}
}
}
}

View File

@ -20,7 +20,6 @@
package io.druid.storage.s3;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
@ -29,7 +28,6 @@ import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.jets3t.service.ServiceException;
@ -45,7 +43,6 @@ import java.util.concurrent.Callable;
public class S3DataSegmentPusher implements DataSegmentPusher
{
private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class);
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final RestS3Service s3Client;
private final S3DataSegmentPusherConfig config;
@ -73,10 +70,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
{
log.info("Uploading [%s] to S3", indexFilesDir);
final String outputKey = JOINER.join(
config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
DataSegmentPusherUtil.getStorageDir(inSegment)
);
final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), inSegment);
final File zipOutFile = File.createTempFile("druid", "index.zip");
final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
@ -90,8 +84,10 @@ public class S3DataSegmentPusher implements DataSegmentPusher
S3Object toPush = new S3Object(zipOutFile);
final String outputBucket = config.getBucket();
final String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
toPush.setBucketName(outputBucket);
toPush.setKey(outputKey + "/index.zip");
toPush.setKey(s3Path);
if (!config.getDisableAcl()) {
toPush.setAcl(AccessControlList.REST_CANNED_AUTHENTICATED_READ);
}
@ -116,7 +112,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(inSegment)), descriptorFile);
S3Object descriptorObject = new S3Object(descriptorFile);
descriptorObject.setBucketName(outputBucket);
descriptorObject.setKey(outputKey + "/descriptor.json");
descriptorObject.setKey(s3DescriptorPath);
if (!config.getDisableAcl()) {
descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
}
@ -142,4 +138,4 @@ public class S3DataSegmentPusher implements DataSegmentPusher
throw Throwables.propagate(e);
}
}
}
}

View File

@ -51,8 +51,11 @@ public class S3StorageDruidModule implements DruidModule
Binders.dataSegmentPullerBinder(binder).addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding("s3_zip").to(S3DataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentMoverBinder(binder).addBinding("s3_zip").to(S3DataSegmentMover.class).in(LazySingleton.class);
Binders.dataSegmentArchiverBinder(binder).addBinding("s3_zip").to(S3DataSegmentArchiver.class).in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class);
Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class);

View File

@ -19,9 +19,12 @@
package io.druid.storage.s3;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.metamx.common.RetryUtils;
import org.jets3t.service.ServiceException;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
@ -34,6 +37,8 @@ import java.util.concurrent.Callable;
*/
public class S3Utils
{
private static final Joiner JOINER = Joiner.on("/").skipNulls();
public static void closeStreamsQuietly(S3Object s3Obj)
{
if (s3Obj == null) {
@ -96,4 +101,17 @@ public class S3Utils
return true;
}
public static String constructSegmentPath(String baseKey, DataSegment segment)
{
return JOINER.join(
baseKey.isEmpty() ? null : baseKey,
DataSegmentPusherUtil.getStorageDir(segment)
) + "/index.zip";
}
public static String descriptorPathForSegmentPath(String s3Path)
{
return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
}
}
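Taken together, these two helpers fix the S3 key layout that the pusher, mover, and archiver now share. For a segment stored under a given base key, the resulting objects look like the following (shape inferred from the `getStorageDir` convention and the mover test in this commit):
```
<baseKey>/<dataSource>/<start>_<end>/<version>/<partitionNum>/index.zip
<baseKey>/<dataSource>/<start>_<end>/<version>/<partitionNum>/descriptor.json
```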

View File

@ -0,0 +1,161 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.s3;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.common.MapUtils;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
import java.util.Set;
public class S3DataSegmentMoverTest
{
private static final DataSegment sourceSegment = new DataSegment(
"test",
new Interval("2013-01-01/2013-01-02"),
"1",
ImmutableMap.<String, Object>of(
"key",
"baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip",
"bucket",
"main"
),
ImmutableList.of("dim1", "dim1"),
ImmutableList.of("metric1", "metric2"),
new NoneShardSpec(),
0,
1
);
@Test
public void testMove() throws Exception
{
MockStorageService mockS3Client = new MockStorageService();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client);
mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"));
mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"));
DataSegment movedSegment = mover.move(
sourceSegment,
ImmutableMap.<String, Object>of("baseKey", "targetBaseKey", "bucket", "archive")
);
Map<String, Object> targetLoadSpec = movedSegment.getLoadSpec();
Assert.assertEquals("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip", MapUtils.getString(targetLoadSpec, "key"));
Assert.assertEquals("archive", MapUtils.getString(targetLoadSpec, "bucket"));
Assert.assertTrue(mockS3Client.didMove());
}
@Test
public void testMoveNoop() throws Exception
{
MockStorageService mockS3Client = new MockStorageService();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client);
mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"));
mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"));
DataSegment movedSegment = mover.move(
sourceSegment,
ImmutableMap.<String, Object>of("baseKey", "targetBaseKey", "bucket", "archive")
);
Map<String, Object> targetLoadSpec = movedSegment.getLoadSpec();
Assert.assertEquals("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip", MapUtils.getString(targetLoadSpec, "key"));
Assert.assertEquals("archive", MapUtils.getString(targetLoadSpec, "bucket"));
Assert.assertFalse(mockS3Client.didMove());
}
@Test(expected = SegmentLoadingException.class)
public void testMoveException() throws Exception
{
MockStorageService mockS3Client = new MockStorageService();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client);
mover.move(
sourceSegment,
ImmutableMap.<String, Object>of("baseKey", "targetBaseKey", "bucket", "archive")
);
}
private class MockStorageService extends RestS3Service {
Map<String, Set<String>> storage = Maps.newHashMap();
boolean moved = false;
private MockStorageService() throws S3ServiceException
{
super(null);
}
public boolean didMove() {
return moved;
}
@Override
public boolean isObjectInBucket(String bucketName, String objectKey) throws ServiceException
{
Set<String> objects = storage.get(bucketName);
return (objects != null && objects.contains(objectKey));
}
@Override
public Map<String, Object> moveObject(
String sourceBucketName,
String sourceObjectKey,
String destinationBucketName,
StorageObject destinationObject,
boolean replaceMetadata
) throws ServiceException
{
moved = true;
if(isObjectInBucket(sourceBucketName, sourceObjectKey)) {
this.putObject(destinationBucketName, new S3Object(destinationObject.getKey()));
storage.get(sourceBucketName).remove(sourceObjectKey);
}
return null;
}
@Override
public S3Object putObject(String bucketName, S3Object object) throws S3ServiceException
{
if (!storage.containsKey(bucketName)) {
storage.put(bucketName, Sets.<String>newHashSet());
}
storage.get(bucketName).add(object.getKey());
return object;
}
}
}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -0,0 +1,57 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import io.druid.timeline.DataSegment;
import java.util.Map;
public class OmniDataSegmentArchiver implements DataSegmentArchiver
{
private final Map<String, DataSegmentArchiver> archivers;
@Inject
public OmniDataSegmentArchiver(
Map<String, DataSegmentArchiver> archivers
)
{
this.archivers = archivers;
}
@Override
public DataSegment archive(DataSegment segment) throws SegmentLoadingException
{
return getArchiver(segment).archive(segment);
}
private DataSegmentArchiver getArchiver(DataSegment segment) throws SegmentLoadingException
{
String type = MapUtils.getString(segment.getLoadSpec(), "type");
DataSegmentArchiver archiver = archivers.get(type);
if (archiver == null) {
throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, archivers.keySet());
}
return archiver;
}
}

View File

@ -0,0 +1,57 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import io.druid.timeline.DataSegment;
import java.util.Map;
public class OmniDataSegmentMover implements DataSegmentMover
{
private final Map<String, DataSegmentMover> movers;
@Inject
public OmniDataSegmentMover(
Map<String, DataSegmentMover> movers
)
{
this.movers = movers;
}
@Override
public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
{
return getMover(segment).move(segment, targetLoadSpec);
}
private DataSegmentMover getMover(DataSegment segment) throws SegmentLoadingException
{
String type = MapUtils.getString(segment.getLoadSpec(), "type");
DataSegmentMover mover = movers.get(type);
if (mover == null) {
throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, movers.keySet());
}
return mover;
}
}

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.37-SNAPSHOT</version>
<version>0.6.41-SNAPSHOT</version>
</parent>
<dependencies>
@ -68,7 +68,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<version>2.2</version>
<executions>
<execution>
<phase>package</phase>
@ -89,6 +89,9 @@
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>

View File

@ -53,7 +53,7 @@ import java.util.List;
*/
@Command(
name = "broker",
description = "Runs a broker node, see http://druid.io/docs/0.6.36/Broker.html for a description"
description = "Runs a broker node, see http://druid.io/docs/0.6.40/Broker.html for a description"
)
public class CliBroker extends ServerRunnable
{

View File

@ -60,7 +60,7 @@ import java.util.List;
*/
@Command(
name = "coordinator",
description = "Runs the Coordinator, see http://druid.io/docs/0.6.36/Coordinator.html for a description."
description = "Runs the Coordinator, see http://druid.io/docs/0.6.40/Coordinator.html for a description."
)
public class CliCoordinator extends ServerRunnable
{

View File

@ -41,7 +41,7 @@ import java.util.List;
*/
@Command(
name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.36/Batch-ingestion.html for a description."
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.40/Batch-ingestion.html for a description."
)
public class CliHadoopIndexer implements Runnable
{

View File

@ -42,7 +42,7 @@ import java.util.List;
*/
@Command(
name = "historical",
description = "Runs a Historical node, see http://druid.io/docs/0.6.36/Historical.html for a description"
description = "Runs a Historical node, see http://druid.io/docs/0.6.40/Historical.html for a description"
)
public class CliHistorical extends ServerRunnable
{

View File

@ -79,9 +79,7 @@ import io.druid.tasklogs.TaskLogStreamer;
import io.druid.tasklogs.TaskLogs;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
-import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
@ -95,7 +93,7 @@ import java.util.List;
*/
@Command(
name = "overlord",
description = "Runs an Overlord node, see http://druid.io/docs/0.6.36/Indexing-Service.html for a description"
description = "Runs an Overlord node, see http://druid.io/docs/0.6.40/Indexing-Service.html for a description"
)
public class CliOverlord extends ServerRunnable
{
@ -238,12 +236,12 @@ public class CliOverlord extends ServerRunnable
@Override
public void initialize(Server server, Injector injector)
{
-final ServletContextHandler redirect = new ServletContextHandler(ServletContextHandler.SESSIONS);
-redirect.setContextPath("/");
-redirect.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
+final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
-final ResourceHandler resourceHandler = new ResourceHandler();
-resourceHandler.setBaseResource(
+ServletHolder holderPwd = new ServletHolder("default", DefaultServlet.class);
+root.addServlet(holderPwd, "/");
+root.setBaseResource(
new ResourceCollection(
new String[]{
TaskMaster.class.getClassLoader().getResource("static").toExternalForm(),
@ -251,17 +249,17 @@ public class CliOverlord extends ServerRunnable
}
)
);
+root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
+root.addFilter(GzipFilter.class, "/*", null);
-final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.setContextPath("/");
+// Can't use /* here because of Guice and Jetty static content conflicts
+root.addFilter(GuiceFilter.class, "/status/*", null);
+root.addFilter(GuiceFilter.class, "/druid/*", null);
HandlerList handlerList = new HandlerList();
-handlerList.setHandlers(new Handler[]{redirect, resourceHandler, root, new DefaultHandler()});
-server.setHandler(handlerList);
+handlerList.setHandlers(new Handler[]{root});
-root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
-root.addFilter(GzipFilter.class, "/*", null);
-root.addFilter(GuiceFilter.class, "/*", null);
+server.setHandler(handlerList);
}
}
}

View File

@ -60,8 +60,12 @@ import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.OmniDataSegmentArchiver;
import io.druid.segment.loading.OmniDataSegmentKiller;
import io.druid.segment.loading.OmniDataSegmentMover;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.QueryResource;
@ -129,6 +133,10 @@ public class CliPeon extends GuiceRunnable
// Build it to make it bind even if nothing binds to it.
Binders.dataSegmentKillerBinder(binder);
binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentMoverBinder(binder);
binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
Binders.dataSegmentArchiverBinder(binder);
binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);
binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
binder.bind(ExecutorLifecycleConfig.class).toInstance(
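
The two new Binders calls mirror the existing `dataSegmentKillerBinder` line above them: each presumably creates the per-type MapBinder that feeds the Omni* dispatchers, which is why the call is made even when nothing binds to it. A hedged sketch of what those helpers in `io.druid.guice.Binders` likely look like:

``` java
import com.google.inject.Binder;
import com.google.inject.multibindings.MapBinder;

import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentMover;

// Presumed shape of the new helpers, mirroring dataSegmentKillerBinder:
// create (or re-obtain) the MapBinder that collects implementations keyed
// by loadSpec type.
public class BindersSketch
{
  public static MapBinder<String, DataSegmentMover> dataSegmentMoverBinder(Binder binder)
  {
    return MapBinder.newMapBinder(binder, String.class, DataSegmentMover.class);
  }

  public static MapBinder<String, DataSegmentArchiver> dataSegmentArchiverBinder(Binder binder)
  {
    return MapBinder.newMapBinder(binder, String.class, DataSegmentArchiver.class);
  }
}
```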

View File

@ -30,7 +30,7 @@ import java.util.List;
*/
@Command(
name = "realtime",
description = "Runs a realtime node, see http://druid.io/docs/0.6.36/Realtime.html for a description"
description = "Runs a realtime node, see http://druid.io/docs/0.6.40/Realtime.html for a description"
)
public class CliRealtime extends ServerRunnable
{

View File

@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
*/
@Command(
name = "realtime",
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.36/Realtime.html for a description"
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.40/Realtime.html for a description"
)
public class CliRealtimeExample extends ServerRunnable
{

View File

@ -26,9 +26,7 @@ import io.druid.server.http.RedirectFilter;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
-import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
@ -42,22 +40,24 @@ class CoordinatorJettyServerInitializer implements JettyServerInitializer
@Override
public void initialize(Server server, Injector injector)
{
-final ServletContextHandler redirect = new ServletContextHandler(ServletContextHandler.SESSIONS);
-redirect.setContextPath("/");
-redirect.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
-final ResourceHandler resourceHandler = new ResourceHandler();
-resourceHandler.setResourceBase(DruidCoordinator.class.getClassLoader().getResource("static").toExternalForm());
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.setContextPath("/");
+ServletHolder holderPwd = new ServletHolder("default", DefaultServlet.class);
+root.addServlet(holderPwd, "/");
+root.setResourceBase(DruidCoordinator.class.getClassLoader().getResource("static").toExternalForm());
+root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
+root.addFilter(GzipFilter.class, "/*", null);
+// Can't use '/*' here because of Guice and Jetty static content conflicts
+// The coordinator really needs a standardized api path
+root.addFilter(GuiceFilter.class, "/status/*", null);
+root.addFilter(GuiceFilter.class, "/info/*", null);
+root.addFilter(GuiceFilter.class, "/coordinator/*", null);
HandlerList handlerList = new HandlerList();
-handlerList.setHandlers(new Handler[]{redirect, resourceHandler, root, new DefaultHandler()});
-server.setHandler(handlerList);
+handlerList.setHandlers(new Handler[]{root});
-root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
-root.addFilter(GzipFilter.class, "/*", null);
-root.addFilter(GuiceFilter.class, "/*", null);
+server.setHandler(handlerList);
}
}
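
Both initializers converge on the same consolidated shape: a single ServletContextHandler whose "default" servlet serves static resources, with RedirectFilter, GzipFilter, and path-scoped GuiceFilter stacked on one context, replacing the old redirect-context/ResourceHandler/DefaultHandler chain. A standalone sketch of that pattern outside Druid, where the port and resource directory are illustrative:

``` java
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;

// Minimal single-context server: static files come from Jetty's "default"
// servlet; application filters would be added per path on the same context.
public class SingleContextJettyExample
{
  public static void main(String[] args) throws Exception
  {
    final Server server = new Server(8080); // illustrative port

    final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
    root.setContextPath("/");
    root.setResourceBase("static"); // illustrative static resource directory
    root.addServlet(new ServletHolder("default", DefaultServlet.class), "/");

    server.setHandler(root);
    server.start();
    server.join();
  }
}
```

This is also why the inline comments warn against binding GuiceFilter to "/*": on a single context, a catch-all filter would intercept requests meant for the default servlet's static content, hence the explicit "/status/*"-style scoping.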