diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 56f08f3c9f3..eb34f4e4c1b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -29,6 +29,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; @@ -53,6 +54,7 @@ public class TaskToolbox private final ServiceEmitter emitter; private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; + private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentMover dataSegmentMover; private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; @@ -71,6 +73,7 @@ public class TaskToolbox DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, DataSegmentMover dataSegmentMover, + DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, @@ -88,6 +91,7 @@ public class TaskToolbox this.segmentPusher = segmentPusher; this.dataSegmentKiller = dataSegmentKiller; this.dataSegmentMover = dataSegmentMover; + this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; @@ -128,6 +132,11 @@ public class TaskToolbox return dataSegmentMover; } + public DataSegmentArchiver 
getDataSegmentArchiver() + { + return dataSegmentArchiver; + } + public DataSegmentAnnouncer getSegmentAnnouncer() { return segmentAnnouncer; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index cf9614c4bd6..d655edc34f0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -29,6 +29,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; @@ -48,6 +49,7 @@ public class TaskToolboxFactory private final DataSegmentPusher segmentPusher; private final DataSegmentKiller dataSegmentKiller; private final DataSegmentMover dataSegmentMover; + private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentAnnouncer segmentAnnouncer; private final ServerView newSegmentServerView; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; @@ -64,6 +66,7 @@ public class TaskToolboxFactory DataSegmentPusher segmentPusher, DataSegmentKiller dataSegmentKiller, DataSegmentMover dataSegmentMover, + DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, ServerView newSegmentServerView, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, @@ -79,6 +82,7 @@ public class TaskToolboxFactory this.segmentPusher = segmentPusher; this.dataSegmentKiller = dataSegmentKiller; this.dataSegmentMover = dataSegmentMover; + this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; 
this.newSegmentServerView = newSegmentServerView; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; @@ -100,6 +104,7 @@ public class TaskToolboxFactory segmentPusher, dataSegmentKiller, dataSegmentMover, + dataSegmentArchiver, segmentAnnouncer, newSegmentServerView, queryRunnerFactoryConglomerate, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java new file mode 100644 index 00000000000..43f8fd60e18 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java @@ -0,0 +1,106 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentListUnusedAction; +import io.druid.indexing.common.actions.SegmentMoveAction; +import io.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.util.List; + +public class ArchiveTask extends AbstractFixedIntervalTask +{ + private static final Logger log = new Logger(ArchiveTask.class); + + public ArchiveTask( + @JsonProperty("id") String id, + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval + ) + { + super(id, dataSource, interval); + } + + @Override + public String getType() + { + return "archive"; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + // Confirm we have a lock (will throw if there isn't exactly one element) + final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); + + if (!myLock.getDataSource().equals(getDataSource())) { + throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); + } + + if (!myLock.getInterval().equals(getInterval())) { + throw new ISE("WTF?! 
Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval()); + } + + // List unused segments + final List<DataSegment> unusedSegments = toolbox + .getTaskActionClient() + .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval())); + + // Verify none of these segments have versions > lock version + for (final DataSegment unusedSegment : unusedSegments) { + if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) { + throw new ISE( + "WTF?! Unused segment[%s] has version[%s] > task version[%s]", + unusedSegment.getIdentifier(), + unusedSegment.getVersion(), + myLock.getVersion() + ); + } + + log.info("OK to archive segment: %s", unusedSegment.getIdentifier()); + } + + List<DataSegment> archivedSegments = Lists.newLinkedList(); + + // Move segments + for (DataSegment segment : unusedSegments) { + archivedSegments.add(toolbox.getDataSegmentArchiver().archive(segment)); + } + + // Update metadata for moved segments + toolbox.getTaskActionClient().submit( + new SegmentMoveAction( + ImmutableSet.copyOf(archivedSegments) + ) + ); + + return TaskStatus.success(getId()); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java index 3154d42075d..c8742f31c34 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java @@ -35,16 +35,20 @@ import io.druid.timeline.DataSegment; import org.joda.time.Interval; import java.util.List; +import java.util.Map; public class MoveTask extends AbstractFixedIntervalTask { private static final Logger log = new Logger(MoveTask.class); + private final Map<String, Object> targetLoadSpec; + @JsonCreator public MoveTask( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval + @JsonProperty("interval") Interval interval, + @JsonProperty("target") Map<String, Object> 
targetLoadSpec ) { super( @@ -52,6 +56,7 @@ public class MoveTask extends AbstractFixedIntervalTask dataSource, interval ); + this.targetLoadSpec = targetLoadSpec; } @Override @@ -97,7 +102,7 @@ public class MoveTask extends AbstractFixedIntervalTask // Move segments for (DataSegment segment : unusedSegments) { - movedSegments.add(toolbox.getDataSegmentMover().move(segment)); + movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec)); } // Update metadata for moved segments diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 32747bdc7d3..8fa4b53bf10 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -46,6 +46,7 @@ import io.druid.query.QueryRunner; @JsonSubTypes.Type(name = "delete", value = DeleteTask.class), @JsonSubTypes.Type(name = "kill", value = KillTask.class), @JsonSubTypes.Type(name = "move", value = MoveTask.class), + @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class), @JsonSubTypes.Type(name = "index", value = IndexTask.class), @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class), @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class), diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index fcbfed7589b..a873daa876f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -64,6 +64,7 @@ import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import 
io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPuller; @@ -89,6 +90,7 @@ import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; public class TaskLifecycleTest @@ -162,11 +164,19 @@ public class TaskLifecycleTest new DataSegmentMover() { @Override - public DataSegment move(DataSegment dataSegment) throws SegmentLoadingException + public DataSegment move(DataSegment dataSegment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException { return dataSegment; } }, + new DataSegmentArchiver() + { + @Override + public DataSegment archive(DataSegment segment) throws SegmentLoadingException + { + return segment; + } + }, null, // segment announcer null, // new segment server view null, // query runner factory conglomerate corporation unionized collective diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 7723013d239..8d4bf32b870 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -122,7 +122,7 @@ public class WorkerTaskMonitorTest new ThreadPoolTaskRunner( new TaskToolboxFactory( new TaskConfig(tmp.toString(), null, null, 0), - null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( + null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( new OmniSegmentLoader( ImmutableMap.of( "local", diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java index b339187ff29..b7c04ec0c00 100644 --- 
a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java @@ -46,13 +46,13 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSeg public DataSegment archive(DataSegment segment) throws SegmentLoadingException { String targetS3Bucket = config.getArchiveBucket(); - String targetS3Path = MapUtils.getString(segment.getLoadSpec(), "key"); + String targetS3BaseKey = config.getArchiveBaseKey(); return move( segment, ImmutableMap.of( "bucket", targetS3Bucket, - "key", targetS3Path + "baseKey", targetS3BaseKey ) ); } diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiverConfig.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiverConfig.java index 53a04e43107..5eb33eb1b5d 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiverConfig.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiverConfig.java @@ -26,8 +26,16 @@ public class S3DataSegmentArchiverConfig @JsonProperty public String archiveBucket = ""; + @JsonProperty + public String archiveBaseKey = ""; + public String getArchiveBucket() { return archiveBucket; } + + public String getArchiveBaseKey() + { + return archiveBaseKey; + } } diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java index 9b3f122b590..01558b5c44d 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import com.metamx.common.MapUtils; import com.metamx.common.logger.Logger; import io.druid.segment.loading.DataSegmentMover; +import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.segment.loading.SegmentLoadingException; import 
io.druid.timeline.DataSegment; import org.jets3t.service.ServiceException; @@ -56,14 +57,16 @@ public class S3DataSegmentMover implements DataSegmentMover String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path); final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket"); - final String targetS3Path = MapUtils.getString(targetLoadSpec, "key"); + final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey"); + + final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, segment); String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path); if (targetS3Bucket.isEmpty()) { throw new SegmentLoadingException("Target S3 bucket is not specified"); } if (targetS3Path.isEmpty()) { - throw new SegmentLoadingException("Target S3 path is not specified"); + throw new SegmentLoadingException("Target S3 baseKey is not specified"); } if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index a73ed4d42ac..664c270799b 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -20,7 +20,6 @@ package io.druid.storage.s3; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; @@ -29,7 +28,6 @@ import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; -import io.druid.segment.loading.DataSegmentPusherUtil; import io.druid.timeline.DataSegment; import io.druid.utils.CompressionUtils; import org.jets3t.service.ServiceException; @@ -45,7 +43,6 @@ import 
java.util.concurrent.Callable; public class S3DataSegmentPusher implements DataSegmentPusher { private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class); - private static final Joiner JOINER = Joiner.on("/").skipNulls(); private final RestS3Service s3Client; private final S3DataSegmentPusherConfig config; @@ -73,10 +70,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException { log.info("Uploading [%s] to S3", indexFilesDir); - final String outputKey = JOINER.join( - config.getBaseKey().isEmpty() ? null : config.getBaseKey(), - DataSegmentPusherUtil.getStorageDir(inSegment) - ); + final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), inSegment); final File zipOutFile = File.createTempFile("druid", "index.zip"); final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile); @@ -90,7 +84,6 @@ public class S3DataSegmentPusher implements DataSegmentPusher S3Object toPush = new S3Object(zipOutFile); final String outputBucket = config.getBucket(); - final String s3Path = outputKey + "/index.zip"; final String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path); toPush.setBucketName(outputBucket); diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index a4764717c1d..6cf481fa2f9 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -19,9 +19,12 @@ package io.druid.storage.s3; +import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.metamx.common.RetryUtils; import org.jets3t.service.ServiceException; +import io.druid.segment.loading.DataSegmentPusherUtil; +import io.druid.timeline.DataSegment; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import 
org.jets3t.service.model.S3Bucket; import org.jets3t.service.model.S3Object; @@ -34,6 +37,8 @@ import java.util.concurrent.Callable; */ public class S3Utils { + private static final Joiner JOINER = Joiner.on("/").skipNulls(); + public static void closeStreamsQuietly(S3Object s3Obj) { if (s3Obj == null) { @@ -96,6 +101,15 @@ public class S3Utils return true; } + + public static String constructSegmentPath(String baseKey, DataSegment segment) + { + return JOINER.join( + baseKey.isEmpty() ? null : baseKey, + DataSegmentPusherUtil.getStorageDir(segment) + ) + "/index.zip"; + } + public static String descriptorPathForSegmentPath(String s3Path) { return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; diff --git a/server/src/main/java/io/druid/segment/loading/OmniDataSegmentArchiver.java b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentArchiver.java new file mode 100644 index 00000000000..bf34bbe17bb --- /dev/null +++ b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentArchiver.java @@ -0,0 +1,57 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.segment.loading; + +import com.google.inject.Inject; +import com.metamx.common.MapUtils; +import io.druid.timeline.DataSegment; + +import java.util.Map; + +public class OmniDataSegmentArchiver implements DataSegmentArchiver +{ + private final Map<String, DataSegmentArchiver> archivers; + + @Inject + public OmniDataSegmentArchiver( + Map<String, DataSegmentArchiver> archivers + ) + { + this.archivers = archivers; + } + + @Override + public DataSegment archive(DataSegment segment) throws SegmentLoadingException + { + return getArchiver(segment).archive(segment); + } + + private DataSegmentArchiver getArchiver(DataSegment segment) throws SegmentLoadingException + { + String type = MapUtils.getString(segment.getLoadSpec(), "type"); + DataSegmentArchiver archiver = archivers.get(type); + + if (archiver == null) { + throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, archivers.keySet()); + } + + return archiver; + } +} diff --git a/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java index 490c936011d..d585b0b7db9 100644 --- a/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java +++ b/server/src/main/java/io/druid/segment/loading/OmniDataSegmentMover.java @@ -38,9 +38,9 @@ public class OmniDataSegmentMover implements DataSegmentMover } @Override - public DataSegment move(DataSegment segment) throws SegmentLoadingException + public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException { - return getMover(segment).move(segment); + return getMover(segment).move(segment, targetLoadSpec); } private DataSegmentMover getMover(DataSegment segment) throws SegmentLoadingException diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 450c9286554..7204e5fc63a 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ 
b/services/src/main/java/io/druid/cli/CliPeon.java @@ -60,8 +60,10 @@ import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; import io.druid.query.QuerySegmentWalker; +import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; +import io.druid.segment.loading.OmniDataSegmentArchiver; import io.druid.segment.loading.OmniDataSegmentKiller; import io.druid.segment.loading.OmniDataSegmentMover; import io.druid.segment.loading.SegmentLoaderConfig; @@ -133,6 +135,8 @@ public class CliPeon extends GuiceRunnable binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class); Binders.dataSegmentMoverBinder(binder); binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class); + Binders.dataSegmentArchiverBinder(binder); + binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class); binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class); binder.bind(ExecutorLifecycleConfig.class).toInstance(