diff --git a/client/pom.xml b/client/pom.xml
index 809d02343b9..b232092d027 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.3.19-SNAPSHOT
+ 0.3.22-SNAPSHOT
diff --git a/common/pom.xml b/common/pom.xml
index 340f8a6d3a2..3111998cb7a 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.3.19-SNAPSHOT
+ 0.3.22-SNAPSHOT
diff --git a/common/src/main/java/com/metamx/druid/config/ConfigManager.java b/common/src/main/java/com/metamx/druid/config/ConfigManager.java
index 1ecfd24482c..4aa97e7ffc7 100644
--- a/common/src/main/java/com/metamx/druid/config/ConfigManager.java
+++ b/common/src/main/java/com/metamx/druid/config/ConfigManager.java
@@ -38,6 +38,7 @@ public class ConfigManager
private final ScheduledExecutorService exec;
private final ConcurrentMap watchedConfigs;
private final String selectStatement;
+ private final String insertStatement;
private volatile ConfigManager.PollingCallable poller;
@@ -49,6 +50,10 @@ public class ConfigManager
this.exec = ScheduledExecutors.fixed(1, "config-manager-%s");
this.watchedConfigs = Maps.newConcurrentMap();
this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", config.getConfigTable());
+ insertStatement = String.format(
+ "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
+ config.getConfigTable()
+ );
}
@LifecycleStart
@@ -192,9 +197,7 @@ public class ConfigManager
@Override
public Void withHandle(Handle handle) throws Exception
{
- handle.createStatement(
- "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload"
- )
+ handle.createStatement(insertStatement)
.bind("name", key)
.bind("payload", newBytes)
.execute();
diff --git a/druid-services/pom.xml b/druid-services/pom.xml
index d1aab696a14..58487a127a4 100644
--- a/druid-services/pom.xml
+++ b/druid-services/pom.xml
@@ -24,11 +24,11 @@
druid-services
druid-services
druid-services
- 0.3.19-SNAPSHOT
+ 0.3.22-SNAPSHOT
com.metamx
druid
- 0.3.19-SNAPSHOT
+ 0.3.22-SNAPSHOT
diff --git a/examples/pom.xml b/examples/pom.xml
index 88151a158f5..e3a508b7b1e 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.3.19-SNAPSHOT
+ 0.3.22-SNAPSHOT
diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml
index 54d30ddd5a8..63439d5f1c9 100644
--- a/examples/rand/pom.xml
+++ b/examples/rand/pom.xml
@@ -9,7 +9,7 @@
com.metamx
druid-examples
- 0.3.19-SNAPSHOT
+ 0.3.22-SNAPSHOT
diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml
index 3b7c2d3582d..6436a52a21f 100644
--- a/examples/twitter/pom.xml
+++ b/examples/twitter/pom.xml
@@ -9,7 +9,7 @@
com.metamx
druid-examples
- 0.3.19-SNAPSHOT
+ 0.3.22-SNAPSHOT
diff --git a/index-common/pom.xml b/index-common/pom.xml
index d5db4e3be63..d36ea5de375 100644
--- a/index-common/pom.xml
+++ b/index-common/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.3.19-SNAPSHOT
+ 0.3.22-SNAPSHOT
diff --git a/indexer/pom.xml b/indexer/pom.xml
index b1d065b9977..17573c6b6f4 100644
--- a/indexer/pom.xml
+++ b/indexer/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.3.19-SNAPSHOT
+ 0.3.22-SNAPSHOT
diff --git a/merger/pom.xml b/merger/pom.xml
index fab94d85089..a9fe0a84f31 100644
--- a/merger/pom.xml
+++ b/merger/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.3.19-SNAPSHOT
+ 0.3.22-SNAPSHOT
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java
index ca9fe9c5c73..644d15c09f3 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java
@@ -3,6 +3,7 @@ package com.metamx.druid.merger.common.actions;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
+import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.RetryPolicyFactory;
@@ -106,6 +107,10 @@ public class RemoteTaskActionClient implements TaskActionClient
final int port;
final String path = "/mmx/merger/v1/action";
+ if (instance == null) {
+ throw new ISE("Cannot find instance of indexer to talk to!");
+ }
+
host = instance.getAddress();
if (instance.getSslPort() != null && instance.getSslPort() > 0) {
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java
index eeec7d3651d..df5bf573fbc 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java
@@ -19,6 +19,7 @@
package com.metamx.druid.merger.common.task;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
@@ -35,9 +36,16 @@ public abstract class AbstractTask implements Task
{
private static final Joiner ID_JOINER = Joiner.on("_");
+ @JsonIgnore
private final String id;
+
+ @JsonIgnore
private final String groupId;
+
+ @JsonIgnore
private final String dataSource;
+
+ @JsonIgnore
private final Optional interval;
protected AbstractTask(String id, String dataSource, Interval interval)
@@ -119,4 +127,32 @@ public abstract class AbstractTask implements Task
{
return TaskStatus.success(getId());
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ AbstractTask that = (AbstractTask) o;
+
+ if (dataSource != null ? !dataSource.equals(that.dataSource) : that.dataSource != null) {
+ return false;
+ }
+ if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) {
+ return false;
+ }
+ if (id != null ? !id.equals(that.id) : that.id != null) {
+ return false;
+ }
+ if (interval != null ? !interval.equals(that.interval) : that.interval != null) {
+ return false;
+ }
+
+ return true;
+ }
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java
index 5d15269677a..b00c1c24399 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java
@@ -48,11 +48,12 @@ public class AppendTask extends MergeTaskBase
{
@JsonCreator
public AppendTask(
+ @JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List segments
)
{
- super(dataSource, segments);
+ super(id, dataSource, segments);
}
@Override
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java
index 86fd2a7ec37..5d704b26b3f 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java
@@ -50,12 +50,13 @@ public class DeleteTask extends AbstractTask
@JsonCreator
public DeleteTask(
+ @JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
- String.format(
+ id != null ? id : String.format(
"delete_%s_%s_%s_%s",
dataSource,
interval.getStart(),
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
index 6e284557529..f3ce30c90cb 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
@@ -20,6 +20,7 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
@@ -41,7 +42,7 @@ import java.util.List;
public class HadoopIndexTask extends AbstractTask
{
- @JsonProperty
+ @JsonIgnore
private final HadoopDruidIndexerConfig config;
private static final Logger log = new Logger(HadoopIndexTask.class);
@@ -58,11 +59,12 @@ public class HadoopIndexTask extends AbstractTask
@JsonCreator
public HadoopIndexTask(
+ @JsonProperty("id") String id,
@JsonProperty("config") HadoopDruidIndexerConfig config
)
{
super(
- String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
+ id != null ? id : String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
config.getDataSource(),
JodaUtils.umbrellaInterval(config.getIntervals())
);
@@ -133,4 +135,10 @@ public class HadoopIndexTask extends AbstractTask
}
}
+
+ @JsonProperty
+ public HadoopDruidIndexerConfig getConfig()
+ {
+ return config;
+ }
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java
index 47f72b12501..675b1675072 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java
@@ -20,6 +20,7 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@@ -48,22 +49,23 @@ import java.util.Set;
public class IndexDeterminePartitionsTask extends AbstractTask
{
- @JsonProperty
+ @JsonIgnore
private final FirehoseFactory firehoseFactory;
- @JsonProperty
+ @JsonIgnore
private final Schema schema;
- @JsonProperty
+ @JsonIgnore
private final long targetPartitionSize;
- @JsonProperty
+ @JsonIgnore
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@JsonCreator
public IndexDeterminePartitionsTask(
+ @JsonProperty("id") String id,
@JsonProperty("groupId") String groupId,
@JsonProperty("interval") Interval interval,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@@ -73,7 +75,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
)
{
super(
- String.format(
+ id != null ? id : String.format(
"%s_partitions_%s_%s",
groupId,
interval.getStart(),
@@ -243,6 +245,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
public Task apply(ShardSpec shardSpec)
{
return new IndexGeneratorTask(
+ null,
getGroupId(),
getImplicitLockInterval().get(),
firehoseFactory,
@@ -262,4 +265,28 @@ public class IndexDeterminePartitionsTask extends AbstractTask
return TaskStatus.success(getId());
}
+
+ @JsonProperty
+ public FirehoseFactory getFirehoseFactory()
+ {
+ return firehoseFactory;
+ }
+
+ @JsonProperty
+ public Schema getSchema()
+ {
+ return schema;
+ }
+
+ @JsonProperty
+ public long getTargetPartitionSize()
+ {
+ return targetPartitionSize;
+ }
+
+ @JsonProperty
+ public int getRowFlushBoundary()
+ {
+ return rowFlushBoundary;
+ }
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java
index dd928883232..6eb58ea91c6 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java
@@ -20,6 +20,7 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
@@ -52,19 +53,20 @@ import java.util.concurrent.CopyOnWriteArrayList;
public class IndexGeneratorTask extends AbstractTask
{
- @JsonProperty
+ @JsonIgnore
private final FirehoseFactory firehoseFactory;
- @JsonProperty
+ @JsonIgnore
private final Schema schema;
- @JsonProperty
+ @JsonIgnore
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@JsonCreator
public IndexGeneratorTask(
+ @JsonProperty("id") String id,
@JsonProperty("groupId") String groupId,
@JsonProperty("interval") Interval interval,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@@ -73,7 +75,7 @@ public class IndexGeneratorTask extends AbstractTask
)
{
super(
- String.format(
+ id != null ? id : String.format(
"%s_generator_%s_%s_%s",
groupId,
interval.getStart(),
@@ -216,4 +218,22 @@ public class IndexGeneratorTask extends AbstractTask
return schema.getShardSpec().isInChunk(eventDimensions);
}
+
+ @JsonProperty
+ public FirehoseFactory getFirehoseFactory()
+ {
+ return firehoseFactory;
+ }
+
+ @JsonProperty
+ public Schema getSchema()
+ {
+ return schema;
+ }
+
+ @JsonProperty
+ public int getRowFlushBoundary()
+ {
+ return rowFlushBoundary;
+ }
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java
index 35babcd6a22..a86c57d94f5 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java
@@ -20,6 +20,7 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -40,28 +41,29 @@ import java.util.List;
public class IndexTask extends AbstractTask
{
- @JsonProperty
+ @JsonIgnore
private final GranularitySpec granularitySpec;
- @JsonProperty
+ @JsonIgnore
private final AggregatorFactory[] aggregators;
- @JsonProperty
+ @JsonIgnore
private final QueryGranularity indexGranularity;
- @JsonProperty
+ @JsonIgnore
private final long targetPartitionSize;
- @JsonProperty
+ @JsonIgnore
private final FirehoseFactory firehoseFactory;
- @JsonProperty
+ @JsonIgnore
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@JsonCreator
public IndexTask(
+ @JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JsonProperty("aggregators") AggregatorFactory[] aggregators,
@@ -73,7 +75,7 @@ public class IndexTask extends AbstractTask
{
super(
// _not_ the version, just something uniqueish
- String.format("index_%s_%s", dataSource, new DateTime().toString()),
+ id != null ? id : String.format("index_%s_%s", dataSource, new DateTime().toString()),
dataSource,
new Interval(
granularitySpec.bucketIntervals().first().getStart(),
@@ -98,6 +100,7 @@ public class IndexTask extends AbstractTask
// Need to do one pass over the data before indexing in order to determine good partitions
retVal.add(
new IndexDeterminePartitionsTask(
+ null,
getGroupId(),
interval,
firehoseFactory,
@@ -115,6 +118,7 @@ public class IndexTask extends AbstractTask
// Jump straight into indexing
retVal.add(
new IndexGeneratorTask(
+ null,
getGroupId(),
interval,
firehoseFactory,
@@ -151,4 +155,41 @@ public class IndexTask extends AbstractTask
{
throw new IllegalStateException("IndexTasks should not be run!");
}
+
+ @JsonProperty
+ public GranularitySpec getGranularitySpec()
+ {
+ return granularitySpec;
+ }
+
+ @JsonProperty
+ public AggregatorFactory[] getAggregators()
+ {
+ return aggregators;
+ }
+
+ @JsonProperty
+ public QueryGranularity getIndexGranularity()
+ {
+ return indexGranularity;
+ }
+
+ @JsonProperty
+ public long getTargetPartitionSize()
+ {
+ return targetPartitionSize;
+ }
+
+ @JsonProperty
+ public FirehoseFactory getFirehoseFactory()
+ {
+ return firehoseFactory;
+ }
+
+ @JsonProperty
+ public int getRowFlushBoundary()
+ {
+ return rowFlushBoundary;
+ }
+
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
index f4476ffd858..e26a25fd038 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
@@ -45,12 +45,13 @@ public class KillTask extends AbstractTask
@JsonCreator
public KillTask(
+ @JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
- String.format(
+ id != null ? id : String.format(
"kill_%s_%s_%s_%s",
dataSource,
interval.getStart(),
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java
index 4e6102f666b..9867eec0c4c 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java
@@ -20,6 +20,7 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
@@ -42,16 +43,18 @@ import java.util.Map;
*/
public class MergeTask extends MergeTaskBase
{
+ @JsonIgnore
private final List aggregators;
@JsonCreator
public MergeTask(
+ @JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List segments,
@JsonProperty("aggregations") List aggregators
)
{
- super(dataSource, segments);
+ super(id, dataSource, segments);
this.aggregators = aggregators;
}
@@ -86,4 +89,10 @@ public class MergeTask extends MergeTaskBase
{
return "merge";
}
+
+ @JsonProperty("aggregations")
+ public List getAggregators()
+ {
+ return aggregators;
+ }
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java
index de4989436a1..63ee09726e9 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java
@@ -19,6 +19,7 @@
package com.metamx.druid.merger.common.task;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
@@ -58,15 +59,18 @@ import java.util.Set;
*/
public abstract class MergeTaskBase extends AbstractTask
{
+ @JsonIgnore
private final List segments;
private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class);
- protected MergeTaskBase(final String dataSource, final List segments)
+ protected MergeTaskBase(final String id, final String dataSource, final List segments)
{
super(
// _not_ the version, just something uniqueish
- String.format("merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()),
+ id != null ? id : String.format(
+ "merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()
+ ),
dataSource,
computeMergedInterval(segments)
);
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java
index cebebd218cd..c5db8aba959 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java
@@ -19,6 +19,8 @@
package com.metamx.druid.merger.common.task;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
@@ -51,16 +53,41 @@ public class VersionConverterTask extends AbstractTask
private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID);
private static final Logger log = new Logger(VersionConverterTask.class);
+
+ @JsonIgnore
private final DataSegment segment;
- public VersionConverterTask(
+ public static VersionConverterTask create(String dataSource, Interval interval)
+ {
+ final String id = makeId(dataSource, interval);
+ return new VersionConverterTask(id, id, dataSource, interval, null);
+ }
+
+ public static VersionConverterTask create(DataSegment segment)
+ {
+ final Interval interval = segment.getInterval();
+ final String dataSource = segment.getDataSource();
+ final String id = makeId(dataSource, interval);
+ return new VersionConverterTask(id, id, dataSource, interval, segment);
+ }
+
+ private static String makeId(String dataSource, Interval interval)
+ {
+ return joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime());
+ }
+
+ @JsonCreator
+ private VersionConverterTask(
+ @JsonProperty("id") String id,
+ @JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("segment") DataSegment segment
)
{
super(
- joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime()),
+ id,
+ groupId,
dataSource,
interval
);
@@ -74,6 +101,12 @@ public class VersionConverterTask extends AbstractTask
return TYPE;
}
+ @JsonProperty
+ public DataSegment getSegment()
+ {
+ return segment;
+ }
+
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
@@ -121,11 +154,32 @@ public class VersionConverterTask extends AbstractTask
return TaskStatus.success(getId());
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ VersionConverterTask that = (VersionConverterTask) o;
+
+ if (segment != null ? !segment.equals(that.segment) : that.segment != null) {
+ return false;
+ }
+
+ return super.equals(o);
+ }
+
public static class SubTask extends AbstractTask
{
+ @JsonIgnore
private final DataSegment segment;
- protected SubTask(
+ @JsonCreator
+ public SubTask(
@JsonProperty("groupId") String groupId,
@JsonProperty("segment") DataSegment segment
)
@@ -145,6 +199,12 @@ public class VersionConverterTask extends AbstractTask
this.segment = segment;
}
+ @JsonProperty
+ public DataSegment getSegment()
+ {
+ return segment;
+ }
+
@Override
public String getType()
{
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java
index b878885dd4a..98d16d671e9 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java
@@ -36,6 +36,7 @@ import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
+import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
@@ -52,7 +53,7 @@ public class DbTaskStorage implements TaskStorage
private final IndexerDbConnectorConfig dbConnectorConfig;
private final DBI dbi;
- private static final Logger log = new Logger(DbTaskStorage.class);
+ private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
public DbTaskStorage(ObjectMapper jsonMapper, IndexerDbConnectorConfig dbConnectorConfig, DBI dbi)
{
@@ -203,18 +204,18 @@ public class DbTaskStorage implements TaskStorage
}
@Override
- public List getRunningTasks()
+ public List getRunningTaskIds()
{
return dbi.withHandle(
- new HandleCallback>()
+ new HandleCallback>()
{
@Override
- public List withHandle(Handle handle) throws Exception
+ public List withHandle(Handle handle) throws Exception
{
final List