diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
index 0a7a505d4ec..56f08f3c9f3 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
@@ -30,6 +30,7 @@ import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.SegmentLoader;
 import io.druid.segment.loading.SegmentLoadingException;
@@ -52,6 +53,7 @@ public class TaskToolbox
   private final ServiceEmitter emitter;
   private final DataSegmentPusher segmentPusher;
   private final DataSegmentKiller dataSegmentKiller;
+  private final DataSegmentMover dataSegmentMover;
   private final DataSegmentAnnouncer segmentAnnouncer;
   private final ServerView newSegmentServerView;
   private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@@ -68,6 +70,7 @@ public class TaskToolbox
       ServiceEmitter emitter,
       DataSegmentPusher segmentPusher,
       DataSegmentKiller dataSegmentKiller,
+      DataSegmentMover dataSegmentMover,
       DataSegmentAnnouncer segmentAnnouncer,
       ServerView newSegmentServerView,
       QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@@ -84,6 +87,7 @@ public class TaskToolbox
     this.emitter = emitter;
     this.segmentPusher = segmentPusher;
     this.dataSegmentKiller = dataSegmentKiller;
+    this.dataSegmentMover = dataSegmentMover;
     this.segmentAnnouncer = segmentAnnouncer;
     this.newSegmentServerView = newSegmentServerView;
     this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@@ -119,6 +123,11 @@ public class TaskToolbox
     return dataSegmentKiller;
   }
 
+  public DataSegmentMover getDataSegmentMover()
+  {
+    return dataSegmentMover;
+  }
+
   public DataSegmentAnnouncer getSegmentAnnouncer()
   {
     return segmentAnnouncer;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
index ca00dccaf91..cf9614c4bd6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
@@ -24,13 +24,13 @@ import com.google.inject.Inject;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.ServerView;
-import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Processing;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 
@@ -47,6 +47,7 @@ public class TaskToolboxFactory
   private final ServiceEmitter emitter;
   private final DataSegmentPusher segmentPusher;
   private final DataSegmentKiller dataSegmentKiller;
+  private final DataSegmentMover dataSegmentMover;
   private final DataSegmentAnnouncer segmentAnnouncer;
   private final ServerView newSegmentServerView;
   private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@@ -62,6 +63,7 @@ public class TaskToolboxFactory
       ServiceEmitter emitter,
       DataSegmentPusher segmentPusher,
       DataSegmentKiller dataSegmentKiller,
+      DataSegmentMover dataSegmentMover,
       DataSegmentAnnouncer segmentAnnouncer,
       ServerView newSegmentServerView,
       QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@@ -76,6 +78,7 @@ public class TaskToolboxFactory
     this.emitter = emitter;
     this.segmentPusher = segmentPusher;
     this.dataSegmentKiller = dataSegmentKiller;
+    this.dataSegmentMover = dataSegmentMover;
     this.segmentAnnouncer = segmentAnnouncer;
     this.newSegmentServerView = newSegmentServerView;
     this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@@ -96,6 +99,7 @@ public class TaskToolboxFactory
         emitter,
         segmentPusher,
         dataSegmentKiller,
+        dataSegmentMover,
         segmentAnnouncer,
         newSegmentServerView,
         queryRunnerFactoryConglomerate,
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMoveAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMoveAction.java
new file mode 100644
index 00000000000..67db4fc79be
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMoveAction.java
@@ -0,0 +1,77 @@
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableSet;
+import com.metamx.common.ISE;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.indexing.common.task.Task;
+import io.druid.timeline.DataSegment;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class SegmentMoveAction implements TaskAction<Void>
+{
+  @JsonIgnore
+  private final Set<DataSegment> segments;
+
+  @JsonCreator
+  public SegmentMoveAction(
+      @JsonProperty("segments") Set<DataSegment> segments
+  )
+  {
+    this.segments = ImmutableSet.copyOf(segments);
+  }
+
+  @JsonProperty
+  public Set<DataSegment> getSegments()
+  {
+    return segments;
+  }
+
+  public TypeReference<Void> getReturnTypeReference()
+  {
+    return new TypeReference<Void>() {};
+  }
+
+  @Override
+  public Void perform(
+      Task task, TaskActionToolbox toolbox
+  ) throws IOException
+  {
+    if(!toolbox.taskLockCoversSegments(task, segments, true)) {
+      throw new ISE("Segments not covered by locks for task: %s", task.getId());
+    }
+
+    toolbox.getIndexerDBCoordinator().moveSegments(segments);
+
+    // Emit metrics
+    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
+        .setUser2(task.getDataSource())
+        .setUser4(task.getType());
+
+    for (DataSegment segment : segments) {
+      metricBuilder.setUser5(segment.getInterval().toString());
+      toolbox.getEmitter().emit(metricBuilder.build("indexer/segmentMoved/bytes", segment.getSize()));
+    }
+
+    return null;
+  }
+
+  @Override
+  public boolean isAudited()
+  {
+    return true;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SegmentMoveAction{" +
+           "segments=" + segments +
+           '}';
+  }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
index d9bdfe5b694..1d59e40c6c3 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
@@ -35,7 +35,8 @@ import java.io.IOException;
"segmentInsertion", value = SegmentInsertAction.class), @JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class), @JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class), - @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class) + @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class), + @JsonSubTypes.Type(name = "segmentMove", value = SegmentMoveAction.class) }) public interface TaskAction { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java new file mode 100644 index 00000000000..4ca600f8ea5 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java @@ -0,0 +1,110 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentListUnusedAction; +import io.druid.indexing.common.actions.SegmentMoveAction; +import io.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.util.List; + +public class MoveTask extends AbstractTask +{ + private static final Logger log = new Logger(MoveTask.class); + + @JsonCreator + public MoveTask( + @JsonProperty("id") String id, + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval + ) + { + super( + TaskUtils.makeId(id, "move", dataSource, interval), + dataSource, + interval + ); + } + + @Override + public String getType() + { + return "move"; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + // Confirm we have a lock (will throw if there isn't exactly one element) + final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); + + if(!myLock.getDataSource().equals(getDataSource())) { + throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); + } + + if(!myLock.getInterval().equals(getImplicitLockInterval().get())) { + throw new ISE("WTF?! 
+      throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getImplicitLockInterval().get());
+    }
+
+    // List unused segments
+    final List<DataSegment> unusedSegments = toolbox
+        .getTaskActionClient()
+        .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
+
+    // Verify none of these segments have versions > lock version
+    for(final DataSegment unusedSegment : unusedSegments) {
+      if(unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
+        throw new ISE(
+            "WTF?! Unused segment[%s] has version[%s] > task version[%s]",
+            unusedSegment.getIdentifier(),
+            unusedSegment.getVersion(),
+            myLock.getVersion()
+        );
+      }
+
+      log.info("OK to move segment: %s", unusedSegment.getIdentifier());
+    }
+
+    List<DataSegment> movedSegments = Lists.newLinkedList();
+
+    // Move segments
+    for (DataSegment segment : unusedSegments) {
+      movedSegments.add(toolbox.getDataSegmentMover().move(segment));
+    }
+
+    // Update metadata for moved segments
+    toolbox.getTaskActionClient().submit(new SegmentMoveAction(
+        ImmutableSet.copyOf(movedSegments)
+    ));
+
+    return TaskStatus.success(getId());
+  }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index 4d6afd2ebf6..32747bdc7d3 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -45,6 +45,7 @@ import io.druid.query.QueryRunner;
     @JsonSubTypes.Type(name = "merge", value = MergeTask.class),
     @JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
     @JsonSubTypes.Type(name = "kill", value = KillTask.class),
+    @JsonSubTypes.Type(name = "move", value = MoveTask.class),
     @JsonSubTypes.Type(name = "index", value = IndexTask.class),
     @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
     @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
index 2a4b5e8912d..943e63fa821 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
@@ -28,8 +28,6 @@ import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
-import io.druid.db.DbConnector;
-import io.druid.db.DbConnectorConfig;
 import io.druid.db.DbTablesConfig;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
@@ -213,6 +211,24 @@ public class IndexerDBCoordinator
     return true;
   }
 
+  public void moveSegments(final Set<DataSegment> segments) throws IOException
+  {
+    dbi.inTransaction(
+        new TransactionCallback<Void>()
+        {
+          @Override
+          public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
+          {
+            for(final DataSegment segment : segments) {
+              moveSegment(handle, segment);
+            }
+
+            return null;
+          }
+        }
+    );
+  }
+
   public void deleteSegments(final Set<DataSegment> segments) throws IOException
   {
     dbi.inTransaction(
@@ -235,10 +251,27 @@
   {
     handle.createStatement(
         String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable())
-    ).bind("id", segment.getIdentifier())
+    )
+     .bind("id", segment.getIdentifier())
      .execute();
   }
 
+  private void moveSegment(final Handle handle, final DataSegment segment) throws IOException
+  {
+    try {
+      handle.createStatement(
+          String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable())
+      )
+          .bind("id", segment.getIdentifier())
+          .bind("payload", jsonMapper.writeValueAsString(segment))
+          .execute();
+    }
+    catch (IOException e) {
+      log.error(e, "Exception inserting into DB");
+      throw e;
+    }
+  }
+
   public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
   {
     List<DataSegment> matchingSegments = dbi.withHandle(
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index 632384ceb3d..fcbfed7589b 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -65,6 +65,7 @@ import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPuller;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.LocalDataSegmentPuller;
@@ -158,6 +159,14 @@ public class TaskLifecycleTest
 
           }
         },
+        new DataSegmentMover()
+        {
+          @Override
+          public DataSegment move(DataSegment dataSegment) throws SegmentLoadingException
+          {
+            return dataSegment;
+          }
+        },
         null, // segment announcer
         null, // new segment server view
         null, // query runner factory conglomerate corporation unionized collective
diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
index f80ca3cd8db..7723013d239 100644
--- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -122,7 +122,7 @@ public class WorkerTaskMonitorTest
             new ThreadPoolTaskRunner(
                 new TaskToolboxFactory(
                     new TaskConfig(tmp.toString(), null, null, 0),
-                    null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
+                    null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
                         new OmniSegmentLoader(
                             ImmutableMap.of(
                                 "local",
@@ -209,4 +209,4 @@ public class WorkerTaskMonitorTest
     Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId());
     Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode());
   }
-}
\ No newline at end of file
+}
diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java
index 5409568a917..4e295d17aa4 100644
--- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java
@@ -38,16 +38,13 @@ public class S3DataSegmentKiller implements DataSegmentKiller
   private static final Logger log = new Logger(S3DataSegmentKiller.class);
 
   private final RestS3Service s3Client;
-  private final S3DataSegmentKillerConfig config;
 
   @Inject
   public S3DataSegmentKiller(
-      RestS3Service s3Client,
-      S3DataSegmentKillerConfig config
+      RestS3Service s3Client
   )
   {
     this.s3Client = s3Client;
-    this.config = config;
   }
 
   @Override
@@ -59,41 +56,13 @@ public class S3DataSegmentKiller implements DataSegmentKiller
       String s3Path = MapUtils.getString(loadSpec, "key");
       String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
 
-      final String s3ArchiveBucket = config.getArchiveBucket();
-
-      if(config.isArchive() && s3ArchiveBucket.isEmpty()) {
-        log.warn("S3 archive bucket not specified, refusing to delete segment [s3://%s/%s]", s3Bucket, s3Path);
-        return;
-      }
-
       if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
-        if (config.isArchive()) {
-          log.info("Archiving index file[s3://%s/%s] to [s3://%s/%s]",
-              s3Bucket,
-              s3Path,
-              s3ArchiveBucket,
-              s3Path
-          );
-          s3Client.moveObject(s3Bucket, s3Path, s3ArchiveBucket, new S3Object(s3Path), false);
-        } else {
-          log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
-          s3Client.deleteObject(s3Bucket, s3Path);
-        }
+        log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
+        s3Client.deleteObject(s3Bucket, s3Path);
       }
       if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
-        if (config.isArchive()) {
-          log.info(
-              "Archiving descriptor file[s3://%s/%s] to [s3://%s/%s]",
-              s3Bucket,
-              s3DescriptorPath,
-              s3ArchiveBucket,
-              s3DescriptorPath
-          );
-          s3Client.moveObject(s3Bucket, s3DescriptorPath, s3ArchiveBucket, new S3Object(s3DescriptorPath), false);
-        } else {
-          log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
-          s3Client.deleteObject(s3Bucket, s3DescriptorPath);
-        }
+        log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
+        s3Client.deleteObject(s3Bucket, s3DescriptorPath);
       }
     }
     catch (ServiceException e) {
diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKillerConfig.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKillerConfig.java
deleted file mode 100644
index bd169b05f2b..00000000000
--- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKillerConfig.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package io.druid.storage.s3;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public class S3DataSegmentKillerConfig
-{
-  @JsonProperty
-  public boolean archive = true;
-
-  @JsonProperty
-  public String archiveBucket = "";
-
-  public boolean isArchive()
-  {
-    return archive;
-  }
-
-  public String getArchiveBucket()
-  {
-    return archiveBucket;
-  }
-}
diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java
new file mode 100644
index 00000000000..245cbab8a63
--- /dev/null
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java
@@ -0,0 +1,99 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.storage.s3;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import com.metamx.common.MapUtils;
+import com.metamx.common.logger.Logger;
+import io.druid.segment.loading.DataSegmentMover;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import org.jets3t.service.ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Object;
+
+import java.util.Map;
+
+public class S3DataSegmentMover implements DataSegmentMover
+{
+  private static final Logger log = new Logger(S3DataSegmentKiller.class);
+
+  private final RestS3Service s3Client;
+  private final S3DataSegmentMoverConfig config;
+
+  @Inject
+  public S3DataSegmentMover(
+      RestS3Service s3Client,
+      S3DataSegmentMoverConfig config
+  )
+  {
+    this.s3Client = s3Client;
+    this.config = config;
+  }
+
+  @Override
+  public DataSegment move(DataSegment segment) throws SegmentLoadingException
+  {
+    try {
+      Map<String, Object> loadSpec = segment.getLoadSpec();
+      String s3Bucket = MapUtils.getString(loadSpec, "bucket");
+      String s3Path = MapUtils.getString(loadSpec, "key");
+      String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
+
+      final String s3ArchiveBucket = config.getArchiveBucket();
+
+      if (s3ArchiveBucket.isEmpty()) {
+        log.warn("S3 archive bucket not specified, refusing to move segment [s3://%s/%s]", s3Bucket, s3Path);
+        return segment;
+      }
+
+      if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
+        log.info(
+            "Moving index file[s3://%s/%s] to [s3://%s/%s]",
+            s3Bucket,
+            s3Path,
+            s3ArchiveBucket,
+            s3Path
+        );
+        s3Client.moveObject(s3Bucket, s3Path, s3ArchiveBucket, new S3Object(s3Path), false);
+      }
+      if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
+        log.info(
+            "Moving descriptor file[s3://%s/%s] to [s3://%s/%s]",
+            s3Bucket,
+            s3DescriptorPath,
+            s3ArchiveBucket,
+            s3DescriptorPath
+        );
+        s3Client.moveObject(s3Bucket, s3DescriptorPath, s3ArchiveBucket, new S3Object(s3DescriptorPath), false);
+      }
+
+      return segment.withLoadSpec(
+          ImmutableMap.<String, Object>builder()
+              .putAll(loadSpec)
+              .put("bucket", s3ArchiveBucket).build()
+      );
+    }
+    catch (ServiceException e) {
+      throw new SegmentLoadingException(e, "Unable to move segment[%s]", segment.getIdentifier());
+    }
+  }
+}
diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMoverConfig.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMoverConfig.java
new file mode 100644
index 00000000000..a298ba9da39
--- /dev/null
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMoverConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.storage.s3;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class S3DataSegmentMoverConfig
+{
+  @JsonProperty
+  public String archiveBucket = "";
+
+  public String getArchiveBucket()
+  {
+    return archiveBucket;
+  }
+}
diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java
index 6bb4624a3d3..fadba584bce 100644
--- a/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java
@@ -51,9 +51,10 @@ public class S3StorageDruidModule implements DruidModule
     Binders.dataSegmentPullerBinder(binder).addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
     Binders.dataSegmentKillerBinder(binder).addBinding("s3_zip").to(S3DataSegmentKiller.class).in(LazySingleton.class);
+    Binders.dataSegmentMoverBinder(binder).addBinding("s3_zip").to(S3DataSegmentMover.class).in(LazySingleton.class);
     Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);
     JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
-    JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentKillerConfig.class);
+    JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentMoverConfig.class);
 
     Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class);
     JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class);
diff --git a/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java
new file mode 100644
index 00000000000..490c936011d
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java
@@ -0,0 +1,57 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.segment.loading;
+
+import com.google.inject.Inject;
+import com.metamx.common.MapUtils;
+import io.druid.timeline.DataSegment;
+
+import java.util.Map;
+
+public class OmniDataSegmentMover implements DataSegmentMover
+{
+  private final Map<String, DataSegmentMover> movers;
+
+  @Inject
+  public OmniDataSegmentMover(
+      Map<String, DataSegmentMover> movers
+  )
+  {
+    this.movers = movers;
+  }
+
+  @Override
+  public DataSegment move(DataSegment segment) throws SegmentLoadingException
+  {
+    return getMover(segment).move(segment);
+  }
+
+  private DataSegmentMover getMover(DataSegment segment) throws SegmentLoadingException
+  {
+    String type = MapUtils.getString(segment.getLoadSpec(), "type");
+    DataSegmentMover mover = movers.get(type);
+
+    if (mover == null) {
Known types are %s", type, movers.keySet()); + } + + return mover; + } +} diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index e8a5b985bd4..450c9286554 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -61,7 +61,9 @@ import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; import io.druid.query.QuerySegmentWalker; import io.druid.segment.loading.DataSegmentKiller; +import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.OmniDataSegmentKiller; +import io.druid.segment.loading.OmniDataSegmentMover; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; import io.druid.server.QueryResource; @@ -129,6 +131,8 @@ public class CliPeon extends GuiceRunnable // Build it to make it bind even if nothing binds to it. Binders.dataSegmentKillerBinder(binder); binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentMoverBinder(binder); + binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class); binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class); binder.bind(ExecutorLifecycleConfig.class).toInstance(