diff --git a/docs/content/Configuration.md b/docs/content/Configuration.md
index d7e010b3006..13a1a67a741 100644
--- a/docs/content/Configuration.md
+++ b/docs/content/Configuration.md
@@ -287,6 +287,7 @@ This deep storage is used to interface with Amazon's S3.
 |`druid.storage.bucket`|S3 bucket name.|none|
 |`druid.storage.basekey`|S3 base key.|none|
 |`druid.storage.disableAcl`|Boolean flag for ACL.|false|
+|`druid.storage.archiveBucket`|S3 bucket to which segments are archived when the indexing service *archive task* runs.|none|

 #### HDFS Deep Storage

diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
index 0a7a505d4ec..eb34f4e4c1b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
@@ -29,7 +29,9 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.query.QueryRunnerFactoryConglomerate;
+import io.druid.segment.loading.DataSegmentArchiver;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.SegmentLoader;
 import io.druid.segment.loading.SegmentLoadingException;
@@ -52,6 +54,8 @@ public class TaskToolbox
   private final ServiceEmitter emitter;
   private final DataSegmentPusher segmentPusher;
   private final DataSegmentKiller dataSegmentKiller;
+  private final DataSegmentArchiver dataSegmentArchiver;
+  private final DataSegmentMover dataSegmentMover;
   private final DataSegmentAnnouncer segmentAnnouncer;
   private final ServerView newSegmentServerView;
   private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@@ -68,6 +72,8 @@ public class TaskToolbox
       ServiceEmitter emitter,
       DataSegmentPusher segmentPusher,
       DataSegmentKiller dataSegmentKiller,
+      DataSegmentMover dataSegmentMover,
+      DataSegmentArchiver dataSegmentArchiver,
       DataSegmentAnnouncer segmentAnnouncer,
       ServerView newSegmentServerView,
       QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@@ -84,6 +90,8 @@ public class TaskToolbox
     this.emitter = emitter;
     this.segmentPusher = segmentPusher;
     this.dataSegmentKiller = dataSegmentKiller;
+    this.dataSegmentMover = dataSegmentMover;
+    this.dataSegmentArchiver = dataSegmentArchiver;
     this.segmentAnnouncer = segmentAnnouncer;
     this.newSegmentServerView = newSegmentServerView;
     this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@@ -119,6 +127,16 @@ public class TaskToolbox
     return dataSegmentKiller;
   }

+  public DataSegmentMover getDataSegmentMover()
+  {
+    return dataSegmentMover;
+  }
+
+  public DataSegmentArchiver getDataSegmentArchiver()
+  {
+    return dataSegmentArchiver;
+  }
+
   public DataSegmentAnnouncer getSegmentAnnouncer()
   {
     return segmentAnnouncer;
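The two accessors added here are how task code reaches the new mover and archiver at run time. A minimal sketch of the call pattern inside a task's run method (the surrounding task, segment, and targetLoadSpec are hypothetical; only the toolbox accessors come from this patch):

    // Rewrite a segment's load spec to an explicit destination...
    DataSegment moved = toolbox.getDataSegmentMover().move(segment, targetLoadSpec);
    // ...or let deep-storage configuration decide where the segment goes.
    DataSegment archived = toolbox.getDataSegmentArchiver().archive(segment);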
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
index ca00dccaf91..d655edc34f0 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
@@ -24,13 +24,14 @@ import com.google.inject.Inject;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.ServerView;
-import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Processing;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.query.QueryRunnerFactoryConglomerate;
+import io.druid.segment.loading.DataSegmentArchiver;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.server.coordination.DataSegmentAnnouncer;
@@ -47,6 +48,8 @@ public class TaskToolboxFactory
   private final ServiceEmitter emitter;
   private final DataSegmentPusher segmentPusher;
   private final DataSegmentKiller dataSegmentKiller;
+  private final DataSegmentMover dataSegmentMover;
+  private final DataSegmentArchiver dataSegmentArchiver;
   private final DataSegmentAnnouncer segmentAnnouncer;
   private final ServerView newSegmentServerView;
   private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
@@ -62,6 +65,8 @@ public class TaskToolboxFactory
       ServiceEmitter emitter,
       DataSegmentPusher segmentPusher,
       DataSegmentKiller dataSegmentKiller,
+      DataSegmentMover dataSegmentMover,
+      DataSegmentArchiver dataSegmentArchiver,
       DataSegmentAnnouncer segmentAnnouncer,
       ServerView newSegmentServerView,
       QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@@ -76,6 +81,8 @@ public class TaskToolboxFactory
     this.emitter = emitter;
     this.segmentPusher = segmentPusher;
     this.dataSegmentKiller = dataSegmentKiller;
+    this.dataSegmentMover = dataSegmentMover;
+    this.dataSegmentArchiver = dataSegmentArchiver;
     this.segmentAnnouncer = segmentAnnouncer;
     this.newSegmentServerView = newSegmentServerView;
     this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
@@ -96,6 +103,8 @@ public class TaskToolboxFactory
         emitter,
         segmentPusher,
         dataSegmentKiller,
+        dataSegmentMover,
+        dataSegmentArchiver,
         segmentAnnouncer,
         newSegmentServerView,
         queryRunnerFactoryConglomerate,

diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
new file mode 100644
index 00000000000..f996a2c6ab0
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
@@ -0,0 +1,77 @@
+package io.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableSet;
+import com.metamx.common.ISE;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.indexing.common.task.Task;
+import io.druid.timeline.DataSegment;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class SegmentMetadataUpdateAction implements TaskAction<Void>
+{
+  @JsonIgnore
+  private final Set<DataSegment> segments;
+
+  @JsonCreator
+  public SegmentMetadataUpdateAction(
+      @JsonProperty("segments") Set<DataSegment> segments
+  )
+  {
+    this.segments = ImmutableSet.copyOf(segments);
+  }
+
+  @JsonProperty
+  public Set<DataSegment> getSegments()
+  {
+    return segments;
+  }
+
+  public TypeReference<Void> getReturnTypeReference()
+  {
+    return new TypeReference<Void>() {};
+  }
+
+  @Override
+  public Void perform(
+      Task task, TaskActionToolbox toolbox
+  ) throws IOException
+  {
+    if(!toolbox.taskLockCoversSegments(task, segments, true)) {
+      throw new ISE("Segments not covered by locks for task: %s", task.getId());
+    }
+
+    toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
+
+    // Emit metrics
+    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
+        .setUser2(task.getDataSource())
+        .setUser4(task.getType());
+
+    for (DataSegment segment : segments) {
+      metricBuilder.setUser5(segment.getInterval().toString());
+      toolbox.getEmitter().emit(metricBuilder.build("indexer/segmentMoved/bytes", segment.getSize()));
+    }
+
+    return null;
+  }
+
+  @Override
+  public boolean isAudited()
+  {
+    return true;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SegmentMetadataUpdateAction{" +
+           "segments=" + segments +
+           '}';
+  }
+}

diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
index d9bdfe5b694..038d06be3c6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
@@ -35,7 +35,8 @@ import java.io.IOException;
     @JsonSubTypes.Type(name = "segmentInsertion", value = SegmentInsertAction.class),
     @JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class),
     @JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
-    @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class)
+    @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
+    @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class)
 })
 public interface TaskAction<RetType>
 {
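Registering the action under segmentMetadataUpdate lets remotely running tasks push load-spec rewrites through the overlord like any other audited action. A minimal sketch, assuming a Set<DataSegment> named updatedSegments inside some task's run method:

    // The overlord side verifies lock coverage, then rewrites each segment's
    // stored payload via IndexerDBCoordinator.updateSegmentMetadata.
    toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(updatedSegments));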
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java
new file mode 100644
index 00000000000..97747e211ab
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java
@@ -0,0 +1,106 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentListUnusedAction;
+import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
+import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+public class ArchiveTask extends AbstractFixedIntervalTask
+{
+  private static final Logger log = new Logger(ArchiveTask.class);
+
+  public ArchiveTask(
+      @JsonProperty("id") String id,
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval
+  )
+  {
+    super(id, dataSource, interval);
+  }
+
+  @Override
+  public String getType()
+  {
+    return "archive";
+  }
+
+  @Override
+  public TaskStatus run(TaskToolbox toolbox) throws Exception
+  {
+    // Confirm we have a lock (will throw if there isn't exactly one element)
+    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+
+    if (!myLock.getDataSource().equals(getDataSource())) {
+      throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
+    }
+
+    if (!myLock.getInterval().equals(getInterval())) {
+      throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
+    }
+
+    // List unused segments
+    final List<DataSegment> unusedSegments = toolbox
+        .getTaskActionClient()
+        .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
+
+    // Verify none of these segments have versions > lock version
+    for (final DataSegment unusedSegment : unusedSegments) {
+      if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
+        throw new ISE(
+            "WTF?! Unused segment[%s] has version[%s] > task version[%s]",
+            unusedSegment.getIdentifier(),
+            unusedSegment.getVersion(),
+            myLock.getVersion()
+        );
+      }
+
+      log.info("OK to archive segment: %s", unusedSegment.getIdentifier());
+    }
+
+    List<DataSegment> archivedSegments = Lists.newLinkedList();
+
+    // Move segments
+    for (DataSegment segment : unusedSegments) {
+      archivedSegments.add(toolbox.getDataSegmentArchiver().archive(segment));
+    }
+
+    // Update metadata for moved segments
+    toolbox.getTaskActionClient().submit(
+        new SegmentMetadataUpdateAction(
+            ImmutableSet.copyOf(archivedSegments)
+        )
+    );
+
+    return TaskStatus.success(getId());
+  }
+}
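Together with the archive registration added to Task.java further down, this makes archiving available as a first-class task type. A sketch of constructing one directly; the dataSource is illustrative, and an explicit id is passed because ArchiveTask hands the id to its superclass unchanged:

    Task archiveTask = new ArchiveTask(
        "archive_wikipedia_2013-01-01",        // illustrative id
        "wikipedia",                           // illustrative dataSource
        new Interval("2013-01-01/2013-01-02")
    );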
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java
new file mode 100644
index 00000000000..371f0dc38a4
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java
@@ -0,0 +1,115 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentListUnusedAction;
+import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
+import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Map;
+
+public class MoveTask extends AbstractFixedIntervalTask
+{
+  private static final Logger log = new Logger(MoveTask.class);
+
+  private final Map<String, Object> targetLoadSpec;
+
+  @JsonCreator
+  public MoveTask(
+      @JsonProperty("id") String id,
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("target") Map<String, Object> targetLoadSpec
+  )
+  {
+    super(
+        TaskUtils.makeId(id, "move", dataSource, interval),
+        dataSource,
+        interval
+    );
+    this.targetLoadSpec = targetLoadSpec;
+  }
+
+  @Override
+  public String getType()
+  {
+    return "move";
+  }
+
+  @Override
+  public TaskStatus run(TaskToolbox toolbox) throws Exception
+  {
+    // Confirm we have a lock (will throw if there isn't exactly one element)
+    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+
+    if(!myLock.getDataSource().equals(getDataSource())) {
+      throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
+    }
+
+    if(!myLock.getInterval().equals(getInterval())) {
+      throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
+    }
+
+    // List unused segments
+    final List<DataSegment> unusedSegments = toolbox
+        .getTaskActionClient()
+        .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
+
+    // Verify none of these segments have versions > lock version
+    for(final DataSegment unusedSegment : unusedSegments) {
+      if(unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
+        throw new ISE(
+            "WTF?! Unused segment[%s] has version[%s] > task version[%s]",
+            unusedSegment.getIdentifier(),
+            unusedSegment.getVersion(),
+            myLock.getVersion()
+        );
+      }
+
+      log.info("OK to move segment: %s", unusedSegment.getIdentifier());
+    }
+
+    List<DataSegment> movedSegments = Lists.newLinkedList();
+
+    // Move segments
+    for (DataSegment segment : unusedSegments) {
+      movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec));
+    }
+
+    // Update metadata for moved segments
+    toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(
+        ImmutableSet.copyOf(movedSegments)
+    ));
+
+    return TaskStatus.success(getId());
+  }
+}
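MoveTask differs from ArchiveTask mainly in taking the destination as an explicit target load spec rather than deriving it from configuration. A sketch with illustrative bucket and baseKey values; a null id should be acceptable here since the constructor routes it through TaskUtils.makeId:

    Task moveTask = new MoveTask(
        null,                                  // TaskUtils.makeId derives an id
        "wikipedia",                           // illustrative dataSource
        new Interval("2013-01-01/2013-01-02"),
        ImmutableMap.<String, Object>of("bucket", "archive-bucket", "baseKey", "relocated")
    );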
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index 4d6afd2ebf6..8fa4b53bf10 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -45,6 +45,8 @@ import io.druid.query.QueryRunner;
     @JsonSubTypes.Type(name = "merge", value = MergeTask.class),
     @JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
     @JsonSubTypes.Type(name = "kill", value = KillTask.class),
+    @JsonSubTypes.Type(name = "move", value = MoveTask.class),
+    @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
     @JsonSubTypes.Type(name = "index", value = IndexTask.class),
     @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
     @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),

diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
index 2a4b5e8912d..f9db89b3fc9 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/IndexerDBCoordinator.java
@@ -28,8 +28,6 @@ import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
-import io.druid.db.DbConnector;
-import io.druid.db.DbConnectorConfig;
 import io.druid.db.DbTablesConfig;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
@@ -213,6 +211,24 @@ public class IndexerDBCoordinator
     return true;
   }

+  public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
+  {
+    dbi.inTransaction(
+        new TransactionCallback<Void>()
+        {
+          @Override
+          public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
+          {
+            for(final DataSegment segment : segments) {
+              updatePayload(handle, segment);
+            }
+
+            return null;
+          }
+        }
+    );
+  }
+
   public void deleteSegments(final Set<DataSegment> segments) throws IOException
   {
     dbi.inTransaction(
@@ -235,10 +251,27 @@ public class IndexerDBCoordinator
   {
     handle.createStatement(
         String.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable())
-    ).bind("id", segment.getIdentifier())
+    )
+          .bind("id", segment.getIdentifier())
           .execute();
   }

+  private void updatePayload(final Handle handle, final DataSegment segment) throws IOException
+  {
+    try {
+      handle.createStatement(
+          String.format("UPDATE %s SET payload = :payload WHERE id = :id", dbTables.getSegmentsTable())
+      )
+            .bind("id", segment.getIdentifier())
+            .bind("payload", jsonMapper.writeValueAsString(segment))
+            .execute();
+    }
+    catch (IOException e) {
+      log.error(e, "Exception inserting into DB");
+      throw e;
+    }
+  }
+
 public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
 {
   List<DataSegment> matchingSegments = dbi.withHandle(

diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index 632384ceb3d..a873daa876f 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -64,7 +64,9 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.segment.loading.DataSegmentArchiver;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPuller;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.LocalDataSegmentPuller;
@@ -88,6 +90,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;

 public class TaskLifecycleTest
@@ -158,6 +161,22 @@ public class TaskLifecycleTest
           }
         },
+        new DataSegmentMover()
+        {
+          @Override
+          public DataSegment move(DataSegment dataSegment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
+          {
+            return dataSegment;
+          }
+        },
+        new DataSegmentArchiver()
+        {
+          @Override
+          public DataSegment archive(DataSegment segment) throws SegmentLoadingException
+          {
+            return segment;
+          }
+        },
         null, // segment announcer
         null, // new segment server view
         null, // query runner factory conglomerate corporation unionized collective

diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
index f80ca3cd8db..8d4bf32b870 100644
--- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -122,7 +122,7 @@ public class WorkerTaskMonitorTest
         new ThreadPoolTaskRunner(
             new TaskToolboxFactory(
                 new TaskConfig(tmp.toString(), null, null, 0),
-                null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
+                null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
                     new OmniSegmentLoader(
                         ImmutableMap.of(
                             "local",
@@ -209,4 +209,4 @@
   Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId());
   Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode());
 }
-}
\ No newline at end of file
+}

diff --git a/pom.xml b/pom.xml
index f970d49dea5..f55d5202e2d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,7 @@
         UTF-8
         0.25.1
         2.1.0-incubating
-        0.1.6
+        0.1.7
diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java
new file mode 100644
index 00000000000..b7c04ec0c00
--- /dev/null
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java
@@ -0,0 +1,59 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.storage.s3;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import com.metamx.common.MapUtils;
+import io.druid.segment.loading.DataSegmentArchiver;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+
+
+public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSegmentArchiver
+{
+  private final S3DataSegmentArchiverConfig config;
+
+  @Inject
+  public S3DataSegmentArchiver(
+      RestS3Service s3Client,
+      S3DataSegmentArchiverConfig config
+  )
+  {
+    super(s3Client);
+    this.config = config;
+  }
+
+  @Override
+  public DataSegment archive(DataSegment segment) throws SegmentLoadingException
+  {
+    String targetS3Bucket = config.getArchiveBucket();
+    String targetS3BaseKey = config.getArchiveBaseKey();
+
+    return move(
+        segment,
+        ImmutableMap.<String, Object>of(
+            "bucket", targetS3Bucket,
+            "baseKey", targetS3BaseKey
+        )
+    );
+  }
+}
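The archiver contains no S3 logic of its own: it resolves a destination from S3DataSegmentArchiverConfig (next file) and delegates to the inherited move. The call above is therefore equivalent to the direct move below, where my-archive and archived stand in for illustrative druid.storage.archiveBucket and druid.storage.archiveBaseKey settings:

    DataSegment archived = mover.move(
        segment,
        ImmutableMap.<String, Object>of("bucket", "my-archive", "baseKey", "archived")
    );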
diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiverConfig.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiverConfig.java
new file mode 100644
index 00000000000..5eb33eb1b5d
--- /dev/null
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiverConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.storage.s3;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class S3DataSegmentArchiverConfig
+{
+  @JsonProperty
+  public String archiveBucket = "";
+
+  @JsonProperty
+  public String archiveBaseKey = "";
+
+  public String getArchiveBucket()
+  {
+    return archiveBucket;
+  }
+
+  public String getArchiveBaseKey()
+  {
+    return archiveBaseKey;
+  }
+}

diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java
index b9a44f631c6..0e4fde44d76 100644
--- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java
@@ -53,7 +53,7 @@ public class S3DataSegmentKiller implements DataSegmentKiller
     Map<String, Object> loadSpec = segment.getLoadSpec();
     String s3Bucket = MapUtils.getString(loadSpec, "bucket");
     String s3Path = MapUtils.getString(loadSpec, "key");
-    String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
+    String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);

     if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
       log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
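Both the one-line change to the killer above and the mover below go through two helpers that this patch factors into S3Utils (its diff appears further down). Their effect, shown on the segment key used by the mover test near the end of this patch:

    // constructSegmentPath joins an optional baseKey with the standard storage
    // directory and appends /index.zip; descriptorPathForSegmentPath swaps the
    // trailing file name. The value below mirrors the S3DataSegmentMoverTest fixture.
    String key = "baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip";
    String descriptor = S3Utils.descriptorPathForSegmentPath(key);
    // -> "baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"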
diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java
new file mode 100644
index 00000000000..a9baf1d40ab
--- /dev/null
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java
@@ -0,0 +1,130 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.storage.s3;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.inject.Inject;
+import com.metamx.common.MapUtils;
+import com.metamx.common.logger.Logger;
+import io.druid.segment.loading.DataSegmentMover;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import org.jets3t.service.ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Object;
+
+import java.util.Map;
+
+public class S3DataSegmentMover implements DataSegmentMover
+{
+  private static final Logger log = new Logger(S3DataSegmentMover.class);
+
+  private final RestS3Service s3Client;
+
+  @Inject
+  public S3DataSegmentMover(
+      RestS3Service s3Client
+  )
+  {
+    this.s3Client = s3Client;
+  }
+
+  @Override
+  public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
+  {
+    try {
+      Map<String, Object> loadSpec = segment.getLoadSpec();
+      String s3Bucket = MapUtils.getString(loadSpec, "bucket");
+      String s3Path = MapUtils.getString(loadSpec, "key");
+      String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
+
+      final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket");
+      final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey");
+
+      final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, segment);
+      String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path);
+
+      if (targetS3Bucket.isEmpty()) {
+        throw new SegmentLoadingException("Target S3 bucket is not specified");
+      }
+      if (targetS3Path.isEmpty()) {
+        throw new SegmentLoadingException("Target S3 baseKey is not specified");
+      }
+
+      safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
+      safeMove(s3Bucket, s3DescriptorPath, targetS3Bucket, targetS3DescriptorPath);
+
+      return segment.withLoadSpec(
+          ImmutableMap.<String, Object>builder()
+                      .putAll(
+                          Maps.filterKeys(
+                              loadSpec, new Predicate<String>()
+                          {
+                            @Override
+                            public boolean apply(String input)
+                            {
+                              return !(input.equals("bucket") || input.equals("key"));
+                            }
+                          }
+                          )
+                      )
+                      .put("bucket", targetS3Bucket)
+                      .put("key", targetS3Path)
+                      .build()
+      );
+    }
+    catch (ServiceException e) {
+      throw new SegmentLoadingException(e, "Unable to move segment[%s]", segment.getIdentifier());
+    }
+  }
+
+  private void safeMove(String s3Bucket, String s3Path, String targetS3Bucket, String targetS3Path)
+      throws ServiceException, SegmentLoadingException
+  {
+    if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
+      log.info(
+          "Moving file[s3://%s/%s] to [s3://%s/%s]",
+          s3Bucket,
+          s3Path,
+          targetS3Bucket,
+          targetS3Path
+      );
+      s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, new S3Object(targetS3Path), false);
+    } else {
+      // ensure object exists in target location
+      if(s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) {
+        log.info(
+            "Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
+            s3Bucket, s3Path,
+            targetS3Bucket, targetS3Path
+        );
+      }
+      else {
+        throw new SegmentLoadingException(
+            "Unable to move file [s3://%s/%s] to [s3://%s/%s], not present in either source or target location",
+            s3Bucket, s3Path,
+            targetS3Bucket, targetS3Path
+        );
+      }
+    }
+  }
+}
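safeMove gives the two-object move crash tolerance rather than atomicity: an object already absent from the source but present at the destination counts as moved, and only an object present in neither location is an error. Re-running a move that died between the index and descriptor copies therefore converges; a sketch of that second run:

    // First run moved index.zip, then died. On the second run:
    //   index.zip       -> "already present in target location" branch, no-op
    //   descriptor.json -> moved normally
    // and the caller still gets the rewritten load spec back.
    DataSegment moved = mover.move(segment, targetLoadSpec);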
diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java
index e8b1a99710f..664c270799b 100644
--- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java
@@ -20,7 +20,6 @@ package io.druid.storage.s3;

 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteStreams;
@@ -29,7 +28,6 @@ import com.google.inject.Inject;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.segment.SegmentUtils;
 import io.druid.segment.loading.DataSegmentPusher;
-import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import io.druid.utils.CompressionUtils;
 import org.jets3t.service.ServiceException;
@@ -45,7 +43,6 @@ import java.util.concurrent.Callable;
 public class S3DataSegmentPusher implements DataSegmentPusher
 {
   private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class);
-  private static final Joiner JOINER = Joiner.on("/").skipNulls();

   private final RestS3Service s3Client;
   private final S3DataSegmentPusherConfig config;
@@ -73,10 +70,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
   public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
   {
     log.info("Uploading [%s] to S3", indexFilesDir);
-    final String outputKey = JOINER.join(
-        config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
-        DataSegmentPusherUtil.getStorageDir(inSegment)
-    );
+    final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), inSegment);

     final File zipOutFile = File.createTempFile("druid", "index.zip");
     final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
@@ -90,8 +84,10 @@ public class S3DataSegmentPusher implements DataSegmentPusher
           S3Object toPush = new S3Object(zipOutFile);

           final String outputBucket = config.getBucket();
+          final String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
+
           toPush.setBucketName(outputBucket);
-          toPush.setKey(outputKey + "/index.zip");
+          toPush.setKey(s3Path);
           if (!config.getDisableAcl()) {
             toPush.setAcl(AccessControlList.REST_CANNED_AUTHENTICATED_READ);
           }
@@ -116,7 +112,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
           Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(inSegment)), descriptorFile);
           S3Object descriptorObject = new S3Object(descriptorFile);
           descriptorObject.setBucketName(outputBucket);
-          descriptorObject.setKey(outputKey + "/descriptor.json");
+          descriptorObject.setKey(s3DescriptorPath);
           if (!config.getDisableAcl()) {
             descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
           }
@@ -142,4 +138,4 @@ public class S3DataSegmentPusher implements DataSegmentPusher
       throw Throwables.propagate(e);
     }
   }
-}
\ No newline at end of file
+}

diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java
index f2675251f19..d30f49f976a 100644
--- a/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java
@@ -51,8 +51,11 @@ public class S3StorageDruidModule implements DruidModule
     Binders.dataSegmentPullerBinder(binder).addBinding("s3_zip").to(S3DataSegmentPuller.class).in(LazySingleton.class);
     Binders.dataSegmentKillerBinder(binder).addBinding("s3_zip").to(S3DataSegmentKiller.class).in(LazySingleton.class);
+    Binders.dataSegmentMoverBinder(binder).addBinding("s3_zip").to(S3DataSegmentMover.class).in(LazySingleton.class);
+    Binders.dataSegmentArchiverBinder(binder).addBinding("s3_zip").to(S3DataSegmentArchiver.class).in(LazySingleton.class);
     Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);
     JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
+    JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class);

     Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class);
     JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class);

diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java
index 3ae7088d88f..6cf481fa2f9 100644
--- a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java
+++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java
@@ -19,9 +19,12 @@

 package io.druid.storage.s3;

+import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
 import com.metamx.common.RetryUtils;
 import org.jets3t.service.ServiceException;
+import io.druid.segment.loading.DataSegmentPusherUtil;
+import io.druid.timeline.DataSegment;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
 import org.jets3t.service.model.S3Bucket;
 import org.jets3t.service.model.S3Object;
@@ -34,6 +37,8 @@ import java.util.concurrent.Callable;
  */
 public class S3Utils
 {
+  private static final Joiner JOINER = Joiner.on("/").skipNulls();
+
   public static void closeStreamsQuietly(S3Object s3Obj)
   {
     if (s3Obj == null) {
@@ -96,4 +101,17 @@ public class S3Utils
     return true;
   }
+
+  public static String constructSegmentPath(String baseKey, DataSegment segment)
+  {
+    return JOINER.join(
+        baseKey.isEmpty() ? null : baseKey,
+        DataSegmentPusherUtil.getStorageDir(segment)
+    ) + "/index.zip";
+  }
+
+  public static String descriptorPathForSegmentPath(String s3Path)
+  {
+    return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
+  }
 }
diff --git a/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java b/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java
new file mode 100644
index 00000000000..6206da881a4
--- /dev/null
+++ b/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java
@@ -0,0 +1,161 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.storage.s3;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.metamx.common.MapUtils;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.model.StorageObject;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+public class S3DataSegmentMoverTest
+{
+  private static final DataSegment sourceSegment = new DataSegment(
+      "test",
+      new Interval("2013-01-01/2013-01-02"),
+      "1",
+      ImmutableMap.<String, Object>of(
+          "key",
+          "baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip",
+          "bucket",
+          "main"
+      ),
+      ImmutableList.of("dim1", "dim1"),
+      ImmutableList.of("metric1", "metric2"),
+      new NoneShardSpec(),
+      0,
+      1
+  );
+
+  @Test
+  public void testMove() throws Exception
+  {
+    MockStorageService mockS3Client = new MockStorageService();
+    S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client);
+
+    mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"));
+    mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"));
+
+    DataSegment movedSegment = mover.move(
+        sourceSegment,
+        ImmutableMap.<String, Object>of("baseKey", "targetBaseKey", "bucket", "archive")
+    );
+
+    Map<String, Object> targetLoadSpec = movedSegment.getLoadSpec();
+    Assert.assertEquals("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip", MapUtils.getString(targetLoadSpec, "key"));
+    Assert.assertEquals("archive", MapUtils.getString(targetLoadSpec, "bucket"));
+    Assert.assertTrue(mockS3Client.didMove());
+  }
+
+  @Test
+  public void testMoveNoop() throws Exception
+  {
+    MockStorageService mockS3Client = new MockStorageService();
+    S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client);
+
+    mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"));
+    mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"));
+
+    DataSegment movedSegment = mover.move(
+        sourceSegment,
+        ImmutableMap.<String, Object>of("baseKey", "targetBaseKey", "bucket", "archive")
+    );
+
+    Map<String, Object> targetLoadSpec = movedSegment.getLoadSpec();
+
+    Assert.assertEquals("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip", MapUtils.getString(targetLoadSpec, "key"));
+    Assert.assertEquals("archive", MapUtils.getString(targetLoadSpec, "bucket"));
+    Assert.assertFalse(mockS3Client.didMove());
+  }
+
+  @Test(expected = SegmentLoadingException.class)
+  public void testMoveException() throws Exception
+  {
+    MockStorageService mockS3Client = new MockStorageService();
+    S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client);
+
+    mover.move(
+        sourceSegment,
+        ImmutableMap.<String, Object>of("baseKey", "targetBaseKey", "bucket", "archive")
+    );
+  }
+
+  private class MockStorageService extends RestS3Service {
+    Map<String, Set<String>> storage = Maps.newHashMap();
+    boolean moved = false;
+
+    private MockStorageService() throws S3ServiceException
+    {
+      super(null);
+    }
+
+    public boolean didMove() {
+      return moved;
+    }
+
+    @Override
+    public boolean isObjectInBucket(String bucketName, String objectKey) throws ServiceException
+    {
+      Set<String> objects = storage.get(bucketName);
+      return (objects != null && objects.contains(objectKey));
+    }
+
+    @Override
+    public Map<String, Object> moveObject(
+        String sourceBucketName,
+        String sourceObjectKey,
+        String destinationBucketName,
+        StorageObject destinationObject,
+        boolean replaceMetadata
+    ) throws ServiceException
+    {
+      moved = true;
+      if(isObjectInBucket(sourceBucketName, sourceObjectKey)) {
+        this.putObject(destinationBucketName, new S3Object(destinationObject.getKey()));
+        storage.get(sourceBucketName).remove(sourceObjectKey);
+      }
+      return null;
+    }
+
+    @Override
+    public S3Object putObject(String bucketName, S3Object object) throws S3ServiceException
+    {
+      if (!storage.containsKey(bucketName)) {
+        storage.put(bucketName, Sets.<String>newHashSet());
+      }
+      storage.get(bucketName).add(object.getKey());
+      return object;
+    }
+  }
+}

diff --git a/server/src/main/java/io/druid/segment/loading/OmniDataSegmentArchiver.java b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentArchiver.java
new file mode 100644
index 00000000000..bf34bbe17bb
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentArchiver.java
@@ -0,0 +1,57 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.segment.loading;
+
+import com.google.inject.Inject;
+import com.metamx.common.MapUtils;
+import io.druid.timeline.DataSegment;
+
+import java.util.Map;
+
+public class OmniDataSegmentArchiver implements DataSegmentArchiver
+{
+  private final Map<String, DataSegmentArchiver> archivers;
+
+  @Inject
+  public OmniDataSegmentArchiver(
+      Map<String, DataSegmentArchiver> archivers
+  )
+  {
+    this.archivers = archivers;
+  }
+
+  @Override
+  public DataSegment archive(DataSegment segment) throws SegmentLoadingException
+  {
+    return getArchiver(segment).archive(segment);
+  }
+
+  private DataSegmentArchiver getArchiver(DataSegment segment) throws SegmentLoadingException
+  {
+    String type = MapUtils.getString(segment.getLoadSpec(), "type");
+    DataSegmentArchiver archiver = archivers.get(type);
+
+    if (archiver == null) {
+      throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, archivers.keySet());
+    }
+
+    return archiver;
+  }
+}
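Like the existing OmniDataSegmentKiller, the Omni classes dispatch on the type key of a segment's load spec, with the map populated by the MapBinder registrations in S3StorageDruidModule above. A hand-wired sketch of that dispatch, where s3Archiver stands in for the injected S3 binding:

    // With the s3 extension loaded, Guice contributes {"s3_zip" -> S3DataSegmentArchiver}.
    DataSegmentArchiver omni = new OmniDataSegmentArchiver(
        ImmutableMap.<String, DataSegmentArchiver>of("s3_zip", s3Archiver)
    );
    DataSegment archived = omni.archive(segment); // segment loadSpec has "type": "s3_zip"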
diff --git a/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java
new file mode 100644
index 00000000000..d585b0b7db9
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java
@@ -0,0 +1,57 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.segment.loading;
+
+import com.google.inject.Inject;
+import com.metamx.common.MapUtils;
+import io.druid.timeline.DataSegment;
+
+import java.util.Map;
+
+public class OmniDataSegmentMover implements DataSegmentMover
+{
+  private final Map<String, DataSegmentMover> movers;
+
+  @Inject
+  public OmniDataSegmentMover(
+      Map<String, DataSegmentMover> movers
+  )
+  {
+    this.movers = movers;
+  }
+
+  @Override
+  public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
+  {
+    return getMover(segment).move(segment, targetLoadSpec);
+  }
+
+  private DataSegmentMover getMover(DataSegment segment) throws SegmentLoadingException
+  {
+    String type = MapUtils.getString(segment.getLoadSpec(), "type");
+    DataSegmentMover mover = movers.get(type);
+
+    if (mover == null) {
+      throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, movers.keySet());
+    }
+
+    return mover;
+  }
+}

diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java
index e8a5b985bd4..7204e5fc63a 100644
--- a/services/src/main/java/io/druid/cli/CliPeon.java
+++ b/services/src/main/java/io/druid/cli/CliPeon.java
@@ -60,8 +60,12 @@ import io.druid.indexing.worker.executor.ChatHandlerResource;
 import io.druid.indexing.worker.executor.ExecutorLifecycle;
 import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
 import io.druid.query.QuerySegmentWalker;
+import io.druid.segment.loading.DataSegmentArchiver;
 import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.DataSegmentMover;
+import io.druid.segment.loading.OmniDataSegmentArchiver;
 import io.druid.segment.loading.OmniDataSegmentKiller;
+import io.druid.segment.loading.OmniDataSegmentMover;
 import io.druid.segment.loading.SegmentLoaderConfig;
 import io.druid.segment.loading.StorageLocationConfig;
 import io.druid.server.QueryResource;
@@ -129,6 +133,10 @@ public class CliPeon extends GuiceRunnable
         // Build it to make it bind even if nothing binds to it.
         Binders.dataSegmentKillerBinder(binder);
         binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
+        Binders.dataSegmentMoverBinder(binder);
+        binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
+        Binders.dataSegmentArchiverBinder(binder);
+        binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);

         binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
         binder.bind(ExecutorLifecycleConfig.class).toInstance(