add archive task

Author: Xavier Léauté
Date: 2013-12-05 17:37:03 -08:00
parent e38f2877fb
commit a417cd5df2
16 changed files with 451 additions and 66 deletions

TaskToolbox.java

@@ -30,6 +30,7 @@ import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.SegmentLoader;
 import io.druid.segment.loading.SegmentLoadingException;
@@ -52,6 +53,7 @@ public class TaskToolbox
   private final ServiceEmitter emitter;
   private final DataSegmentPusher segmentPusher;
   private final DataSegmentKiller dataSegmentKiller;
+  private final DataSegmentMover dataSegmentMover;
   private final DataSegmentAnnouncer segmentAnnouncer;
   private final ServerView newSegmentServerView;
   private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@@ -68,6 +70,7 @@ public class TaskToolbox
       ServiceEmitter emitter,
       DataSegmentPusher segmentPusher,
       DataSegmentKiller dataSegmentKiller,
+      DataSegmentMover dataSegmentMover,
       DataSegmentAnnouncer segmentAnnouncer,
       ServerView newSegmentServerView,
       QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@@ -84,6 +87,7 @@ public class TaskToolbox
     this.emitter = emitter;
     this.segmentPusher = segmentPusher;
     this.dataSegmentKiller = dataSegmentKiller;
+    this.dataSegmentMover = dataSegmentMover;
     this.segmentAnnouncer = segmentAnnouncer;
     this.newSegmentServerView = newSegmentServerView;
     this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@@ -119,6 +123,11 @@ public class TaskToolbox
     return dataSegmentKiller;
   }

+  public DataSegmentMover getDataSegmentMover()
+  {
+    return dataSegmentMover;
+  }
+
   public DataSegmentAnnouncer getSegmentAnnouncer()
   {
     return segmentAnnouncer;

TaskToolboxFactory.java

@@ -24,13 +24,13 @@ import com.google.inject.Inject;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.ServerView;
-import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Processing;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.server.coordination.DataSegmentAnnouncer;
@@ -47,6 +47,7 @@ public class TaskToolboxFactory
   private final ServiceEmitter emitter;
   private final DataSegmentPusher segmentPusher;
   private final DataSegmentKiller dataSegmentKiller;
+  private final DataSegmentMover dataSegmentMover;
   private final DataSegmentAnnouncer segmentAnnouncer;
   private final ServerView newSegmentServerView;
   private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@@ -62,6 +63,7 @@ public class TaskToolboxFactory
       ServiceEmitter emitter,
       DataSegmentPusher segmentPusher,
       DataSegmentKiller dataSegmentKiller,
+      DataSegmentMover dataSegmentMover,
       DataSegmentAnnouncer segmentAnnouncer,
       ServerView newSegmentServerView,
       QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@@ -76,6 +78,7 @@ public class TaskToolboxFactory
     this.emitter = emitter;
     this.segmentPusher = segmentPusher;
     this.dataSegmentKiller = dataSegmentKiller;
+    this.dataSegmentMover = dataSegmentMover;
     this.segmentAnnouncer = segmentAnnouncer;
     this.newSegmentServerView = newSegmentServerView;
     this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@@ -96,6 +99,7 @@ public class TaskToolboxFactory
         emitter,
         segmentPusher,
         dataSegmentKiller,
+        dataSegmentMover,
         segmentAnnouncer,
         newSegmentServerView,
         queryRunnerFactoryConglomerate,

SegmentMoveAction.java (new file)

@@ -0,0 +1,77 @@
package io.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;

import java.io.IOException;
import java.util.Set;

public class SegmentMoveAction implements TaskAction<Void>
{
  @JsonIgnore
  private final Set<DataSegment> segments;

  @JsonCreator
  public SegmentMoveAction(
      @JsonProperty("segments") Set<DataSegment> segments
  )
  {
    this.segments = ImmutableSet.copyOf(segments);
  }

  @JsonProperty
  public Set<DataSegment> getSegments()
  {
    return segments;
  }

  public TypeReference<Void> getReturnTypeReference()
  {
    return new TypeReference<Void>() {};
  }

  @Override
  public Void perform(
      Task task, TaskActionToolbox toolbox
  ) throws IOException
  {
    if (!toolbox.taskLockCoversSegments(task, segments, true)) {
      throw new ISE("Segments not covered by locks for task: %s", task.getId());
    }

    toolbox.getIndexerDBCoordinator().moveSegments(segments);

    // Emit metrics
    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
        .setUser2(task.getDataSource())
        .setUser4(task.getType());

    for (DataSegment segment : segments) {
      metricBuilder.setUser5(segment.getInterval().toString());
      toolbox.getEmitter().emit(metricBuilder.build("indexer/segmentMoved/bytes", segment.getSize()));
    }

    return null;
  }

  @Override
  public boolean isAudited()
  {
    return true;
  }

  @Override
  public String toString()
  {
    return "SegmentMoveAction{" +
           "segments=" + segments +
           '}';
  }
}

TaskAction.java

@@ -35,7 +35,8 @@ import java.io.IOException;
     @JsonSubTypes.Type(name = "segmentInsertion", value = SegmentInsertAction.class),
     @JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class),
     @JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
-    @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class)
+    @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
+    @JsonSubTypes.Type(name = "segmentMove", value = SegmentMoveAction.class)
 })
 public interface TaskAction<RetType>
 {

MoveTask.java (new file)

@ -0,0 +1,110 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentMoveAction;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.List;
public class MoveTask extends AbstractTask
{
private static final Logger log = new Logger(MoveTask.class);
@JsonCreator
public MoveTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
TaskUtils.makeId(id, "move", dataSource, interval),
dataSource,
interval
);
}
@Override
public String getType()
{
return "move";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
if(!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
}
if(!myLock.getInterval().equals(getImplicitLockInterval().get())) {
throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getImplicitLockInterval().get());
}
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
for(final DataSegment unusedSegment : unusedSegments) {
if(unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
throw new ISE(
"WTF?! Unused segment[%s] has version[%s] > task version[%s]",
unusedSegment.getIdentifier(),
unusedSegment.getVersion(),
myLock.getVersion()
);
}
log.info("OK to move segment: %s", unusedSegment.getIdentifier());
}
List<DataSegment> movedSegments = Lists.newLinkedList();
// Move segments
for (DataSegment segment : unusedSegments) {
movedSegments.add(toolbox.getDataSegmentMover().move(segment));
}
// Update metadata for moved segments
toolbox.getTaskActionClient().submit(new SegmentMoveAction(
ImmutableSet.copyOf(movedSegments)
));
return TaskStatus.success(getId());
}
}
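For context, a minimal sketch (not part of this diff; the dataSource and interval values are made up) of how the new "move" task type could be serialized with Druid's DefaultObjectMapper, which is what registering MoveTask under @JsonSubTypes in Task.java below makes possible:

// Hypothetical usage sketch: serialize a MoveTask to see the JSON shape an
// overlord would accept for the new "move" task type. Values are invented.
import io.druid.indexing.common.task.MoveTask;
import io.druid.jackson.DefaultObjectMapper;
import org.joda.time.Interval;

public class MoveTaskJsonExample
{
  public static void main(String[] args) throws Exception
  {
    MoveTask task = new MoveTask(
        null,                                  // null id lets TaskUtils.makeId generate one
        "example_datasource",                  // hypothetical dataSource
        new Interval("2013-01-01/2013-02-01")  // hypothetical interval to move
    );
    // Prints roughly: {"type":"move","id":"move_example_datasource_...",
    //                  "dataSource":"example_datasource","interval":"2013-01-01T.../..."}
    System.out.println(new DefaultObjectMapper().writeValueAsString(task));
  }
}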

Task.java

@@ -45,6 +45,7 @@ import io.druid.query.QueryRunner;
     @JsonSubTypes.Type(name = "merge", value = MergeTask.class),
     @JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
     @JsonSubTypes.Type(name = "kill", value = KillTask.class),
+    @JsonSubTypes.Type(name = "move", value = MoveTask.class),
     @JsonSubTypes.Type(name = "index", value = IndexTask.class),
     @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
     @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),

IndexerDBCoordinator.java

@@ -28,8 +28,6 @@ import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
-import io.druid.db.DbConnector;
-import io.druid.db.DbConnectorConfig;
 import io.druid.db.DbTablesConfig;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
@@ -213,6 +211,24 @@ public class IndexerDBCoordinator
     return true;
   }

+  public void moveSegments(final Set<DataSegment> segments) throws IOException
+  {
+    dbi.inTransaction(
+        new TransactionCallback<Void>()
+        {
+          @Override
+          public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
+          {
+            for (final DataSegment segment : segments) {
+              moveSegment(handle, segment);
+            }
+            return null;
+          }
+        }
+    );
+  }
+
   public void deleteSegments(final Set<DataSegment> segments) throws IOException
   {
     dbi.inTransaction(
@@ -235,10 +251,27 @@ public class IndexerDBCoordinator
   {
     handle.createStatement(
         String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable())
-    ).bind("id", segment.getIdentifier())
+    )
+    .bind("id", segment.getIdentifier())
     .execute();
   }

+  private void moveSegment(final Handle handle, final DataSegment segment) throws IOException
+  {
+    try {
+      handle.createStatement(
+          String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable())
+      )
+      .bind("id", segment.getIdentifier())
+      .bind("payload", jsonMapper.writeValueAsString(segment))
+      .execute();
+    }
+    catch (IOException e) {
+      log.error(e, "Exception updating segment payload in DB");
+      throw e;
+    }
+  }
+
   public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
   {
     List<DataSegment> matchingSegments = dbi.withHandle(

TaskLifecycleTest.java

@@ -65,6 +65,7 @@ import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPuller;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.LocalDataSegmentPuller;
@@ -158,6 +159,14 @@ public class TaskLifecycleTest
           }
         },
+        new DataSegmentMover()
+        {
+          @Override
+          public DataSegment move(DataSegment dataSegment) throws SegmentLoadingException
+          {
+            return dataSegment;
+          }
+        },
         null, // segment announcer
         null, // new segment server view
         null, // query runner factory conglomerate corporation unionized collective

WorkerTaskMonitorTest.java

@@ -122,7 +122,7 @@ public class WorkerTaskMonitorTest
         new ThreadPoolTaskRunner(
             new TaskToolboxFactory(
                 new TaskConfig(tmp.toString(), null, null, 0),
-                null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
+                null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
                     new OmniSegmentLoader(
                         ImmutableMap.<String, DataSegmentPuller>of(
                             "local",

S3DataSegmentKiller.java

@@ -38,16 +38,13 @@ public class S3DataSegmentKiller implements DataSegmentKiller
   private static final Logger log = new Logger(S3DataSegmentKiller.class);

   private final RestS3Service s3Client;
-  private final S3DataSegmentKillerConfig config;

   @Inject
   public S3DataSegmentKiller(
-      RestS3Service s3Client,
-      S3DataSegmentKillerConfig config
+      RestS3Service s3Client
   )
   {
     this.s3Client = s3Client;
-    this.config = config;
   }

   @Override
@@ -59,41 +56,13 @@ public class S3DataSegmentKiller implements DataSegmentKiller
       String s3Path = MapUtils.getString(loadSpec, "key");
       String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";

-      final String s3ArchiveBucket = config.getArchiveBucket();
-
-      if (config.isArchive() && s3ArchiveBucket.isEmpty()) {
-        log.warn("S3 archive bucket not specified, refusing to delete segment [s3://%s/%s]", s3Bucket, s3Path);
-        return;
-      }
-
       if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
-        if (config.isArchive()) {
-          log.info("Archiving index file[s3://%s/%s] to [s3://%s/%s]",
-              s3Bucket,
-              s3Path,
-              s3ArchiveBucket,
-              s3Path
-          );
-          s3Client.moveObject(s3Bucket, s3Path, s3ArchiveBucket, new S3Object(s3Path), false);
-        } else {
-          log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
-          s3Client.deleteObject(s3Bucket, s3Path);
-        }
+        log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
+        s3Client.deleteObject(s3Bucket, s3Path);
       }
       if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
-        if (config.isArchive()) {
-          log.info(
-              "Archiving descriptor file[s3://%s/%s] to [s3://%s/%s]",
-              s3Bucket,
-              s3DescriptorPath,
-              s3ArchiveBucket,
-              s3DescriptorPath
-          );
-          s3Client.moveObject(s3Bucket, s3DescriptorPath, s3ArchiveBucket, new S3Object(s3DescriptorPath), false);
-        } else {
-          log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
-          s3Client.deleteObject(s3Bucket, s3DescriptorPath);
-        }
+        log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
+        s3Client.deleteObject(s3Bucket, s3DescriptorPath);
       }
     }
     catch (ServiceException e) {

S3DataSegmentKillerConfig.java (deleted)

@ -1,22 +0,0 @@
package io.druid.storage.s3;
import com.fasterxml.jackson.annotation.JsonProperty;
public class S3DataSegmentKillerConfig
{
@JsonProperty
public boolean archive = true;
@JsonProperty
public String archiveBucket = "";
public boolean isArchive()
{
return archive;
}
public String getArchiveBucket()
{
return archiveBucket;
}
}

S3DataSegmentMover.java (new file)

@ -0,0 +1,99 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.s3;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.util.Map;
public class S3DataSegmentMover implements DataSegmentMover
{
private static final Logger log = new Logger(S3DataSegmentKiller.class);
private final RestS3Service s3Client;
private final S3DataSegmentMoverConfig config;
@Inject
public S3DataSegmentMover(
RestS3Service s3Client,
S3DataSegmentMoverConfig config
)
{
this.s3Client = s3Client;
this.config = config;
}
@Override
public DataSegment move(DataSegment segment) throws SegmentLoadingException
{
try {
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
final String s3ArchiveBucket = config.getArchiveBucket();
if (s3ArchiveBucket.isEmpty()) {
log.warn("S3 archive bucket not specified, refusing to move segment [s3://%s/%s]", s3Bucket, s3Path);
return segment;
}
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
log.info(
"Moving index file[s3://%s/%s] to [s3://%s/%s]",
s3Bucket,
s3Path,
s3ArchiveBucket,
s3Path
);
s3Client.moveObject(s3Bucket, s3Path, s3ArchiveBucket, new S3Object(s3Path), false);
}
if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
log.info(
"Moving descriptor file[s3://%s/%s] to [s3://%s/%s]",
s3Bucket,
s3DescriptorPath,
s3ArchiveBucket,
s3DescriptorPath
);
s3Client.moveObject(s3Bucket, s3DescriptorPath, s3ArchiveBucket, new S3Object(s3DescriptorPath), false);
}
return segment.withLoadSpec(
ImmutableMap.<String, Object>builder()
.putAll(loadSpec)
.put("bucket", s3ArchiveBucket).build()
);
}
catch (ServiceException e) {
throw new SegmentLoadingException(e, "Unable to move segment[%s]", segment.getIdentifier());
}
}
}

S3DataSegmentMoverConfig.java (new file)

@ -0,0 +1,33 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.s3;
import com.fasterxml.jackson.annotation.JsonProperty;
public class S3DataSegmentMoverConfig
{
@JsonProperty
public String archiveBucket = "";
public String getArchiveBucket()
{
return archiveBucket;
}
}

S3StorageDruidModule.java

@@ -51,9 +51,10 @@ public class S3StorageDruidModule implements DruidModule
     Binders.dataSegmentPullerBinder(binder).addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
     Binders.dataSegmentKillerBinder(binder).addBinding("s3_zip").to(S3DataSegmentKiller.class).in(LazySingleton.class);
+    Binders.dataSegmentMoverBinder(binder).addBinding("s3_zip").to(S3DataSegmentMover.class).in(LazySingleton.class);
     Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);

     JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
-    JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentKillerConfig.class);
+    JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentMoverConfig.class);

     Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class);
     JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class);
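Since S3DataSegmentMoverConfig is bound under the "druid.storage" prefix and exposes a single archiveBucket field, the archive target is presumably set with a runtime property of the form druid.storage.archiveBucket=<bucket>; the property name is inferred from the binding above, as this diff does not show a properties file.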

OmniDataSegmentMover.java (new file)

@ -0,0 +1,57 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import io.druid.timeline.DataSegment;
import java.util.Map;
public class OmniDataSegmentMover implements DataSegmentMover
{
private final Map<String, DataSegmentMover> movers;
@Inject
public OmniDataSegmentMover(
Map<String, DataSegmentMover> movers
)
{
this.movers = movers;
}
@Override
public DataSegment move(DataSegment segment) throws SegmentLoadingException
{
return getMover(segment).move(segment);
}
private DataSegmentMover getMover(DataSegment segment) throws SegmentLoadingException
{
String type = MapUtils.getString(segment.getLoadSpec(), "type");
DataSegmentMover mover = movers.get(type);
if (mover == null) {
throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, movers.keySet());
}
return mover;
}
}
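A small usage sketch, not from this diff: it wires OmniDataSegmentMover by hand with a made-up no-op "local" mover (mirroring the stub in TaskLifecycleTest above) to show how the segment's loadSpec "type" key selects the concrete mover; in production the map comes from the Guice binder set up in S3StorageDruidModule and CliPeon.

// Hypothetical sketch of loadSpec-based dispatch; the "local" key is invented.
import com.google.common.collect.ImmutableMap;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.OmniDataSegmentMover;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;

public class OmniMoverExample
{
  public static DataSegment moveWithStub(DataSegment segment) throws SegmentLoadingException
  {
    DataSegmentMover noopMover = new DataSegmentMover()
    {
      @Override
      public DataSegment move(DataSegment s) throws SegmentLoadingException
      {
        return s; // no-op, like the stub in TaskLifecycleTest
      }
    };

    // A segment whose loadSpec has "type": "local" is routed to noopMover;
    // any other type throws SegmentLoadingException ("Unknown loader type").
    OmniDataSegmentMover omni = new OmniDataSegmentMover(
        ImmutableMap.<String, DataSegmentMover>of("local", noopMover)
    );
    return omni.move(segment);
  }
}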

CliPeon.java

@@ -61,7 +61,9 @@ import io.druid.indexing.worker.executor.ExecutorLifecycle;
 import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
 import io.druid.query.QuerySegmentWalker;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.OmniDataSegmentKiller;
+import io.druid.segment.loading.OmniDataSegmentMover;
 import io.druid.segment.loading.SegmentLoaderConfig;
 import io.druid.segment.loading.StorageLocationConfig;
 import io.druid.server.QueryResource;
@@ -129,6 +131,8 @@ public class CliPeon extends GuiceRunnable
         // Build it to make it bind even if nothing binds to it.
         Binders.dataSegmentKillerBinder(binder);
         binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
+        Binders.dataSegmentMoverBinder(binder);
+        binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);

         binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
         binder.bind(ExecutorLifecycleConfig.class).toInstance(