1) Remove the Task parameter from the TaskToolbox methods that required it (the toolbox is now bound to its Task)
2) Move getTaskDir() from TaskConfig onto TaskToolbox
3) Rename the SegmentKiller interface to DataSegmentKiller
4) Change the signature "DataSegmentKiller.kill(Collection<DataSegment>) throws ServiceException" to "kill(DataSegment) throws SegmentLoadingException" (see the sketch below)
5) Add various log messages
6) Update the version of the segment that has been converted
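
Change (4) replaces a batch kill that leaked the jets3t ServiceException with a per-segment kill throwing the storage-agnostic SegmentLoadingException, so callers iterate themselves. A condensed before/after sketch; the signatures are taken from the diff below, and the caller loop mirrors KillTask:

// Before: batch API tied to the jets3t exception type
public interface SegmentKiller
{
  public void kill(Collection<DataSegment> segments) throws ServiceException;
}

// After: per-segment API with a loading-layer exception
public interface DataSegmentKiller
{
  public void kill(DataSegment segment) throws SegmentLoadingException;
}

// Callers loop over the segments, as KillTask now does:
for (DataSegment segment : unusedSegments) {
  toolbox.getDataSegmentKiller().kill(segment);
}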
Eric Tschetter 2013-03-05 13:27:25 -06:00
parent c9b411c0ca
commit 6864007c05
26 changed files with 281 additions and 214 deletions

View File: DataSegment.java

@ -261,6 +261,7 @@ public class DataSegment implements Comparable<DataSegment>
", loadSpec=" + loadSpec +
", interval=" + interval +
", dataSource='" + dataSource + '\'' +
", binaryVersion='" + binaryVersion + '\'' +
'}';
}

View File: TaskToolbox.java

@ -25,11 +25,12 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.emitter.service.ServiceEmitter;
@ -45,29 +46,32 @@ import java.util.Map;
public class TaskToolbox
{
private final TaskConfig config;
private final TaskActionClient taskActionClient;
private final Task task;
private final TaskActionClientFactory taskActionClientFactory;
private final ServiceEmitter emitter;
private final RestS3Service s3Client;
private final DataSegmentPusher segmentPusher;
private final SegmentKiller segmentKiller;
private final DataSegmentKiller dataSegmentKiller;
private final ObjectMapper objectMapper;
public TaskToolbox(
TaskConfig config,
TaskActionClient taskActionClient,
Task task,
TaskActionClientFactory taskActionClientFactory,
ServiceEmitter emitter,
RestS3Service s3Client,
DataSegmentPusher segmentPusher,
SegmentKiller segmentKiller,
DataSegmentKiller dataSegmentKiller,
ObjectMapper objectMapper
)
{
this.config = config;
this.taskActionClient = taskActionClient;
this.task = task;
this.taskActionClientFactory = taskActionClientFactory;
this.emitter = emitter;
this.s3Client = s3Client;
this.segmentPusher = segmentPusher;
this.segmentKiller = segmentKiller;
this.dataSegmentKiller = dataSegmentKiller;
this.objectMapper = objectMapper;
}
@ -76,9 +80,9 @@ public class TaskToolbox
return config;
}
public TaskActionClient getTaskActionClient()
public TaskActionClient getTaskActionClientFactory()
{
return taskActionClient;
return taskActionClientFactory.create(task);
}
public ServiceEmitter getEmitter()
@ -91,9 +95,9 @@ public class TaskToolbox
return segmentPusher;
}
public SegmentKiller getSegmentKiller()
public DataSegmentKiller getDataSegmentKiller()
{
return segmentKiller;
return dataSegmentKiller;
}
public ObjectMapper getObjectMapper()
@ -101,7 +105,7 @@ public class TaskToolbox
return objectMapper;
}
public Map<DataSegment, File> getSegments(final Task task, List<DataSegment> segments)
public Map<DataSegment, File> getSegments(List<DataSegment> segments)
throws SegmentLoadingException
{
final SingleSegmentLoader loader = new SingleSegmentLoader(
@ -112,7 +116,7 @@ public class TaskToolbox
@Override
public File getCacheDirectory()
{
return new File(config.getTaskDir(task), "fetched_segments");
return new File(getTaskDir(), "fetched_segments");
}
}
);
@ -124,4 +128,9 @@ public class TaskToolbox
return retVal;
}
public File getTaskDir() {
return new File(config.getBaseTaskDir(), task.getId());
}
}
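
Taken together, changes (1) and (2) mean task code no longer passes the Task into the toolbox. A hypothetical task body against the new surface (mySegments and workDir are illustrative; the method names are the ones added above):

final File workDir = new File(toolbox.getTaskDir(), "work");               // was toolbox.getConfig().getTaskDir(task)
final Map<DataSegment, File> fetched = toolbox.getSegments(mySegments);    // was toolbox.getSegments(task, mySegments)
toolbox.getTaskActionClientFactory().submit(new LockListAction());         // the factory creates the per-task client internally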

View File: TaskToolboxFactory.java

@ -20,55 +20,43 @@
package com.metamx.druid.merger.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.emitter.service.ServiceEmitter;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.io.File;
import java.util.List;
import java.util.Map;
/**
* Stuff that may be needed by a Task in order to conduct its business.
*/
public class TaskToolboxFactory
{
private final TaskConfig config;
private final TaskActionClientFactory taskActionClient;
private final TaskActionClientFactory taskActionClientFactory;
private final ServiceEmitter emitter;
private final RestS3Service s3Client;
private final DataSegmentPusher segmentPusher;
private final SegmentKiller segmentKiller;
private final DataSegmentKiller dataSegmentKiller;
private final ObjectMapper objectMapper;
public TaskToolboxFactory(
TaskConfig config,
TaskActionClientFactory taskActionClient,
TaskActionClientFactory taskActionClientFactory,
ServiceEmitter emitter,
RestS3Service s3Client,
DataSegmentPusher segmentPusher,
SegmentKiller segmentKiller,
DataSegmentKiller dataSegmentKiller,
ObjectMapper objectMapper
)
{
this.config = config;
this.taskActionClient = taskActionClient;
this.taskActionClientFactory = taskActionClientFactory;
this.emitter = emitter;
this.s3Client = s3Client;
this.segmentPusher = segmentPusher;
this.segmentKiller = segmentKiller;
this.dataSegmentKiller = dataSegmentKiller;
this.objectMapper = objectMapper;
}
@ -81,11 +69,12 @@ public class TaskToolboxFactory
{
return new TaskToolbox(
config,
taskActionClient == null ? null : taskActionClient.create(task),
task,
taskActionClientFactory,
emitter,
s3Client,
segmentPusher,
segmentKiller,
dataSegmentKiller,
objectMapper
);
}

View File: TaskConfig.java

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.config;
import com.metamx.druid.merger.common.task.Task;
@ -17,8 +36,4 @@ public abstract class TaskConfig
@Config("druid.merger.hadoopWorkingPath")
public abstract String getHadoopWorkingPath();
public File getTaskDir(final Task task) {
return new File(getBaseTaskDir(), task.getId());
}
}

View File: AbstractTask.java

@ -102,7 +102,7 @@ public abstract class AbstractTask implements Task
return ID_JOINER.join(objects);
}
public SegmentListUsedAction makeImplicitListUsedAction()
public SegmentListUsedAction defaultListUsedAction()
{
return new SegmentListUsedAction(getDataSource(), getImplicitLockInterval().get());
}

View File: DeleteTask.java

@ -77,7 +77,7 @@ public class DeleteTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Strategy: Create an empty segment covering the interval to be deleted
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
final Interval interval = this.getImplicitLockInterval().get();
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty);
@ -91,7 +91,7 @@ public class DeleteTask extends AbstractTask
.shardSpec(new NoneShardSpec())
.build();
final File outDir = new File(toolbox.getConfig().getTaskDir(this), segment.getIdentifier());
final File outDir = new File(toolbox.getTaskDir(), segment.getIdentifier());
final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir);
// Upload the segment
@ -104,7 +104,7 @@ public class DeleteTask extends AbstractTask
segment.getVersion()
);
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
return TaskStatus.success(getId());
}

View File: HadoopIndexTask.java

@ -93,7 +93,7 @@ public class HadoopIndexTask extends AbstractTask
);
// We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
log.info("Setting version to: %s", myLock.getVersion());
configCopy.setVersion(myLock.getVersion());
@ -124,7 +124,7 @@ public class HadoopIndexTask extends AbstractTask
List<DataSegment> publishedSegments = job.getPublishedSegments();
// Request segment pushes
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
// Done
return TaskStatus.success(getId());

View File: IndexDeterminePartitionsTask.java

@ -258,7 +258,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
}
);
toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks));
return TaskStatus.success(getId());
}

View File: IndexGeneratorTask.java

@ -100,24 +100,21 @@ public class IndexGeneratorTask extends AbstractTask
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
// We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
// We know this exists
final Interval interval = getImplicitLockInterval().get();
// Set up temporary directory for indexing
 final File tmpDir = new File(
+    toolbox.getTaskDir(),
     String.format(
-        "%s/%s",
-        toolbox.getConfig().getTaskDir(this).toString(),
-        String.format(
-            "%s_%s_%s_%s_%s",
-            this.getDataSource(),
-            interval.getStart(),
-            interval.getEnd(),
-            myLock.getVersion(),
-            schema.getShardSpec().getPartitionNum()
-        )
+        "%s_%s_%s_%s_%s",
+        this.getDataSource(),
+        interval.getStart(),
+        interval.getEnd(),
+        myLock.getVersion(),
+        schema.getShardSpec().getPartitionNum()
     )
 );
@ -193,7 +190,7 @@ public class IndexGeneratorTask extends AbstractTask
);
// Request segment pushes
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments)));
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments)));
// Done
return TaskStatus.success(getId());

View File: IndexTask.java

@ -142,7 +142,7 @@ public class IndexTask extends AbstractTask
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
{
toolbox.getTaskActionClient().submit(new SpawnTasksAction(toSubtasks()));
toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(toSubtasks()));
return TaskStatus.success(getId());
}

View File: KillTask.java

@ -72,7 +72,7 @@ public class KillTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
if(!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
@ -84,7 +84,7 @@ public class KillTask extends AbstractTask
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.getTaskActionClientFactory()
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
@ -102,10 +102,12 @@ public class KillTask extends AbstractTask
}
// Kill segments
toolbox.getSegmentKiller().kill(unusedSegments);
for (DataSegment segment : unusedSegments) {
toolbox.getDataSegmentKiller().kill(segment);
}
// Remove metadata for these segments
toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
toolbox.getTaskActionClientFactory().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
return TaskStatus.success(getId());
}

View File: MergeTask.java

@ -43,8 +43,8 @@ import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.merger.common.actions.SegmentListUsedAction;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
@ -67,7 +67,7 @@ public abstract class MergeTask extends AbstractTask
{
private final List<DataSegment> segments;
private static final Logger log = new Logger(MergeTask.class);
private static final EmittingLogger log = new EmittingLogger(MergeTask.class);
protected MergeTask(final String dataSource, final List<DataSegment> segments)
{
@ -119,11 +119,11 @@ public abstract class MergeTask extends AbstractTask
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
final ServiceEmitter emitter = toolbox.getEmitter();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
final File taskDir = toolbox.getConfig().getTaskDir(this);
final File taskDir = toolbox.getTaskDir();
try {
@ -147,7 +147,7 @@ public abstract class MergeTask extends AbstractTask
// download segments to merge
final Map<DataSegment, File> gettedSegments = toolbox.getSegments(this, segments);
final Map<DataSegment, File> gettedSegments = toolbox.getSegments(segments);
// merge files together
final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));
@ -170,27 +170,14 @@ public abstract class MergeTask extends AbstractTask
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
return TaskStatus.success(getId());
}
catch (Exception e) {
log.error(
e,
String.format(
"Exception merging %s[%s] segments",
mergedSegment.getDataSource(),
mergedSegment.getInterval()
)
);
emitter.emit(
new AlertEvent.Builder().build(
"Exception merging",
ImmutableMap.<String, Object>builder()
.put("exception", e.toString())
.build()
)
);
log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource())
.addData("interval", mergedSegment.getInterval())
.emit();
return TaskStatus.failure(getId());
}
@ -213,11 +200,7 @@ public abstract class MergeTask extends AbstractTask
};
final Set<String> current = ImmutableSet.copyOf(
Iterables.transform(
toolbox.getTaskActionClient()
.submit(new SegmentListUsedAction(getDataSource(), getImplicitLockInterval().get())),
toIdentifier
)
Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier)
);
final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));

View File: VersionConverterSubTask.java

@ -21,11 +21,13 @@ package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import org.joda.time.DateTime;
import java.io.File;
import java.util.Arrays;
@ -35,6 +37,8 @@ import java.util.Map;
*/
public class VersionConverterSubTask extends AbstractTask
{
private static final Logger log = new Logger(VersionConverterSubTask.class);
private final DataSegment segment;
protected VersionConverterSubTask(
@ -50,6 +54,7 @@ public class VersionConverterSubTask extends AbstractTask
segment.getInterval().getEnd(),
segment.getShardSpec().getPartitionNum()
),
groupId,
segment.getDataSource(),
segment.getInterval()
);
@ -65,13 +70,23 @@ public class VersionConverterSubTask extends AbstractTask
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final Map<DataSegment, File> localSegments = toolbox.getSegments(this, Arrays.asList(segment));
log.info("Converting segment[%s]", segment);
final Map<DataSegment, File> localSegments = toolbox.getSegments(Arrays.asList(segment));
final File location = localSegments.get(segment);
final File outLocation = new File(location, "v9_out");
if (IndexIO.convertSegment(location, outLocation)) {
final DataSegment updatedSegment = toolbox.getSegmentPusher().push(outLocation, segment);
toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
final int outVersion = IndexIO.getVersionFromDir(outLocation);
// Appending to the version makes a new version that inherits most comparability parameters of the original
// version, but is "newer" than said original version.
DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion));
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
}
else {
log.info("Conversion failed.");
}
return success();
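
To make the version-suffix comment above concrete (the values are assumed examples, not taken from the commit): a segment at version 2013-01-01T00:00:00.000Z whose converted index reports binary version 9 is re-pushed as 2013-01-01T00:00:00.000Z_v9, which string-compares after the original and therefore supersedes it:

// format string matches the code above; the inputs are hypothetical
String original = "2013-01-01T00:00:00.000Z";                    // segment.getVersion()
int outVersion = 9;                                              // IndexIO.getVersionFromDir(outLocation)
String updated = String.format("%s_v%s", original, outVersion);  // "2013-01-01T00:00:00.000Z_v9"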

View File: VersionConverterTask.java

@ -1,10 +1,32 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SpawnTasksAction;
@ -12,7 +34,6 @@ import com.metamx.druid.merger.common.actions.TaskActionClient;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
/**
@ -20,6 +41,9 @@ import java.util.List;
public class VersionConverterTask extends AbstractTask
{
private static final String TYPE = "version_converter";
private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID);
private static final Logger log = new Logger(VersionConverterTask.class);
public VersionConverterTask(
@JsonProperty("dataSource") String dataSource,
@ -48,25 +72,30 @@ public class VersionConverterTask extends AbstractTask
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
{
-    final TaskActionClient taskClient = toolbox.getTaskActionClient();
+    final TaskActionClient taskClient = toolbox.getTaskActionClientFactory();
-    List<DataSegment> segments = taskClient.submit(makeImplicitListUsedAction());
+    List<DataSegment> segments = taskClient.submit(defaultListUsedAction());
-    taskClient.submit(
-        new SpawnTasksAction(
-            Lists.transform(
-                segments,
-                new Function<DataSegment, Task>()
-                {
-                  @Override
-                  public Task apply(@Nullable DataSegment input)
-                  {
-                    return new VersionConverterSubTask(getGroupId(), input);
-                  }
+    final FunctionalIterable<Task> tasks = FunctionalIterable
+        .create(segments)
+        .keep(
+            new Function<DataSegment, Task>()
+            {
+              @Override
+              public Task apply(DataSegment segment)
+              {
+                final Integer segmentVersion = segment.getBinaryVersion();
+                if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
+                  return new VersionConverterSubTask(getGroupId(), segment);
                 }
-            )
-        )
-    );
+                log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
+                return null;
+              }
+            }
+        );
+    taskClient.submit(new SpawnTasksAction(Lists.newArrayList(tasks)));
return TaskStatus.success(getId());
}

View File: LocalTaskRunner.java

@ -92,7 +92,7 @@ public class LocalTaskRunner implements TaskRunner
}
try {
final File taskDir = toolbox.getConfig().getTaskDir(task);
final File taskDir = toolbox.getTaskDir();
if (taskDir.exists()) {
log.info("Removing task directory: %s", taskDir);

View File: IndexerCoordinatorNode.java

@ -52,8 +52,8 @@ import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.merger.common.actions.TaskActionToolbox;
@ -489,7 +489,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
public void initializeTaskToolbox()
{
if (taskToolboxFactory == null) {
final SegmentKiller segmentKiller = new S3SegmentKiller(s3Service);
final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
taskToolboxFactory = new TaskToolboxFactory(
taskConfig,
new LocalTaskActionClientFactory(
@ -499,7 +499,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
emitter,
s3Service,
segmentPusher,
segmentKiller,
dataSegmentKiller,
jsonMapper
);
}

View File: IndexerCoordinatorResource.java

@ -27,7 +27,6 @@ import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.actions.TaskAction;
import com.metamx.druid.merger.common.actions.TaskActionHolder;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
@ -185,7 +184,7 @@ public class IndexerCoordinatorResource
public <T> Response doAction(final TaskActionHolder<T> holder)
{
final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask())
.getTaskActionClient()
.getTaskActionClientFactory()
.submit(holder.getAction());
final Map<String, Object> retMap = Maps.newHashMap();

View File: TaskMonitor.java

@ -101,7 +101,7 @@ public class TaskMonitor
public void run()
{
final long startTime = System.currentTimeMillis();
final File taskDir = toolbox.getConfig().getTaskDir(task);
final File taskDir = toolbox.getTaskDir();
log.info("Running task [%s]", task.getId());

View File: WorkerNode.java

@ -39,13 +39,9 @@ import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClient;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
@ -355,14 +351,14 @@ public class WorkerNode extends RegisteringNode
public void initializeTaskToolbox() throws S3ServiceException
{
if (taskToolboxFactory == null) {
final SegmentKiller segmentKiller = new S3SegmentKiller(s3Service);
final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
taskToolboxFactory = new TaskToolboxFactory(
taskConfig,
new RemoteTaskActionClientFactory(httpClient, coordinatorServiceProvider, jsonMapper),
emitter,
s3Service,
segmentPusher,
segmentKiller,
dataSegmentKiller,
jsonMapper
);
}

View File: TaskLifecycleTest.java

@ -40,7 +40,8 @@ import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
@ -75,7 +76,6 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@ -146,10 +146,10 @@ public class TaskLifecycleTest
return segment;
}
},
new SegmentKiller()
new DataSegmentKiller()
{
@Override
public void kill(Collection<DataSegment> segments) throws ServiceException
public void kill(DataSegment segments) throws SegmentLoadingException
{
}
@ -283,8 +283,8 @@ public class TaskLifecycleTest
// Sort of similar to what realtime tasks do:
// Acquire lock for first interval
final Optional<TaskLock> lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1));
final List<TaskLock> locks1 = toolbox.getTaskActionClient().submit(new LockListAction());
final Optional<TaskLock> lock1 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval1));
final List<TaskLock> locks1 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertTrue("lock1 present", lock1.isPresent());
@ -292,8 +292,8 @@ public class TaskLifecycleTest
Assert.assertEquals("locks1", ImmutableList.of(lock1.get()), locks1);
// Acquire lock for second interval
final Optional<TaskLock> lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2));
final List<TaskLock> locks2 = toolbox.getTaskActionClient().submit(new LockListAction());
final Optional<TaskLock> lock2 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval2));
final List<TaskLock> locks2 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertTrue("lock2 present", lock2.isPresent());
@ -301,7 +301,7 @@ public class TaskLifecycleTest
Assert.assertEquals("locks2", ImmutableList.of(lock1.get(), lock2.get()), locks2);
// Push first segment
toolbox.getTaskActionClient()
toolbox.getTaskActionClientFactory()
.submit(
new SegmentInsertAction(
ImmutableSet.of(
@ -315,14 +315,14 @@ public class TaskLifecycleTest
);
// Release first lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1));
final List<TaskLock> locks3 = toolbox.getTaskActionClient().submit(new LockListAction());
toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval1));
final List<TaskLock> locks3 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("locks3", ImmutableList.of(lock2.get()), locks3);
// Push second segment
toolbox.getTaskActionClient()
toolbox.getTaskActionClientFactory()
.submit(
new SegmentInsertAction(
ImmutableSet.of(
@ -336,8 +336,8 @@ public class TaskLifecycleTest
);
// Release second lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2));
final List<TaskLock> locks4 = toolbox.getTaskActionClient().submit(new LockListAction());
toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval2));
final List<TaskLock> locks4 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("locks4", ImmutableList.<TaskLock>of(), locks4);
@ -370,7 +370,7 @@ public class TaskLifecycleTest
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(
toolbox.getTaskActionClient()
toolbox.getTaskActionClientFactory()
.submit(new LockListAction())
);
@ -380,7 +380,7 @@ public class TaskLifecycleTest
.version(myLock.getVersion())
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId());
}
};
@ -406,7 +406,7 @@ public class TaskLifecycleTest
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
final DataSegment segment = DataSegment.builder()
.dataSource("ds")
@ -414,7 +414,7 @@ public class TaskLifecycleTest
.version(myLock.getVersion())
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId());
}
};
@ -440,7 +440,7 @@ public class TaskLifecycleTest
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
final DataSegment segment = DataSegment.builder()
.dataSource("ds")
@ -448,7 +448,7 @@ public class TaskLifecycleTest
.version(myLock.getVersion() + "1!!!1!!")
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId());
}
};

View File: TaskQueueTest.java

@ -375,7 +375,7 @@ public class TaskQueueTest
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks));
return TaskStatus.success(id);
}
};

View File: DataSegmentKiller.java (new)

@ -0,0 +1,29 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.metamx.druid.client.DataSegment;
/**
*/
public interface DataSegmentKiller
{
public void kill(DataSegment segments) throws SegmentLoadingException;
}

View File: S3DataSegmentKiller.java (new)

@ -0,0 +1,70 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.util.Map;
/**
*/
public class S3DataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(S3DataSegmentKiller.class);
private final RestS3Service s3Client;
@Inject
public S3DataSegmentKiller(
RestS3Service s3Client
)
{
this.s3Client = s3Client;
}
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
try {
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
s3Client.deleteObject(s3Bucket, s3Path);
}
if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
s3Client.deleteObject(s3Bucket, s3DescriptorPath);
}
}
catch (ServiceException e) {
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]", segment.getIdentifier());
}
}
}

View File: S3DataSegmentPusher.java

@ -87,11 +87,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
segment = segment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type", "s3_zip",
"bucket", outputBucket,
"key", toPush.getKey()
)
ImmutableMap.<String, Object>of("type", "s3_zip", "bucket", outputBucket, "key", toPush.getKey())
)
.withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));

View File: S3SegmentKiller.java (deleted)

@ -1,49 +0,0 @@
package com.metamx.druid.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.util.Collection;
import java.util.Map;
/**
*/
public class S3SegmentKiller implements SegmentKiller
{
private static final Logger log = new Logger(S3SegmentKiller.class);
private final RestS3Service s3Client;
@Inject
public S3SegmentKiller(
RestS3Service s3Client
)
{
this.s3Client = s3Client;
}
@Override
public void kill(Collection<DataSegment> segments) throws ServiceException
{
for (final DataSegment segment : segments) {
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
s3Client.deleteObject(s3Bucket, s3Path);
}
if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
s3Client.deleteObject(s3Bucket, s3DescriptorPath);
}
}
}
}

View File: SegmentKiller.java (deleted)

@ -1,14 +0,0 @@
package com.metamx.druid.loading;
import com.metamx.druid.client.DataSegment;
import org.jets3t.service.ServiceException;
import java.util.Collection;
import java.util.List;
/**
*/
public interface SegmentKiller
{
public void kill(Collection<DataSegment> segments) throws ServiceException;
}