From a11a34f87ee80cec05cc32be407aa38156040651 Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Mon, 4 Mar 2013 13:09:42 -0600 Subject: [PATCH] 1) Initial commit: Converter Task --- .../com/metamx/druid/index/v1/IndexIO.java | 15 +++- .../merger/common/task/AbstractTask.java | 20 +++++ .../druid/merger/common/task/KillTask.java | 30 +++++-- .../metamx/druid/merger/common/task/Task.java | 4 +- .../merger/common/task/V8toV9UpgradeTask.java | 36 --------- .../common/task/VersionConverterSubTask.java | 79 +++++++++++++++++++ .../common/task/VersionConverterTask.java | 74 +++++++++++++++++ 7 files changed, 212 insertions(+), 46 deletions(-) delete mode 100644 merger/src/main/java/com/metamx/druid/merger/common/task/V8toV9UpgradeTask.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java index d26e73f5b3c..621989b0d08 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java @@ -199,6 +199,19 @@ public class IndexIO } } + public static boolean convertSegment(File toConvert, File converted) throws IOException + { + final int version = getVersionFromDir(toConvert); + + switch (version) { + case 8: + DefaultIndexIOHandler.convertV8toV9(toConvert, converted); + return true; + default: + return false; + } + } + public static interface IndexIOHandler { /** @@ -229,7 +242,7 @@ public class IndexIO public void storeLatest(Index index, File file) throws IOException; } - static class DefaultIndexIOHandler implements IndexIOHandler + public static class DefaultIndexIOHandler implements IndexIOHandler { private static final Logger log = new Logger(DefaultIndexIOHandler.class); @Override diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java index 899e2066fb0..119eab3ec87 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java @@ -20,15 +20,19 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.actions.SegmentListUsedAction; import org.joda.time.Interval; public abstract class AbstractTask implements Task { + private static final Joiner ID_JOINER = Joiner.on("_"); + private final String id; private final String groupId; private final String dataSource; @@ -91,4 +95,20 @@ public abstract class AbstractTask implements Task .add("interval", getImplicitLockInterval()) .toString(); } + + /** Start helper methods **/ + public static String joinId(Object... objects) + { + return ID_JOINER.join(objects); + } + + public SegmentListUsedAction makeListUsedAction() + { + return new SegmentListUsedAction(this, getDataSource(), getFixedInterval().get()); + } + + public TaskStatus success() + { + return TaskStatus.success(getId()); + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java index 8c2b5af8e6e..bf1bbbabd90 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; @@ -64,14 +83,9 @@ public class KillTask extends AbstractTask } // List unused segments - final List unusedSegments = toolbox.getTaskActionClient() - .submit( - new SegmentListUnusedAction( - this, - myLock.getDataSource(), - myLock.getInterval() - ) - ); + final List unusedSegments = toolbox + .getTaskActionClient() + .submit(new SegmentListUnusedAction(this, myLock.getDataSource(), myLock.getInterval())); // Verify none of these segments have versions > lock version for(final DataSegment unusedSegment : unusedSegments) { diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java index 8d93486ff6b..b1a4598a2d2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java @@ -50,7 +50,9 @@ import org.joda.time.Interval; @JsonSubTypes.Type(name = "index", value = IndexTask.class), @JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class), @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class), - @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class) + @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class), + @JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class), + @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterSubTask.class), }) public interface Task { diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/V8toV9UpgradeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/V8toV9UpgradeTask.java deleted file mode 100644 index 28d3ab1fec1..00000000000 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/V8toV9UpgradeTask.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.metamx.druid.merger.common.task; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.metamx.druid.merger.common.TaskStatus; -import com.metamx.druid.merger.common.TaskToolbox; -import org.joda.time.DateTime; -import org.joda.time.Interval; - -/** - */ -public class V8toV9UpgradeTask extends AbstractTask -{ - public V8toV9UpgradeTask( - @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval - ) - { - super( - String.format("v8tov9_%s_%s_%s", dataSource, interval.toString().replace("/", "_"), new DateTime()), - dataSource, - interval - ); - } - - @Override - public String getType() - { - return "8to9"; - } - - @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception - { - throw new UnsupportedOperationException(); - } -} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java new file mode 100644 index 00000000000..4b5c3d251f0 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java @@ -0,0 +1,79 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.common.task; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Sets; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.index.v1.IndexIO; +import com.metamx.druid.merger.common.TaskStatus; +import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.actions.SegmentInsertAction; + +import java.io.File; +import java.util.Arrays; +import java.util.Map; + +/** + */ +public class VersionConverterSubTask extends AbstractTask +{ + private final DataSegment segment; + + protected VersionConverterSubTask( + @JsonProperty("groupId") String groupId, + @JsonProperty("segment") DataSegment segment + ) + { + super( + joinId( + groupId, + "sub", + segment.getInterval().getStart(), + segment.getInterval().getEnd(), + segment.getShardSpec().getPartitionNum() + ), + segment.getDataSource(), + segment.getInterval() + ); + this.segment = segment; + } + + @Override + public String getType() + { + return "version_converter_sub"; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + final Map localSegments = toolbox.getSegments(this, Arrays.asList(segment)); + + final File location = localSegments.get(segment); + final File outLocation = new File(location, "v9_out"); + if (IndexIO.convertSegment(location, outLocation)) { + final DataSegment updatedSegment = toolbox.getSegmentPusher().push(outLocation, segment); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, Sets.newHashSet(updatedSegment))); + } + + return success(); + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java new file mode 100644 index 00000000000..062f3751767 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java @@ -0,0 +1,74 @@ +package com.metamx.druid.merger.common.task; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.merger.common.TaskStatus; +import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.actions.SpawnTasksAction; +import com.metamx.druid.merger.common.actions.TaskActionClient; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.List; + +/** + */ +public class VersionConverterTask extends AbstractTask +{ + private static final String TYPE = "version_converter"; + + public VersionConverterTask( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval + ) + { + super( + joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime()), + dataSource, + interval + ); + } + + @Override + public String getType() + { + return TYPE; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + throw new ISE("Should never run, %s just exists to create subtasks", this.getClass().getSimpleName()); + } + + @Override + public TaskStatus preflight(TaskToolbox toolbox) throws Exception + { + final TaskActionClient taskClient = toolbox.getTaskActionClient(); + + List segments = taskClient.submit(makeListUsedAction()); + + taskClient.submit( + new SpawnTasksAction( + this, + Lists.transform( + segments, + new Function() + { + @Override + public Task apply(@Nullable DataSegment input) + { + return new VersionConverterSubTask(getGroupId(), input); + } + } + ) + ) + ); + + return TaskStatus.success(getId()); + } +}