1) Initial commit: Converter Task

This commit is contained in:
Eric Tschetter 2013-03-04 13:09:42 -06:00
parent fe4ca2a1a2
commit a11a34f87e
7 changed files with 212 additions and 46 deletions

View File

@ -199,6 +199,19 @@ public class IndexIO
}
}
public static boolean convertSegment(File toConvert, File converted) throws IOException
{
final int version = getVersionFromDir(toConvert);
switch (version) {
case 8:
DefaultIndexIOHandler.convertV8toV9(toConvert, converted);
return true;
default:
return false;
}
}
public static interface IndexIOHandler
{
/**
@ -229,7 +242,7 @@ public class IndexIO
public void storeLatest(Index index, File file) throws IOException;
}
static class DefaultIndexIOHandler implements IndexIOHandler
public static class DefaultIndexIOHandler implements IndexIOHandler
{
private static final Logger log = new Logger(DefaultIndexIOHandler.class);
@Override

View File

@ -20,15 +20,19 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SegmentListUsedAction;
import org.joda.time.Interval;
public abstract class AbstractTask implements Task
{
private static final Joiner ID_JOINER = Joiner.on("_");
private final String id;
private final String groupId;
private final String dataSource;
@ -91,4 +95,20 @@ public abstract class AbstractTask implements Task
.add("interval", getImplicitLockInterval())
.toString();
}
/** Start helper methods **/
public static String joinId(Object... objects)
{
return ID_JOINER.join(objects);
}
public SegmentListUsedAction makeListUsedAction()
{
return new SegmentListUsedAction(this, getDataSource(), getFixedInterval().get());
}
public TaskStatus success()
{
return TaskStatus.success(getId());
}
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
@ -64,14 +83,9 @@ public class KillTask extends AbstractTask
}
// List unused segments
final List<DataSegment> unusedSegments = toolbox.getTaskActionClient()
.submit(
new SegmentListUnusedAction(
this,
myLock.getDataSource(),
myLock.getInterval()
)
);
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUnusedAction(this, myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
for(final DataSegment unusedSegment : unusedSegments) {

View File

@ -50,7 +50,9 @@ import org.joda.time.Interval;
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class),
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class)
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class),
@JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterSubTask.class),
})
public interface Task
{

View File

@ -1,36 +0,0 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import org.joda.time.DateTime;
import org.joda.time.Interval;
/**
*/
public class V8toV9UpgradeTask extends AbstractTask
{
public V8toV9UpgradeTask(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
String.format("v8tov9_%s_%s_%s", dataSource, interval.toString().replace("/", "_"), new DateTime()),
dataSource,
interval
);
}
@Override
public String getType()
{
return "8to9";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,79 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import java.io.File;
import java.util.Arrays;
import java.util.Map;
/**
*/
public class VersionConverterSubTask extends AbstractTask
{
private final DataSegment segment;
protected VersionConverterSubTask(
@JsonProperty("groupId") String groupId,
@JsonProperty("segment") DataSegment segment
)
{
super(
joinId(
groupId,
"sub",
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getShardSpec().getPartitionNum()
),
segment.getDataSource(),
segment.getInterval()
);
this.segment = segment;
}
@Override
public String getType()
{
return "version_converter_sub";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final Map<DataSegment, File> localSegments = toolbox.getSegments(this, Arrays.asList(segment));
final File location = localSegments.get(segment);
final File outLocation = new File(location, "v9_out");
if (IndexIO.convertSegment(location, outLocation)) {
final DataSegment updatedSegment = toolbox.getSegmentPusher().push(outLocation, segment);
toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, Sets.newHashSet(updatedSegment)));
}
return success();
}
}

View File

@ -0,0 +1,74 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
/**
*/
public class VersionConverterTask extends AbstractTask
{
private static final String TYPE = "version_converter";
public VersionConverterTask(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime()),
dataSource,
interval
);
}
@Override
public String getType()
{
return TYPE;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
throw new ISE("Should never run, %s just exists to create subtasks", this.getClass().getSimpleName());
}
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
{
final TaskActionClient taskClient = toolbox.getTaskActionClient();
List<DataSegment> segments = taskClient.submit(makeListUsedAction());
taskClient.submit(
new SpawnTasksAction(
this,
Lists.transform(
segments,
new Function<DataSegment, Task>()
{
@Override
public Task apply(@Nullable DataSegment input)
{
return new VersionConverterSubTask(getGroupId(), input);
}
}
)
)
);
return TaskStatus.success(getId());
}
}