diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java index 518fb04ab37..502c9838de2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java @@ -111,4 +111,32 @@ public abstract class AbstractTask implements Task { return TaskStatus.success(getId()); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + AbstractTask that = (AbstractTask) o; + + if (dataSource != null ? !dataSource.equals(that.dataSource) : that.dataSource != null) { + return false; + } + if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) { + return false; + } + if (id != null ? !id.equals(that.id) : that.id != null) { + return false; + } + if (interval != null ? !interval.equals(that.interval) : that.interval != null) { + return false; + } + + return true; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java index cebebd218cd..db5234dce5d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java @@ -19,6 +19,7 @@ package com.metamx.druid.merger.common.task; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.collect.Lists; @@ -53,14 +54,37 @@ public class VersionConverterTask extends AbstractTask private static final Logger log = new Logger(VersionConverterTask.class); private final DataSegment segment; - public VersionConverterTask( + public static VersionConverterTask create(String dataSource, Interval interval) + { + final String id = makeId(dataSource, interval); + return new VersionConverterTask(id, id, dataSource, interval, null); + } + + public static VersionConverterTask create(DataSegment segment) + { + final Interval interval = segment.getInterval(); + final String dataSource = segment.getDataSource(); + final String id = makeId(dataSource, interval); + return new VersionConverterTask(id, id, dataSource, interval, segment); + } + + private static String makeId(String dataSource, Interval interval) + { + return joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime()); + } + + @JsonCreator + private VersionConverterTask( + @JsonProperty("id") String id, + @JsonProperty("groupId") String groupId, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("segment") DataSegment segment ) { super( - joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime()), + id, + groupId, dataSource, interval ); @@ -74,6 +98,12 @@ public class VersionConverterTask extends AbstractTask return TYPE; } + @JsonProperty + public DataSegment getSegment() + { + return segment; + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { @@ -121,11 +151,31 @@ public class VersionConverterTask extends AbstractTask return TaskStatus.success(getId()); } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + VersionConverterTask that = (VersionConverterTask) o; + + if (segment != null ? !segment.equals(that.segment) : that.segment != null) { + return false; + } + + return super.equals(o); + } + public static class SubTask extends AbstractTask { private final DataSegment segment; - protected SubTask( + @JsonCreator + public SubTask( @JsonProperty("groupId") String groupId, @JsonProperty("segment") DataSegment segment ) @@ -145,6 +195,12 @@ public class VersionConverterTask extends AbstractTask this.segment = segment; } + @JsonProperty + public DataSegment getSegment() + { + return segment; + } + @Override public String getType() { diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/VersionConverterTaskTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/VersionConverterTaskTest.java new file mode 100644 index 00000000000..8beeae6d411 --- /dev/null +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/VersionConverterTaskTest.java @@ -0,0 +1,66 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.common.task; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.shard.NoneShardSpec; +import junit.framework.Assert; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Test; + +/** + */ +public class VersionConverterTaskTest +{ + @Test + public void testSerializationSimple() throws Exception + { + final String dataSource = "billy"; + final Interval interval = new Interval(new DateTime().minus(1000), new DateTime()); + + DefaultObjectMapper jsonMapper = new DefaultObjectMapper(); + + VersionConverterTask task = VersionConverterTask.create(dataSource, interval); + + Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); + Assert.assertEquals(task, task2); + + DataSegment segment = new DataSegment( + dataSource, + interval, + new DateTime().toString(), + ImmutableMap.of(), + ImmutableList.of(), + ImmutableList.of(), + new NoneShardSpec(), + 9, + 102937 + ); + + task = VersionConverterTask.create(segment); + + task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); + Assert.assertEquals(task, task2); + } +}