From 6864007c0567335890e1807afc1f5629a075f3f9 Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Tue, 5 Mar 2013 13:27:25 -0600 Subject: [PATCH] 1) Remove Task parameter from other methods off of TaskToolbox that required it 2) Move getTaskDir() off of TaskConfig and onto TaskToolbox 3) Rename SegmentKiller interface to DataSegmentKiller 4) Change signature of "DataSegmentKiller.kill(Collection) throws ServiceException" to just kill(DataSegment) throws SegmentLoadingException 5) Add various log messages 6) Update the version of the segment that has been converted --- .../com/metamx/druid/client/DataSegment.java | 1 + .../druid/merger/common/TaskToolbox.java | 35 ++++++---- .../merger/common/TaskToolboxFactory.java | 31 +++----- .../merger/common/config/TaskConfig.java | 23 ++++-- .../merger/common/task/AbstractTask.java | 2 +- .../druid/merger/common/task/DeleteTask.java | 6 +- .../merger/common/task/HadoopIndexTask.java | 4 +- .../task/IndexDeterminePartitionsTask.java | 2 +- .../common/task/IndexGeneratorTask.java | 21 +++--- .../druid/merger/common/task/IndexTask.java | 2 +- .../druid/merger/common/task/KillTask.java | 10 +-- .../druid/merger/common/task/MergeTask.java | 37 +++------- .../common/task/VersionConverterSubTask.java | 21 +++++- .../common/task/VersionConverterTask.java | 63 ++++++++++++----- .../merger/coordinator/LocalTaskRunner.java | 2 +- .../http/IndexerCoordinatorNode.java | 8 +-- .../http/IndexerCoordinatorResource.java | 3 +- .../druid/merger/worker/TaskMonitor.java | 2 +- .../druid/merger/worker/http/WorkerNode.java | 12 ++-- .../merger/coordinator/TaskLifecycleTest.java | 40 +++++------ .../merger/coordinator/TaskQueueTest.java | 2 +- .../druid/loading/DataSegmentKiller.java | 29 ++++++++ .../druid/loading/S3DataSegmentKiller.java | 70 +++++++++++++++++++ .../druid/loading/S3DataSegmentPusher.java | 6 +- .../metamx/druid/loading/S3SegmentKiller.java | 49 ------------- .../metamx/druid/loading/SegmentKiller.java | 14 ---- 26 files changed, 281 insertions(+), 214 deletions(-) create mode 100644 server/src/main/java/com/metamx/druid/loading/DataSegmentKiller.java create mode 100644 server/src/main/java/com/metamx/druid/loading/S3DataSegmentKiller.java delete mode 100644 server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java delete mode 100644 server/src/main/java/com/metamx/druid/loading/SegmentKiller.java diff --git a/client/src/main/java/com/metamx/druid/client/DataSegment.java b/client/src/main/java/com/metamx/druid/client/DataSegment.java index b915f7680cd..e8b7e55a495 100644 --- a/client/src/main/java/com/metamx/druid/client/DataSegment.java +++ b/client/src/main/java/com/metamx/druid/client/DataSegment.java @@ -261,6 +261,7 @@ public class DataSegment implements Comparable ", loadSpec=" + loadSpec + ", interval=" + interval + ", dataSource='" + dataSource + '\'' + + ", binaryVersion='" + binaryVersion + '\'' + '}'; } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java index 58ad3c5cc43..32cb188273a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java @@ -25,11 +25,12 @@ import com.metamx.druid.client.DataSegment; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.MMappedQueryableIndexFactory; import com.metamx.druid.loading.S3DataSegmentPuller; -import com.metamx.druid.loading.SegmentKiller; +import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.SegmentLoaderConfig; import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.loading.SingleSegmentLoader; import com.metamx.druid.merger.common.actions.TaskActionClient; +import com.metamx.druid.merger.common.actions.TaskActionClientFactory; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.task.Task; import com.metamx.emitter.service.ServiceEmitter; @@ -45,29 +46,32 @@ import java.util.Map; public class TaskToolbox { private final TaskConfig config; - private final TaskActionClient taskActionClient; + private final Task task; + private final TaskActionClientFactory taskActionClientFactory; private final ServiceEmitter emitter; private final RestS3Service s3Client; private final DataSegmentPusher segmentPusher; - private final SegmentKiller segmentKiller; + private final DataSegmentKiller dataSegmentKiller; private final ObjectMapper objectMapper; public TaskToolbox( TaskConfig config, - TaskActionClient taskActionClient, + Task task, + TaskActionClientFactory taskActionClientFactory, ServiceEmitter emitter, RestS3Service s3Client, DataSegmentPusher segmentPusher, - SegmentKiller segmentKiller, + DataSegmentKiller dataSegmentKiller, ObjectMapper objectMapper ) { this.config = config; - this.taskActionClient = taskActionClient; + this.task = task; + this.taskActionClientFactory = taskActionClientFactory; this.emitter = emitter; this.s3Client = s3Client; this.segmentPusher = segmentPusher; - this.segmentKiller = segmentKiller; + this.dataSegmentKiller = dataSegmentKiller; this.objectMapper = objectMapper; } @@ -76,9 +80,9 @@ public class TaskToolbox return config; } - public TaskActionClient getTaskActionClient() + public TaskActionClient getTaskActionClientFactory() { - return taskActionClient; + return taskActionClientFactory.create(task); } public ServiceEmitter getEmitter() @@ -91,9 +95,9 @@ public class TaskToolbox return segmentPusher; } - public SegmentKiller getSegmentKiller() + public DataSegmentKiller getDataSegmentKiller() { - return segmentKiller; + return dataSegmentKiller; } public ObjectMapper getObjectMapper() @@ -101,7 +105,7 @@ public class TaskToolbox return objectMapper; } - public Map getSegments(final Task task, List segments) + public Map getSegments(List segments) throws SegmentLoadingException { final SingleSegmentLoader loader = new SingleSegmentLoader( @@ -112,7 +116,7 @@ public class TaskToolbox @Override public File getCacheDirectory() { - return new File(config.getTaskDir(task), "fetched_segments"); + return new File(getTaskDir(), "fetched_segments"); } } ); @@ -124,4 +128,9 @@ public class TaskToolbox return retVal; } + + public File getTaskDir() { + return new File(config.getBaseTaskDir(), task.getId()); + } + } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java index 9a804bef79e..2266860ea86 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java @@ -20,55 +20,43 @@ package com.metamx.druid.merger.common; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; -import com.metamx.druid.client.DataSegment; import com.metamx.druid.loading.DataSegmentPusher; -import com.metamx.druid.loading.MMappedQueryableIndexFactory; -import com.metamx.druid.loading.S3DataSegmentPuller; -import com.metamx.druid.loading.SegmentKiller; -import com.metamx.druid.loading.SegmentLoaderConfig; -import com.metamx.druid.loading.SegmentLoadingException; -import com.metamx.druid.loading.SingleSegmentLoader; -import com.metamx.druid.merger.common.actions.TaskActionClient; +import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.merger.common.actions.TaskActionClientFactory; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.task.Task; import com.metamx.emitter.service.ServiceEmitter; import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import java.io.File; -import java.util.List; -import java.util.Map; - /** * Stuff that may be needed by a Task in order to conduct its business. */ public class TaskToolboxFactory { private final TaskConfig config; - private final TaskActionClientFactory taskActionClient; + private final TaskActionClientFactory taskActionClientFactory; private final ServiceEmitter emitter; private final RestS3Service s3Client; private final DataSegmentPusher segmentPusher; - private final SegmentKiller segmentKiller; + private final DataSegmentKiller dataSegmentKiller; private final ObjectMapper objectMapper; public TaskToolboxFactory( TaskConfig config, - TaskActionClientFactory taskActionClient, + TaskActionClientFactory taskActionClientFactory, ServiceEmitter emitter, RestS3Service s3Client, DataSegmentPusher segmentPusher, - SegmentKiller segmentKiller, + DataSegmentKiller dataSegmentKiller, ObjectMapper objectMapper ) { this.config = config; - this.taskActionClient = taskActionClient; + this.taskActionClientFactory = taskActionClientFactory; this.emitter = emitter; this.s3Client = s3Client; this.segmentPusher = segmentPusher; - this.segmentKiller = segmentKiller; + this.dataSegmentKiller = dataSegmentKiller; this.objectMapper = objectMapper; } @@ -81,11 +69,12 @@ public class TaskToolboxFactory { return new TaskToolbox( config, - taskActionClient == null ? null : taskActionClient.create(task), + task, + taskActionClientFactory, emitter, s3Client, segmentPusher, - segmentKiller, + dataSegmentKiller, objectMapper ); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java b/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java index 5918f0627c6..2bd27667514 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.common.config; import com.metamx.druid.merger.common.task.Task; @@ -17,8 +36,4 @@ public abstract class TaskConfig @Config("druid.merger.hadoopWorkingPath") public abstract String getHadoopWorkingPath(); - - public File getTaskDir(final Task task) { - return new File(getBaseTaskDir(), task.getId()); - } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java index c2f9a792bb0..518fb04ab37 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java @@ -102,7 +102,7 @@ public abstract class AbstractTask implements Task return ID_JOINER.join(objects); } - public SegmentListUsedAction makeImplicitListUsedAction() + public SegmentListUsedAction defaultListUsedAction() { return new SegmentListUsedAction(getDataSource(), getImplicitLockInterval().get()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java index 2f10861ef5f..67754ee00d7 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java @@ -77,7 +77,7 @@ public class DeleteTask extends AbstractTask public TaskStatus run(TaskToolbox toolbox) throws Exception { // Strategy: Create an empty segment covering the interval to be deleted - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final Interval interval = this.getImplicitLockInterval().get(); final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]); final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty); @@ -91,7 +91,7 @@ public class DeleteTask extends AbstractTask .shardSpec(new NoneShardSpec()) .build(); - final File outDir = new File(toolbox.getConfig().getTaskDir(this), segment.getIdentifier()); + final File outDir = new File(toolbox.getTaskDir(), segment.getIdentifier()); final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir); // Upload the segment @@ -104,7 +104,7 @@ public class DeleteTask extends AbstractTask segment.getVersion() ); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); + toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java index 6e284557529..f0d09d5137d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java @@ -93,7 +93,7 @@ public class HadoopIndexTask extends AbstractTask ); // We should have a lock from before we started running - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); log.info("Setting version to: %s", myLock.getVersion()); configCopy.setVersion(myLock.getVersion()); @@ -124,7 +124,7 @@ public class HadoopIndexTask extends AbstractTask List publishedSegments = job.getPublishedSegments(); // Request segment pushes - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments))); + toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments))); // Done return TaskStatus.success(getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java index 47f72b12501..3dfe99a68f1 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java @@ -258,7 +258,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask } ); - toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks)); + toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks)); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java index 2a7b4683b8d..1790ddc6aa5 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java @@ -100,24 +100,21 @@ public class IndexGeneratorTask extends AbstractTask public TaskStatus run(final TaskToolbox toolbox) throws Exception { // We should have a lock from before we started running - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); // We know this exists final Interval interval = getImplicitLockInterval().get(); // Set up temporary directory for indexing final File tmpDir = new File( + toolbox.getTaskDir(), String.format( - "%s/%s", - toolbox.getConfig().getTaskDir(this).toString(), - String.format( - "%s_%s_%s_%s_%s", - this.getDataSource(), - interval.getStart(), - interval.getEnd(), - myLock.getVersion(), - schema.getShardSpec().getPartitionNum() - ) + "%s_%s_%s_%s_%s", + this.getDataSource(), + interval.getStart(), + interval.getEnd(), + myLock.getVersion(), + schema.getShardSpec().getPartitionNum() ) ); @@ -193,7 +190,7 @@ public class IndexGeneratorTask extends AbstractTask ); // Request segment pushes - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments))); + toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments))); // Done return TaskStatus.success(getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java index 35babcd6a22..d1bfc6d77fc 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java @@ -142,7 +142,7 @@ public class IndexTask extends AbstractTask @Override public TaskStatus preflight(TaskToolbox toolbox) throws Exception { - toolbox.getTaskActionClient().submit(new SpawnTasksAction(toSubtasks())); + toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(toSubtasks())); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java index 52b8b48b8b4..e652ab69151 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java @@ -72,7 +72,7 @@ public class KillTask extends AbstractTask public TaskStatus run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); if(!myLock.getDataSource().equals(getDataSource())) { throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); @@ -84,7 +84,7 @@ public class KillTask extends AbstractTask // List unused segments final List unusedSegments = toolbox - .getTaskActionClient() + .getTaskActionClientFactory() .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval())); // Verify none of these segments have versions > lock version @@ -102,10 +102,12 @@ public class KillTask extends AbstractTask } // Kill segments - toolbox.getSegmentKiller().kill(unusedSegments); + for (DataSegment segment : unusedSegments) { + toolbox.getDataSegmentKiller().kill(segment); + } // Remove metadata for these segments - toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments))); + toolbox.getTaskActionClientFactory().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments))); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java index 757a91c2598..561cb940639 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java @@ -43,8 +43,8 @@ import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.actions.LockListAction; import com.metamx.druid.merger.common.actions.SegmentInsertAction; -import com.metamx.druid.merger.common.actions.SegmentListUsedAction; import com.metamx.druid.shard.NoneShardSpec; +import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.AlertEvent; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -67,7 +67,7 @@ public abstract class MergeTask extends AbstractTask { private final List segments; - private static final Logger log = new Logger(MergeTask.class); + private static final EmittingLogger log = new EmittingLogger(MergeTask.class); protected MergeTask(final String dataSource, final List segments) { @@ -119,11 +119,11 @@ public abstract class MergeTask extends AbstractTask @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final ServiceEmitter emitter = toolbox.getEmitter(); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments); - final File taskDir = toolbox.getConfig().getTaskDir(this); + final File taskDir = toolbox.getTaskDir(); try { @@ -147,7 +147,7 @@ public abstract class MergeTask extends AbstractTask // download segments to merge - final Map gettedSegments = toolbox.getSegments(this, segments); + final Map gettedSegments = toolbox.getSegments(segments); // merge files together final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged")); @@ -170,27 +170,14 @@ public abstract class MergeTask extends AbstractTask emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart)); emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize())); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); + toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); return TaskStatus.success(getId()); } catch (Exception e) { - log.error( - e, - String.format( - "Exception merging %s[%s] segments", - mergedSegment.getDataSource(), - mergedSegment.getInterval() - ) - ); - emitter.emit( - new AlertEvent.Builder().build( - "Exception merging", - ImmutableMap.builder() - .put("exception", e.toString()) - .build() - ) - ); + log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource()) + .addData("interval", mergedSegment.getInterval()) + .emit(); return TaskStatus.failure(getId()); } @@ -213,11 +200,7 @@ public abstract class MergeTask extends AbstractTask }; final Set current = ImmutableSet.copyOf( - Iterables.transform( - toolbox.getTaskActionClient() - .submit(new SegmentListUsedAction(getDataSource(), getImplicitLockInterval().get())), - toIdentifier - ) + Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier) ); final Set requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier)); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java index db08e788e4d..2099d903d33 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java @@ -21,11 +21,13 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Sets; +import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.actions.SegmentInsertAction; +import org.joda.time.DateTime; import java.io.File; import java.util.Arrays; @@ -35,6 +37,8 @@ import java.util.Map; */ public class VersionConverterSubTask extends AbstractTask { + private static final Logger log = new Logger(VersionConverterSubTask.class); + private final DataSegment segment; protected VersionConverterSubTask( @@ -50,6 +54,7 @@ public class VersionConverterSubTask extends AbstractTask segment.getInterval().getEnd(), segment.getShardSpec().getPartitionNum() ), + groupId, segment.getDataSource(), segment.getInterval() ); @@ -65,13 +70,23 @@ public class VersionConverterSubTask extends AbstractTask @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final Map localSegments = toolbox.getSegments(this, Arrays.asList(segment)); + log.info("Converting segment[%s]", segment); + final Map localSegments = toolbox.getSegments(Arrays.asList(segment)); final File location = localSegments.get(segment); final File outLocation = new File(location, "v9_out"); if (IndexIO.convertSegment(location, outLocation)) { - final DataSegment updatedSegment = toolbox.getSegmentPusher().push(outLocation, segment); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment))); + final int outVersion = IndexIO.getVersionFromDir(outLocation); + + // Appending to the version makes a new version that inherits most comparability parameters of the original + // version, but is "newer" than said original version. + DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion)); + updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment); + + toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment))); + } + else { + log.info("Conversion failed."); } return success(); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java index 4106e210547..3859190a1bb 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java @@ -1,10 +1,32 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.collect.Lists; import com.metamx.common.ISE; +import com.metamx.common.guava.FunctionalIterable; +import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.actions.SpawnTasksAction; @@ -12,7 +34,6 @@ import com.metamx.druid.merger.common.actions.TaskActionClient; import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.util.List; /** @@ -20,6 +41,9 @@ import java.util.List; public class VersionConverterTask extends AbstractTask { private static final String TYPE = "version_converter"; + private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID); + + private static final Logger log = new Logger(VersionConverterTask.class); public VersionConverterTask( @JsonProperty("dataSource") String dataSource, @@ -48,25 +72,30 @@ public class VersionConverterTask extends AbstractTask @Override public TaskStatus preflight(TaskToolbox toolbox) throws Exception { - final TaskActionClient taskClient = toolbox.getTaskActionClient(); + final TaskActionClient taskClient = toolbox.getTaskActionClientFactory(); - List segments = taskClient.submit(makeImplicitListUsedAction()); + List segments = taskClient.submit(defaultListUsedAction()); - taskClient.submit( - new SpawnTasksAction( - Lists.transform( - segments, - new Function() - { - @Override - public Task apply(@Nullable DataSegment input) - { - return new VersionConverterSubTask(getGroupId(), input); - } + final FunctionalIterable tasks = FunctionalIterable + .create(segments) + .keep( + new Function() + { + @Override + public Task apply(DataSegment segment) + { + final Integer segmentVersion = segment.getBinaryVersion(); + if (!CURR_VERSION_INTEGER.equals(segmentVersion)) { + return new VersionConverterSubTask(getGroupId(), segment); } - ) - ) - ); + + log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion); + return null; + } + } + ); + + taskClient.submit(new SpawnTasksAction(Lists.newArrayList(tasks))); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java index a4f58a361f8..5dbe8273d6c 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java @@ -92,7 +92,7 @@ public class LocalTaskRunner implements TaskRunner } try { - final File taskDir = toolbox.getConfig().getTaskDir(task); + final File taskDir = toolbox.getTaskDir(); if (taskDir.exists()) { log.info("Removing task directory: %s", taskDir); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 833c3976ee8..286d2e1649a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -52,8 +52,8 @@ import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentPusher; -import com.metamx.druid.loading.S3SegmentKiller; -import com.metamx.druid.loading.SegmentKiller; +import com.metamx.druid.loading.S3DataSegmentKiller; +import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.merger.common.actions.TaskActionToolbox; @@ -489,7 +489,7 @@ public class IndexerCoordinatorNode extends RegisteringNode public void initializeTaskToolbox() { if (taskToolboxFactory == null) { - final SegmentKiller segmentKiller = new S3SegmentKiller(s3Service); + final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service); taskToolboxFactory = new TaskToolboxFactory( taskConfig, new LocalTaskActionClientFactory( @@ -499,7 +499,7 @@ public class IndexerCoordinatorNode extends RegisteringNode emitter, s3Service, segmentPusher, - segmentKiller, + dataSegmentKiller, jsonMapper ); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java index 3ac16741028..5f0714a8d61 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java @@ -27,7 +27,6 @@ import com.google.inject.Inject; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.merger.common.TaskStatus; -import com.metamx.druid.merger.common.actions.TaskAction; import com.metamx.druid.merger.common.actions.TaskActionHolder; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.TaskMasterLifecycle; @@ -185,7 +184,7 @@ public class IndexerCoordinatorResource public Response doAction(final TaskActionHolder holder) { final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask()) - .getTaskActionClient() + .getTaskActionClientFactory() .submit(holder.getAction()); final Map retMap = Maps.newHashMap(); diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java b/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java index ec3a8d992e0..867b8dd9cde 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java @@ -101,7 +101,7 @@ public class TaskMonitor public void run() { final long startTime = System.currentTimeMillis(); - final File taskDir = toolbox.getConfig().getTaskDir(task); + final File taskDir = toolbox.getTaskDir(); log.info("Running task [%s]", task.getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index bad04040e73..caef5bd2935 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -39,13 +39,9 @@ import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentPusher; -import com.metamx.druid.loading.S3DataSegmentPusher; -import com.metamx.druid.loading.S3DataSegmentPusherConfig; -import com.metamx.druid.loading.S3SegmentKiller; -import com.metamx.druid.loading.SegmentKiller; -import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.loading.S3DataSegmentKiller; +import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.merger.common.TaskToolboxFactory; -import com.metamx.druid.merger.common.actions.RemoteTaskActionClient; import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.TaskConfig; @@ -355,14 +351,14 @@ public class WorkerNode extends RegisteringNode public void initializeTaskToolbox() throws S3ServiceException { if (taskToolboxFactory == null) { - final SegmentKiller segmentKiller = new S3SegmentKiller(s3Service); + final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service); taskToolboxFactory = new TaskToolboxFactory( taskConfig, new RemoteTaskActionClientFactory(httpClient, coordinatorServiceProvider, jsonMapper), emitter, s3Service, segmentPusher, - segmentKiller, + dataSegmentKiller, jsonMapper ); } diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java index 3dc889b96f0..4fac2c5eded 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java @@ -40,7 +40,8 @@ import com.metamx.druid.input.InputRow; import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentPusher; -import com.metamx.druid.loading.SegmentKiller; +import com.metamx.druid.loading.DataSegmentKiller; +import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.merger.common.TaskLock; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; @@ -75,7 +76,6 @@ import org.junit.Test; import java.io.File; import java.io.IOException; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -146,10 +146,10 @@ public class TaskLifecycleTest return segment; } }, - new SegmentKiller() + new DataSegmentKiller() { @Override - public void kill(Collection segments) throws ServiceException + public void kill(DataSegment segments) throws SegmentLoadingException { } @@ -283,8 +283,8 @@ public class TaskLifecycleTest // Sort of similar to what realtime tasks do: // Acquire lock for first interval - final Optional lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1)); - final List locks1 = toolbox.getTaskActionClient().submit(new LockListAction()); + final Optional lock1 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval1)); + final List locks1 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertTrue("lock1 present", lock1.isPresent()); @@ -292,8 +292,8 @@ public class TaskLifecycleTest Assert.assertEquals("locks1", ImmutableList.of(lock1.get()), locks1); // Acquire lock for second interval - final Optional lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2)); - final List locks2 = toolbox.getTaskActionClient().submit(new LockListAction()); + final Optional lock2 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval2)); + final List locks2 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertTrue("lock2 present", lock2.isPresent()); @@ -301,7 +301,7 @@ public class TaskLifecycleTest Assert.assertEquals("locks2", ImmutableList.of(lock1.get(), lock2.get()), locks2); // Push first segment - toolbox.getTaskActionClient() + toolbox.getTaskActionClientFactory() .submit( new SegmentInsertAction( ImmutableSet.of( @@ -315,14 +315,14 @@ public class TaskLifecycleTest ); // Release first lock - toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1)); - final List locks3 = toolbox.getTaskActionClient().submit(new LockListAction()); + toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval1)); + final List locks3 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertEquals("locks3", ImmutableList.of(lock2.get()), locks3); // Push second segment - toolbox.getTaskActionClient() + toolbox.getTaskActionClientFactory() .submit( new SegmentInsertAction( ImmutableSet.of( @@ -336,8 +336,8 @@ public class TaskLifecycleTest ); // Release second lock - toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2)); - final List locks4 = toolbox.getTaskActionClient().submit(new LockListAction()); + toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval2)); + final List locks4 = toolbox.getTaskActionClientFactory().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertEquals("locks4", ImmutableList.of(), locks4); @@ -370,7 +370,7 @@ public class TaskLifecycleTest public TaskStatus run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement( - toolbox.getTaskActionClient() + toolbox.getTaskActionClientFactory() .submit(new LockListAction()) ); @@ -380,7 +380,7 @@ public class TaskLifecycleTest .version(myLock.getVersion()) .build(); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); + toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment))); return TaskStatus.success(getId()); } }; @@ -406,7 +406,7 @@ public class TaskLifecycleTest @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final DataSegment segment = DataSegment.builder() .dataSource("ds") @@ -414,7 +414,7 @@ public class TaskLifecycleTest .version(myLock.getVersion()) .build(); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); + toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment))); return TaskStatus.success(getId()); } }; @@ -440,7 +440,7 @@ public class TaskLifecycleTest @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); final DataSegment segment = DataSegment.builder() .dataSource("ds") @@ -448,7 +448,7 @@ public class TaskLifecycleTest .version(myLock.getVersion() + "1!!!1!!") .build(); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); + toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment))); return TaskStatus.success(getId()); } }; diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java index 939dc9b6b21..dfb0d959d1a 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java @@ -375,7 +375,7 @@ public class TaskQueueTest @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks)); + toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks)); return TaskStatus.success(id); } }; diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentKiller.java new file mode 100644 index 00000000000..85483eaa505 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentKiller.java @@ -0,0 +1,29 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.loading; + +import com.metamx.druid.client.DataSegment; + +/** + */ +public interface DataSegmentKiller +{ + public void kill(DataSegment segments) throws SegmentLoadingException; +} diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentKiller.java new file mode 100644 index 00000000000..47c39c1857e --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentKiller.java @@ -0,0 +1,70 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.loading; + +import com.google.inject.Inject; +import com.metamx.common.MapUtils; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import org.jets3t.service.ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; + +import java.util.Map; + +/** + */ +public class S3DataSegmentKiller implements DataSegmentKiller +{ + private static final Logger log = new Logger(S3DataSegmentKiller.class); + + private final RestS3Service s3Client; + + @Inject + public S3DataSegmentKiller( + RestS3Service s3Client + ) + { + this.s3Client = s3Client; + } + + + @Override + public void kill(DataSegment segment) throws SegmentLoadingException + { + try { + Map loadSpec = segment.getLoadSpec(); + String s3Bucket = MapUtils.getString(loadSpec, "bucket"); + String s3Path = MapUtils.getString(loadSpec, "key"); + String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; + + if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { + log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); + s3Client.deleteObject(s3Bucket, s3Path); + } + if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) { + log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); + s3Client.deleteObject(s3Bucket, s3DescriptorPath); + } + } + catch (ServiceException e) { + throw new SegmentLoadingException(e, "Couldn't kill segment[%s]", segment.getIdentifier()); + } + } +} diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java index dd5ce951695..9dacbe8b546 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java @@ -87,11 +87,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher segment = segment.withSize(indexSize) .withLoadSpec( - ImmutableMap.of( - "type", "s3_zip", - "bucket", outputBucket, - "key", toPush.getKey() - ) + ImmutableMap.of("type", "s3_zip", "bucket", outputBucket, "key", toPush.getKey()) ) .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir)); diff --git a/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java deleted file mode 100644 index 46f6acfc629..00000000000 --- a/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.metamx.druid.loading; - -import com.google.inject.Inject; -import com.metamx.common.MapUtils; -import com.metamx.common.logger.Logger; -import com.metamx.druid.client.DataSegment; -import org.jets3t.service.ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; - -import java.util.Collection; -import java.util.Map; - -/** - */ -public class S3SegmentKiller implements SegmentKiller -{ - private static final Logger log = new Logger(S3SegmentKiller.class); - - private final RestS3Service s3Client; - - @Inject - public S3SegmentKiller( - RestS3Service s3Client - ) - { - this.s3Client = s3Client; - } - - - @Override - public void kill(Collection segments) throws ServiceException - { - for (final DataSegment segment : segments) { - Map loadSpec = segment.getLoadSpec(); - String s3Bucket = MapUtils.getString(loadSpec, "bucket"); - String s3Path = MapUtils.getString(loadSpec, "key"); - String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; - - if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { - log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); - s3Client.deleteObject(s3Bucket, s3Path); - } - if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) { - log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); - s3Client.deleteObject(s3Bucket, s3DescriptorPath); - } - } - } -} diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java deleted file mode 100644 index 8f8746d5324..00000000000 --- a/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.metamx.druid.loading; - -import com.metamx.druid.client.DataSegment; -import org.jets3t.service.ServiceException; - -import java.util.Collection; -import java.util.List; - -/** - */ -public interface SegmentKiller -{ - public void kill(Collection segments) throws ServiceException; -}