From cf470b1ed474e58be20f10ebc7524e4d64331299 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 13 Mar 2013 18:39:06 -0700 Subject: [PATCH] Merger: Task serde without relying on jackson private-final-setter magic --- .../merger/common/task/AbstractTask.java | 8 + .../druid/merger/common/task/AppendTask.java | 3 +- .../druid/merger/common/task/DeleteTask.java | 3 +- .../merger/common/task/HadoopIndexTask.java | 12 +- .../task/IndexDeterminePartitionsTask.java | 37 +++- .../common/task/IndexGeneratorTask.java | 28 ++- .../druid/merger/common/task/IndexTask.java | 55 +++++- .../druid/merger/common/task/KillTask.java | 3 +- .../druid/merger/common/task/MergeTask.java | 11 +- .../merger/common/task/MergeTaskBase.java | 8 +- .../common/task/VersionConverterTask.java | 4 + .../com/metamx/druid/merger/TestTask.java | 12 +- .../merger/common/task/MergeTaskBaseTest.java | 2 +- .../merger/common/task/TaskSerdeTest.java | 159 +++++++++++++++++- .../merger/coordinator/TaskLifecycleTest.java | 4 +- 15 files changed, 308 insertions(+), 41 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java index 502c9838de2..917b446e7b1 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java @@ -19,6 +19,7 @@ package com.metamx.druid.merger.common.task; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Joiner; import com.google.common.base.Objects; @@ -33,9 +34,16 @@ public abstract class AbstractTask implements Task { private static final Joiner ID_JOINER = Joiner.on("_"); + @JsonIgnore private final String id; + + @JsonIgnore private final String groupId; + + @JsonIgnore private final String dataSource; + + @JsonIgnore private final Optional interval; protected AbstractTask(String id, String dataSource, Interval interval) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java index 5d15269677a..b00c1c24399 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java @@ -48,11 +48,12 @@ public class AppendTask extends MergeTaskBase { @JsonCreator public AppendTask( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments ) { - super(dataSource, segments); + super(id, dataSource, segments); } @Override diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java index 86fd2a7ec37..5d704b26b3f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java @@ -50,12 +50,13 @@ public class DeleteTask extends AbstractTask @JsonCreator public DeleteTask( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval ) { super( - String.format( + id != null ? id : String.format( "delete_%s_%s_%s_%s", dataSource, interval.getStart(), diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java index 6e284557529..f3ce30c90cb 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; @@ -41,7 +42,7 @@ import java.util.List; public class HadoopIndexTask extends AbstractTask { - @JsonProperty + @JsonIgnore private final HadoopDruidIndexerConfig config; private static final Logger log = new Logger(HadoopIndexTask.class); @@ -58,11 +59,12 @@ public class HadoopIndexTask extends AbstractTask @JsonCreator public HadoopIndexTask( + @JsonProperty("id") String id, @JsonProperty("config") HadoopDruidIndexerConfig config ) { super( - String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()), + id != null ? id : String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()), config.getDataSource(), JodaUtils.umbrellaInterval(config.getIntervals()) ); @@ -133,4 +135,10 @@ public class HadoopIndexTask extends AbstractTask } } + + @JsonProperty + public HadoopDruidIndexerConfig getConfig() + { + return config; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java index 47f72b12501..675b1675072 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -48,22 +49,23 @@ import java.util.Set; public class IndexDeterminePartitionsTask extends AbstractTask { - @JsonProperty + @JsonIgnore private final FirehoseFactory firehoseFactory; - @JsonProperty + @JsonIgnore private final Schema schema; - @JsonProperty + @JsonIgnore private final long targetPartitionSize; - @JsonProperty + @JsonIgnore private final int rowFlushBoundary; private static final Logger log = new Logger(IndexTask.class); @JsonCreator public IndexDeterminePartitionsTask( + @JsonProperty("id") String id, @JsonProperty("groupId") String groupId, @JsonProperty("interval") Interval interval, @JsonProperty("firehose") FirehoseFactory firehoseFactory, @@ -73,7 +75,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask ) { super( - String.format( + id != null ? id : String.format( "%s_partitions_%s_%s", groupId, interval.getStart(), @@ -243,6 +245,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask public Task apply(ShardSpec shardSpec) { return new IndexGeneratorTask( + null, getGroupId(), getImplicitLockInterval().get(), firehoseFactory, @@ -262,4 +265,28 @@ public class IndexDeterminePartitionsTask extends AbstractTask return TaskStatus.success(getId()); } + + @JsonProperty + public FirehoseFactory getFirehoseFactory() + { + return firehoseFactory; + } + + @JsonProperty + public Schema getSchema() + { + return schema; + } + + @JsonProperty + public long getTargetPartitionSize() + { + return targetPartitionSize; + } + + @JsonProperty + public int getRowFlushBoundary() + { + return rowFlushBoundary; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java index dd928883232..6eb58ea91c6 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; @@ -52,19 +53,20 @@ import java.util.concurrent.CopyOnWriteArrayList; public class IndexGeneratorTask extends AbstractTask { - @JsonProperty + @JsonIgnore private final FirehoseFactory firehoseFactory; - @JsonProperty + @JsonIgnore private final Schema schema; - @JsonProperty + @JsonIgnore private final int rowFlushBoundary; private static final Logger log = new Logger(IndexTask.class); @JsonCreator public IndexGeneratorTask( + @JsonProperty("id") String id, @JsonProperty("groupId") String groupId, @JsonProperty("interval") Interval interval, @JsonProperty("firehose") FirehoseFactory firehoseFactory, @@ -73,7 +75,7 @@ public class IndexGeneratorTask extends AbstractTask ) { super( - String.format( + id != null ? id : String.format( "%s_generator_%s_%s_%s", groupId, interval.getStart(), @@ -216,4 +218,22 @@ public class IndexGeneratorTask extends AbstractTask return schema.getShardSpec().isInChunk(eventDimensions); } + + @JsonProperty + public FirehoseFactory getFirehoseFactory() + { + return firehoseFactory; + } + + @JsonProperty + public Schema getSchema() + { + return schema; + } + + @JsonProperty + public int getRowFlushBoundary() + { + return rowFlushBoundary; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java index 35babcd6a22..a86c57d94f5 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -40,28 +41,29 @@ import java.util.List; public class IndexTask extends AbstractTask { - @JsonProperty + @JsonIgnore private final GranularitySpec granularitySpec; - @JsonProperty + @JsonIgnore private final AggregatorFactory[] aggregators; - @JsonProperty + @JsonIgnore private final QueryGranularity indexGranularity; - @JsonProperty + @JsonIgnore private final long targetPartitionSize; - @JsonProperty + @JsonIgnore private final FirehoseFactory firehoseFactory; - @JsonProperty + @JsonIgnore private final int rowFlushBoundary; private static final Logger log = new Logger(IndexTask.class); @JsonCreator public IndexTask( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("granularitySpec") GranularitySpec granularitySpec, @JsonProperty("aggregators") AggregatorFactory[] aggregators, @@ -73,7 +75,7 @@ public class IndexTask extends AbstractTask { super( // _not_ the version, just something uniqueish - String.format("index_%s_%s", dataSource, new DateTime().toString()), + id != null ? id : String.format("index_%s_%s", dataSource, new DateTime().toString()), dataSource, new Interval( granularitySpec.bucketIntervals().first().getStart(), @@ -98,6 +100,7 @@ public class IndexTask extends AbstractTask // Need to do one pass over the data before indexing in order to determine good partitions retVal.add( new IndexDeterminePartitionsTask( + null, getGroupId(), interval, firehoseFactory, @@ -115,6 +118,7 @@ public class IndexTask extends AbstractTask // Jump straight into indexing retVal.add( new IndexGeneratorTask( + null, getGroupId(), interval, firehoseFactory, @@ -151,4 +155,41 @@ public class IndexTask extends AbstractTask { throw new IllegalStateException("IndexTasks should not be run!"); } + + @JsonProperty + public GranularitySpec getGranularitySpec() + { + return granularitySpec; + } + + @JsonProperty + public AggregatorFactory[] getAggregators() + { + return aggregators; + } + + @JsonProperty + public QueryGranularity getIndexGranularity() + { + return indexGranularity; + } + + @JsonProperty + public long getTargetPartitionSize() + { + return targetPartitionSize; + } + + @JsonProperty + public FirehoseFactory getFirehoseFactory() + { + return firehoseFactory; + } + + @JsonProperty + public int getRowFlushBoundary() + { + return rowFlushBoundary; + } + } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java index f4476ffd858..e26a25fd038 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java @@ -45,12 +45,13 @@ public class KillTask extends AbstractTask @JsonCreator public KillTask( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval ) { super( - String.format( + id != null ? id : String.format( "kill_%s_%s_%s_%s", dataSource, interval.getStart(), diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java index 4e6102f666b..9867eec0c4c 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Throwables; @@ -42,16 +43,18 @@ import java.util.Map; */ public class MergeTask extends MergeTaskBase { + @JsonIgnore private final List aggregators; @JsonCreator public MergeTask( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments, @JsonProperty("aggregations") List aggregators ) { - super(dataSource, segments); + super(id, dataSource, segments); this.aggregators = aggregators; } @@ -86,4 +89,10 @@ public class MergeTask extends MergeTaskBase { return "merge"; } + + @JsonProperty("aggregations") + public List getAggregators() + { + return aggregators; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java index 4bda0363941..7402d69e537 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java @@ -19,6 +19,7 @@ package com.metamx.druid.merger.common.task; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Charsets; import com.google.common.base.Function; @@ -56,15 +57,18 @@ import java.util.Set; */ public abstract class MergeTaskBase extends AbstractTask { + @JsonIgnore private final List segments; private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class); - protected MergeTaskBase(final String dataSource, final List segments) + protected MergeTaskBase(final String id, final String dataSource, final List segments) { super( // _not_ the version, just something uniqueish - String.format("merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()), + id != null ? id : String.format( + "merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString() + ), dataSource, computeMergedInterval(segments) ); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java index db5234dce5d..c5db8aba959 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.collect.Lists; @@ -52,6 +53,8 @@ public class VersionConverterTask extends AbstractTask private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID); private static final Logger log = new Logger(VersionConverterTask.class); + + @JsonIgnore private final DataSegment segment; public static VersionConverterTask create(String dataSource, Interval interval) @@ -172,6 +175,7 @@ public class VersionConverterTask extends AbstractTask public static class SubTask extends AbstractTask { + @JsonIgnore private final DataSegment segment; @JsonCreator diff --git a/merger/src/test/java/com/metamx/druid/merger/TestTask.java b/merger/src/test/java/com/metamx/druid/merger/TestTask.java index d0a77cff447..2aa41dc031f 100644 --- a/merger/src/test/java/com/metamx/druid/merger/TestTask.java +++ b/merger/src/test/java/com/metamx/druid/merger/TestTask.java @@ -35,7 +35,6 @@ import java.util.List; @JsonTypeName("test") public class TestTask extends MergeTask { - private final String id; private final TaskStatus status; @JsonCreator @@ -47,19 +46,10 @@ public class TestTask extends MergeTask @JsonProperty("taskStatus") TaskStatus status ) { - super(dataSource, segments, aggregators); - - this.id = id; + super(id, dataSource, segments, aggregators); this.status = status; } - @Override - @JsonProperty - public String getId() - { - return id; - } - @Override @JsonProperty public String getType() diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java index a2f6e8175fb..e8c6622369a 100644 --- a/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java @@ -43,7 +43,7 @@ public class MergeTaskBaseTest .add(segmentBuilder.interval(new Interval("2012-01-03/2012-01-05")).build()) .build(); - final MergeTaskBase testMergeTaskBase = new MergeTaskBase("foo", segments) + final MergeTaskBase testMergeTaskBase = new MergeTaskBase(null, "foo", segments) { @Override protected File merge(Map segments, File outDir) throws Exception diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java index 701093209ea..1f1a8c41038 100644 --- a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java @@ -5,6 +5,7 @@ import com.google.common.collect.ImmutableList; import com.metamx.common.Granularity; import com.metamx.druid.QueryGranularity; import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.aggregation.CountAggregatorFactory; import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.indexer.HadoopDruidIndexerConfig; @@ -26,6 +27,7 @@ public class TaskSerdeTest public void testIndexTaskSerde() throws Exception { final Task task = new IndexTask( + null, "foo", new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))), new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, @@ -54,6 +56,7 @@ public class TaskSerdeTest public void testIndexGeneratorTaskSerde() throws Exception { final Task task = new IndexGeneratorTask( + null, "foo", new Interval("2010-01-01/P1D"), null, @@ -68,6 +71,8 @@ public class TaskSerdeTest final ObjectMapper jsonMapper = new DefaultObjectMapper(); final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change final Task task2 = jsonMapper.readValue(json, Task.class); Assert.assertEquals("foo", task.getDataSource()); @@ -80,17 +85,23 @@ public class TaskSerdeTest } @Test - public void testAppendTaskSerde() throws Exception + public void testMergeTaskSerde() throws Exception { - final Task task = new AppendTask( + final Task task = new MergeTask( + null, "foo", ImmutableList.of( DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build() + ), + ImmutableList.of( + new CountAggregatorFactory("cnt") ) ); final ObjectMapper jsonMapper = new DefaultObjectMapper(); final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change final Task task2 = jsonMapper.readValue(json, Task.class); Assert.assertEquals("foo", task.getDataSource()); @@ -100,20 +111,131 @@ public class TaskSerdeTest Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getDataSource(), task2.getDataSource()); Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals(((MergeTask) task).getSegments(), ((MergeTask) task2).getSegments()); + Assert.assertEquals( + ((MergeTask) task).getAggregators().get(0).getName(), + ((MergeTask) task2).getAggregators().get(0).getName() + ); } @Test - public void testDeleteTaskSerde() throws Exception + public void testKillTaskSerde() throws Exception { - final Task task = new DeleteTask( + final Task task = new KillTask( + null, "foo", new Interval("2010-01-01/P1D") ); final ObjectMapper jsonMapper = new DefaultObjectMapper(); final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change final Task task2 = jsonMapper.readValue(json, Task.class); + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + } + + @Test + public void testVersionConverterTaskSerde() throws Exception + { + final Task task = VersionConverterTask.create( + DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build() + ); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals(((VersionConverterTask) task).getSegment(), ((VersionConverterTask) task).getSegment()); + } + + @Test + public void testVersionConverterSubTaskSerde() throws Exception + { + final Task task = new VersionConverterTask.SubTask( + "myGroupId", + DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build() + ); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + Assert.assertEquals("myGroupId", task.getGroupId()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals( + ((VersionConverterTask.SubTask) task).getSegment(), + ((VersionConverterTask.SubTask) task).getSegment() + ); + } + + @Test + public void testDeleteTaskSerde() throws Exception + { + final Task task = new DeleteTask( + null, + "foo", + new Interval("2010-01-01/P1D") + ); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get()); + } + + + @Test + public void testDeleteTaskFromJson() throws Exception + { + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final Task task = jsonMapper.readValue( + "{\"type\":\"delete\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}", + Task.class + ); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertNotNull(task.getId()); + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getDataSource(), task2.getDataSource()); @@ -121,10 +243,39 @@ public class TaskSerdeTest Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get()); } + @Test + public void testAppendTaskSerde() throws Exception + { + final Task task = new AppendTask( + null, + "foo", + ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build() + ) + ); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get()); + Assert.assertEquals(((AppendTask) task).getSegments(), ((AppendTask) task2).getSegments()); + } + @Test public void testHadoopIndexTaskSerde() throws Exception { final HadoopIndexTask task = new HadoopIndexTask( + null, new HadoopDruidIndexerConfig( null, "foo", diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java index c94369726e9..917a264237c 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java @@ -184,6 +184,7 @@ public class TaskLifecycleTest public void testIndexTask() throws Exception { final Task indexTask = new IndexTask( + null, "foo", new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))), new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, @@ -226,6 +227,7 @@ public class TaskLifecycleTest public void testIndexTaskFailure() throws Exception { final Task indexTask = new IndexTask( + null, "foo", new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))), new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, @@ -249,7 +251,7 @@ public class TaskLifecycleTest { // This test doesn't actually do anything right now. We should actually put things into the Mocked coordinator // Such that this test can test things... - final Task killTask = new KillTask("foo", new Interval("2010-01-02/P2D")); + final Task killTask = new KillTask(null, "foo", new Interval("2010-01-02/P2D")); final TaskStatus status = runTask(killTask); Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode());