From e45a51714f0db97cf48e51fbfb884dc5425842e0 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 14 Mar 2013 13:43:40 -0700 Subject: [PATCH] RealtimeIndexTask: Fix serde --- .../merger/common/task/RealtimeIndexTask.java | 45 ++++++++++++++++--- .../merger/common/task/TaskSerdeTest.java | 37 +++++++++++++++ 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java index ac0c3a6b77b..27278537cca 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java @@ -1,6 +1,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; @@ -39,27 +40,29 @@ import java.io.IOException; public class RealtimeIndexTask extends AbstractTask { - @JsonProperty + @JsonIgnore final Schema schema; - @JsonProperty + @JsonIgnore final FirehoseFactory firehoseFactory; - @JsonProperty + @JsonIgnore final FireDepartmentConfig fireDepartmentConfig; - @JsonProperty + @JsonIgnore final Period windowPeriod; - @JsonProperty + @JsonIgnore final IndexGranularity segmentGranularity; + @JsonIgnore private volatile Plumber plumber = null; private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); @JsonCreator public RealtimeIndexTask( + @JsonProperty("id") String id, @JsonProperty("schema") Schema schema, @JsonProperty("firehose") FirehoseFactory firehoseFactory, @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, // TODO rename? @@ -68,7 +71,7 @@ public class RealtimeIndexTask extends AbstractTask ) { super( - String.format( + id != null ? id : String.format( "index_realtime_%s_%d_%s", schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime() ), @@ -257,6 +260,36 @@ public class RealtimeIndexTask extends AbstractTask return TaskStatus.success(getId()); } + @JsonProperty + public Schema getSchema() + { + return schema; + } + + @JsonProperty("firehose") + public FirehoseFactory getFirehoseFactory() + { + return firehoseFactory; + } + + @JsonProperty + public FireDepartmentConfig getFireDepartmentConfig() + { + return fireDepartmentConfig; + } + + @JsonProperty + public Period getWindowPeriod() + { + return windowPeriod; + } + + @JsonProperty + public IndexGranularity getSegmentGranularity() + { + return segmentGranularity; + } + public static class TaskActionSegmentPublisher implements SegmentPublisher { final Task task; diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java index 1f1a8c41038..1bb89b5f899 100644 --- a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java @@ -8,17 +8,20 @@ import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.CountAggregatorFactory; import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.index.v1.IndexGranularity; import com.metamx.druid.indexer.HadoopDruidIndexerConfig; import com.metamx.druid.indexer.data.JSONDataSpec; import com.metamx.druid.indexer.granularity.UniformGranularitySpec; import com.metamx.druid.indexer.path.StaticPathSpec; import com.metamx.druid.indexer.rollup.DataRollupSpec; import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.realtime.Schema; import com.metamx.druid.shard.NoneShardSpec; import junit.framework.Assert; import com.fasterxml.jackson.databind.ObjectMapper; import org.joda.time.Interval; +import org.joda.time.Period; import org.junit.Test; public class TaskSerdeTest @@ -193,6 +196,40 @@ public class TaskSerdeTest ); } + @Test + public void testRealtimeIndexTaskSerde() throws Exception + { + final Task task = new RealtimeIndexTask( + null, + new Schema("foo", new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()), + null, + null, + new Period("PT10M"), + IndexGranularity.HOUR + ); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.absent(), task.getImplicitLockInterval()); + Assert.assertEquals(new Period("PT10M"), ((RealtimeIndexTask) task).getWindowPeriod()); + Assert.assertEquals(IndexGranularity.HOUR, ((RealtimeIndexTask) task).getSegmentGranularity()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals(((RealtimeIndexTask) task).getWindowPeriod(), ((RealtimeIndexTask) task).getWindowPeriod()); + Assert.assertEquals( + ((RealtimeIndexTask) task).getSegmentGranularity(), + ((RealtimeIndexTask) task).getSegmentGranularity() + ); + } + @Test public void testDeleteTaskSerde() throws Exception {