RealtimeIndexTask: Fix serde

This commit is contained in:
Gian Merlino 2013-03-14 13:43:40 -07:00
parent b8c08f235a
commit e45a51714f
2 changed files with 76 additions and 6 deletions

View File

@ -1,6 +1,7 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
@ -39,27 +40,29 @@ import java.io.IOException;
public class RealtimeIndexTask extends AbstractTask
{
@JsonProperty
@JsonIgnore
final Schema schema;
@JsonProperty
@JsonIgnore
final FirehoseFactory firehoseFactory;
@JsonProperty
@JsonIgnore
final FireDepartmentConfig fireDepartmentConfig;
@JsonProperty
@JsonIgnore
final Period windowPeriod;
@JsonProperty
@JsonIgnore
final IndexGranularity segmentGranularity;
@JsonIgnore
private volatile Plumber plumber = null;
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
@JsonCreator
public RealtimeIndexTask(
@JsonProperty("id") String id,
@JsonProperty("schema") Schema schema,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, // TODO rename?
@ -68,7 +71,7 @@ public class RealtimeIndexTask extends AbstractTask
)
{
super(
String.format(
id != null ? id : String.format(
"index_realtime_%s_%d_%s",
schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime()
),
@ -257,6 +260,36 @@ public class RealtimeIndexTask extends AbstractTask
return TaskStatus.success(getId());
}
@JsonProperty
public Schema getSchema()
{
return schema;
}
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@JsonProperty
public FireDepartmentConfig getFireDepartmentConfig()
{
return fireDepartmentConfig;
}
@JsonProperty
public Period getWindowPeriod()
{
return windowPeriod;
}
@JsonProperty
public IndexGranularity getSegmentGranularity()
{
return segmentGranularity;
}
public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;

View File

@ -8,17 +8,20 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
import com.metamx.druid.indexer.data.JSONDataSpec;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexer.path.StaticPathSpec;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Test;
public class TaskSerdeTest
@ -193,6 +196,40 @@ public class TaskSerdeTest
);
}
@Test
public void testRealtimeIndexTaskSerde() throws Exception
{
final Task task = new RealtimeIndexTask(
null,
new Schema("foo", new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
null,
null,
new Period("PT10M"),
IndexGranularity.HOUR
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.<Interval>absent(), task.getImplicitLockInterval());
Assert.assertEquals(new Period("PT10M"), ((RealtimeIndexTask) task).getWindowPeriod());
Assert.assertEquals(IndexGranularity.HOUR, ((RealtimeIndexTask) task).getSegmentGranularity());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
Assert.assertEquals(((RealtimeIndexTask) task).getWindowPeriod(), ((RealtimeIndexTask) task).getWindowPeriod());
Assert.assertEquals(
((RealtimeIndexTask) task).getSegmentGranularity(),
((RealtimeIndexTask) task).getSegmentGranularity()
);
}
@Test
public void testDeleteTaskSerde() throws Exception
{