Merge pull request #1676 from metamx/fix-convert-segment-task

fix convert segment task
This commit is contained in:
Nishant 2015-08-27 18:13:09 +05:30
commit 6e5900e354
4 changed files with 83 additions and 6 deletions

View File

@ -0,0 +1,49 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.segment.IndexSpec;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@Deprecated
public class ConvertSegmentBackwardsCompatibleTask extends ConvertSegmentTask
{
@JsonCreator
public ConvertSegmentBackwardsCompatibleTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("segment") DataSegment segment,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("force") Boolean force,
@JsonProperty("validate") Boolean validate
)
{
super(
id == null ? ConvertSegmentTask.makeId(dataSource, interval) : id,
dataSource,
interval,
segment,
indexSpec,
force == null ? false : force,
validate ==null ? false : validate
);
}
@Deprecated
public static class SubTask extends ConvertSegmentTask.SubTask
{
@JsonCreator
public SubTask(
@JsonProperty("groupId") String groupId,
@JsonProperty("segment") DataSegment segment,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("force") Boolean force,
@JsonProperty("validate") Boolean validate
)
{
super(groupId, segment, indexSpec, force, validate);
}
}
}

View File

@ -106,7 +106,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
return new ConvertSegmentTask(id, dataSource, interval, segment, indexSpec, force, validate);
}
private static String makeId(String dataSource, Interval interval)
protected static String makeId(String dataSource, Interval interval)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(interval, "interval");
@ -248,7 +248,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
@Override
public Task apply(DataSegment input)
{
return new SubTask(groupId, segment, indexSpec, force, validate);
return new SubTask(groupId, input, indexSpec, force, validate);
}
}
);

View File

@ -51,8 +51,8 @@ import io.druid.query.QueryRunner;
@JsonSubTypes.Type(name = "hadoop_convert_segment_sub", value = HadoopConverterTask.ConverterSubTask.class),
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
@JsonSubTypes.Type(name = "noop", value = NoopTask.class),
@JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentTask.class), // Backwards compat - Deprecated
@JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentTask.SubTask.class), // backwards compat - Deprecated
@JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentBackwardsCompatibleTask.class), // Backwards compat - Deprecated
@JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentBackwardsCompatibleTask.SubTask.class), // backwards compat - Deprecated
@JsonSubTypes.Type(name = "convert_segment", value = ConvertSegmentTask.class),
@JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class)
})

View File

@ -17,11 +17,13 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.core.JsonParseException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import java.io.IOException;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -31,16 +33,16 @@ import org.junit.Test;
*/
public class ConvertSegmentTaskTest
{
private DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testSerializationSimple() throws Exception
{
final String dataSource = "billy";
final Interval interval = new Interval(new DateTime().minus(1000), new DateTime());
DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false, true);
Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
Assert.assertEquals(task, task2);
@ -61,4 +63,30 @@ public class ConvertSegmentTaskTest
task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
Assert.assertEquals(task, task2);
}
@Test
public void testdeSerializationFromJsonString() throws Exception
{
String json = "{\n"
+ " \"type\" : \"convert_segment\",\n"
+ " \"dataSource\" : \"billy\",\n"
+ " \"interval\" : \"2015-08-27T00:00:00.000Z/2015-08-28T00:00:00.000Z\"\n"
+ "}";
ConvertSegmentTask task = (ConvertSegmentTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("billy", task.getDataSource());
Assert.assertEquals(new Interval("2015-08-27T00:00:00.000Z/2015-08-28T00:00:00.000Z"), task.getInterval());
}
@Test
public void testSerdeBackwardsCompatible() throws Exception
{
String json = "{\n"
+ " \"type\" : \"version_converter\",\n"
+ " \"dataSource\" : \"billy\",\n"
+ " \"interval\" : \"2015-08-27T00:00:00.000Z/2015-08-28T00:00:00.000Z\"\n"
+ "}";
ConvertSegmentTask task = (ConvertSegmentTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("billy", task.getDataSource());
Assert.assertEquals(new Interval("2015-08-27T00:00:00.000Z/2015-08-28T00:00:00.000Z"), task.getInterval());
}
}