From e40fba4de2bb32eb05a477708e1ec6abb4872912 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Wed, 27 Feb 2013 14:50:15 -0800
Subject: [PATCH] HadoopIndexTask: Jackson fixes and general overriding of storage-specific stuff

---
 .../indexer/HadoopDruidIndexerConfig.java      |  4 +-
 .../merger/common/config/TaskConfig.java       |  3 +
 .../merger/common/task/HadoopIndexTask.java    | 43 ++++++++-----
 .../metamx/druid/merger/common/task/Task.java  |  3 +-
 .../merger/common/task/TaskSerdeTest.java      | 63 +++++++++++++++++--
 .../coordinator/RemoteTaskRunnerTest.java      |  6 ++
 .../merger/coordinator/TaskLifecycleTest.java  |  6 ++
 7 files changed, 105 insertions(+), 23 deletions(-)

diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
index 41e812bba30..5dc45d19484 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
@@ -198,7 +198,7 @@ public class HadoopDruidIndexerConfig
       final @JsonProperty("pathSpec") PathSpec pathSpec,
       final @JsonProperty("workingPath") String jobOutputDir,
       final @JsonProperty("segmentOutputPath") String segmentOutputDir,
-      final @JsonProperty("version") DateTime version,
+      final @JsonProperty("version") String version,
       final @JsonProperty("partitionDimension") String partitionDimension,
       final @JsonProperty("targetPartitionSize") Long targetPartitionSize,
       final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
@@ -220,7 +220,7 @@ public class HadoopDruidIndexerConfig
     this.pathSpec = pathSpec;
     this.jobOutputDir = jobOutputDir;
     this.segmentOutputDir = segmentOutputDir;
-    this.version = version == null ? new DateTime().toString() : version.toString();
+    this.version = version == null ? new DateTime().toString() : version;
     this.partitionsSpec = partitionsSpec;
     this.leaveIntermediate = leaveIntermediate;
     this.cleanupOnFailure = cleanupOnFailure;
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java b/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java
index 5b7609bd042..5918f0627c6 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java
@@ -15,6 +15,9 @@ public abstract class TaskConfig
   @Default("500000")
   public abstract int getDefaultRowFlushBoundary();
 
+  @Config("druid.merger.hadoopWorkingPath")
+  public abstract String getHadoopWorkingPath();
+
   public File getTaskDir(final Task task) {
     return new File(getBaseTaskDir(), task.getId());
   }
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
index db25239b50c..29c0c517f17 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
@@ -1,6 +1,8 @@
 package com.metamx.druid.merger.common.task;
 
-import com.google.common.collect.ImmutableList;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.metamx.common.logger.Logger;
@@ -14,20 +16,17 @@ import com.metamx.druid.merger.common.TaskToolbox;
 import com.metamx.druid.merger.common.actions.LockListAction;
 import com.metamx.druid.merger.common.actions.SegmentInsertAction;
 import com.metamx.druid.utils.JodaUtils;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
 import org.joda.time.DateTime;
 
 import java.util.List;
 
 public class HadoopIndexTask extends AbstractTask
 {
-  @JsonProperty
-  private static final Logger log = new Logger(HadoopIndexTask.class);
-
   @JsonProperty
   private final HadoopDruidIndexerConfig config;
 
+  private static final Logger log = new Logger(HadoopIndexTask.class);
+
   /**
    * @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters
    * for creating Druid index segments. It may be modified.
@@ -44,16 +43,16 @@ public class HadoopIndexTask extends AbstractTask
   )
   {
     super(
-        String.format("index_hadoop_%s_interval_%s", config.getDataSource(), new DateTime()),
+        String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
         config.getDataSource(),
         JodaUtils.umbrellaInterval(config.getIntervals())
     );
 
-    if (config.isUpdaterJobSpecSet()) {
-      throw new IllegalArgumentException(
-          "The UpdaterJobSpec field of the Hadoop Druid indexer config must be set to null"
-      );
-    }
+    // Some HadoopDruidIndexerConfig stuff doesn't make sense in the context of the indexing service
+    Preconditions.checkArgument(config.getSegmentOutputDir() == null, "segmentOutputPath must be absent");
+    Preconditions.checkArgument(config.getJobOutputDir() == null, "workingPath must be absent");
+    Preconditions.checkArgument(!config.isUpdaterJobSpecSet(), "updaterJobSpec must be absent");
+
     this.config = config;
   }
 
@@ -66,10 +65,21 @@ public class HadoopIndexTask extends AbstractTask
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
+    // Copy config so we don't needlessly modify our provided one
+    // Also necessary to make constructor validations work upon serde-after-run
+    final HadoopDruidIndexerConfig configCopy = toolbox.getObjectMapper()
+                                                       .readValue(
+                                                           toolbox.getObjectMapper().writeValueAsBytes(config),
+                                                           HadoopDruidIndexerConfig.class
+                                                       );
+
     // We should have a lock from before we started running
     final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this)));
     log.info("Setting version to: %s", myLock.getVersion());
-    config.setVersion(myLock.getVersion());
+    configCopy.setVersion(myLock.getVersion());
+
+    // Set workingPath to some reasonable default
+    configCopy.setJobOutputDir(toolbox.getConfig().getHadoopWorkingPath());
 
     if (toolbox.getSegmentPusher() instanceof S3DataSegmentPusher) {
       // Hack alert! Bypassing DataSegmentPusher...
@@ -82,14 +92,15 @@ public class HadoopIndexTask extends AbstractTask
       );
 
       log.info("Setting segment output path to: %s", s3Path);
-      config.setSegmentOutputDir(s3Path);
+      configCopy.setSegmentOutputDir(s3Path);
     }
     else {
       throw new IllegalStateException("Sorry, we only work with S3DataSegmentPushers! Bummer!");
     }
 
-    HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
-    log.debug("Starting a hadoop index generator job...");
+    HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(configCopy);
+    configCopy.verify();
+    log.info("Starting a hadoop index generator job...");
 
     if (job.run()) {
       List publishedSegments = job.getPublishedSegments();
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java
index 5ac4dcf71df..60a265564da 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java
@@ -37,7 +37,8 @@ import org.joda.time.Interval;
     @JsonSubTypes.Type(name = "kill", value = KillTask.class),
     @JsonSubTypes.Type(name = "index", value = IndexTask.class),
     @JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class),
-    @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class)
+    @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class),
+    @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class)
 })
 public interface Task
 {
diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java
index 51310bb2ef0..213bec34421 100644
--- a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java
@@ -1,12 +1,17 @@
 package com.metamx.druid.merger.common.task;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.metamx.common.Granularity;
 import com.metamx.druid.QueryGranularity;
 import com.metamx.druid.aggregation.AggregatorFactory;
 import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
 import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
+import com.metamx.druid.indexer.data.JSONDataSpec;
 import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
+import com.metamx.druid.indexer.path.StaticPathSpec;
+import com.metamx.druid.indexer.rollup.DataRollupSpec;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.realtime.Schema;
 import com.metamx.druid.shard.NoneShardSpec;
@@ -32,13 +37,17 @@ public class TaskSerdeTest
 
     final ObjectMapper jsonMapper = new DefaultObjectMapper();
     final String json = jsonMapper.writeValueAsString(task);
+
+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
     final Task task2 = jsonMapper.readValue(json, Task.class);
 
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P2D")), task.getFixedInterval());
+
     Assert.assertEquals(task.getId(), task2.getId());
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
-    Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
   }
 
   @Test
@@ -61,11 +70,13 @@ public class TaskSerdeTest
     final String json = jsonMapper.writeValueAsString(task);
     final Task task2 = jsonMapper.readValue(json, Task.class);
 
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());
+
     Assert.assertEquals(task.getId(), task2.getId());
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
-    Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
   }
 
   @Test
@@ -82,11 +93,13 @@ public class TaskSerdeTest
     final String json = jsonMapper.writeValueAsString(task);
     final Task task2 = jsonMapper.readValue(json, Task.class);
 
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());
+
     Assert.assertEquals(task.getId(), task2.getId());
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
-    Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
   }
 
   @Test
@@ -99,7 +112,6 @@ public class TaskSerdeTest
 
     final ObjectMapper jsonMapper = new DefaultObjectMapper();
     final String json = jsonMapper.writeValueAsString(task);
-    System.out.println(json);
     final Task task2 = jsonMapper.readValue(json, Task.class);
 
     Assert.assertEquals(task.getId(), task2.getId());
@@ -108,4 +120,47 @@ public class TaskSerdeTest
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
     Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
   }
+
+  @Test
+  public void testHadoopIndexTaskSerde() throws Exception
+  {
+    final HadoopIndexTask task = new HadoopIndexTask(
+        new HadoopDruidIndexerConfig(
+            null,
+            "foo",
+            "timestamp",
+            "auto",
+            new JSONDataSpec(ImmutableList.of("foo")),
+            null,
+            new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
+            new StaticPathSpec("bar"),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false,
+            true,
+            null,
+            false,
+            new DataRollupSpec(ImmutableList.of(), QueryGranularity.NONE),
+            null,
+            false,
+            ImmutableList.of()
+        )
+    );
+
+    final ObjectMapper jsonMapper = new DefaultObjectMapper();
+    final String json = jsonMapper.writeValueAsString(task);
+    final Task task2 = jsonMapper.readValue(json, Task.class);
+
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());
+
+    Assert.assertEquals(task.getId(), task2.getId());
+    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
+    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
+  }
 }
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
index 60309f1e368..6d48ce17df9 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
@@ -300,6 +300,12 @@ public class RemoteTaskRunnerTest
               {
                 return 0;
               }
+
+              @Override
+              public String getHadoopWorkingPath()
+              {
+                return null;
+              }
             }, null, null, null, null, null, jsonMapper
         ),
         Executors.newSingleThreadExecutor()
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
index 69a364e1900..ae5b46fdfc1 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
@@ -110,6 +110,12 @@ public class TaskLifecycleTest
           {
             return 50000;
           }
+
+          @Override
+          public String getHadoopWorkingPath()
+          {
+            return null;
+          }
         },
         new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())),
         newMockEmitter(),
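
A side note on the copy-via-Jackson pattern that run() uses in the hunk above: instead of mutating the caller-provided HadoopDruidIndexerConfig, the task serializes the config with the toolbox's ObjectMapper and immediately deserializes it, which yields an independent copy and re-runs the @JsonCreator constructor. Because only the copy receives the version, workingPath, and segmentOutputPath, the constructor Preconditions still pass when the task is serialized and read back after run(). The sketch below is a minimal illustration of that round-trip, assuming Jackson 2.x on the classpath; SampleConfig is a hypothetical stand-in, not a Druid class.

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ConfigCopyExample
    {
      // Hypothetical stand-in for HadoopDruidIndexerConfig; not part of Druid.
      public static class SampleConfig
      {
        private final String dataSource;
        private String version;

        @JsonCreator
        public SampleConfig(
            @JsonProperty("dataSource") String dataSource,
            @JsonProperty("version") String version
        )
        {
          this.dataSource = dataSource;
          this.version = version;
        }

        @JsonProperty
        public String getDataSource()
        {
          return dataSource;
        }

        @JsonProperty
        public String getVersion()
        {
          return version;
        }

        public void setVersion(String version)
        {
          this.version = version;
        }
      }

      public static void main(String[] args) throws Exception
      {
        final ObjectMapper mapper = new ObjectMapper();
        final SampleConfig provided = new SampleConfig("foo", null);

        // Round-trip through JSON to get a deep, independent copy; the
        // @JsonCreator constructor runs again, so its checks are re-applied.
        final SampleConfig copy = mapper.readValue(
            mapper.writeValueAsBytes(provided),
            SampleConfig.class
        );

        copy.setVersion("some-lock-version");

        System.out.println("provided.version = " + provided.getVersion()); // still null
        System.out.println("copy.version     = " + copy.getVersion());     // some-lock-version
      }
    }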