mirror of https://github.com/apache/druid.git
Merger: Task serde without relying on jackson private-final-setter magic
parent 4c165b4880
commit cf470b1ed4
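The pattern repeated in every file below: instead of letting Jackson reflectively write private final fields (the "private-final-setter magic" of the title), each task takes a nullable "id" through its @JsonCreator constructor, marks its fields @JsonIgnore, and exposes them again through @JsonProperty getters. A minimal self-contained sketch of the pattern; ExampleTask is a hypothetical class that only condenses what the hunks below do:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonIgnore;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import org.joda.time.DateTime;

    public class ExampleTask
    {
      // @JsonIgnore keeps Jackson away from the private final field itself.
      @JsonIgnore
      private final String id;

      @JsonIgnore
      private final String dataSource;

      // Deserialization goes through the constructor, not field reflection.
      @JsonCreator
      public ExampleTask(
          @JsonProperty("id") String id,
          @JsonProperty("dataSource") String dataSource
      )
      {
        // A null id means "freshly submitted", so generate one; a non-null id
        // means the task came back through JSON, so keep it stable.
        this.id = id != null ? id : String.format("example_%s_%s", dataSource, new DateTime());
        this.dataSource = dataSource;
      }

      // An explicit @JsonProperty on the getter re-enables serialization of
      // the property that the field-level @JsonIgnore would otherwise hide.
      @JsonProperty
      public String getId()
      {
        return id;
      }

      @JsonProperty
      public String getDataSource()
      {
        return dataSource;
      }
    }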
AbstractTask.java
@@ -19,6 +19,7 @@
 
 package com.metamx.druid.merger.common.task;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
@@ -33,9 +34,16 @@ public abstract class AbstractTask implements Task
 {
   private static final Joiner ID_JOINER = Joiner.on("_");
 
+  @JsonIgnore
   private final String id;
+
+  @JsonIgnore
   private final String groupId;
+
+  @JsonIgnore
   private final String dataSource;
+
+  @JsonIgnore
   private final Optional<Interval> interval;
 
   protected AbstractTask(String id, String dataSource, Interval interval)
AppendTask.java
@@ -48,11 +48,12 @@ public class AppendTask extends MergeTaskBase
 {
   @JsonCreator
   public AppendTask(
+      @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segments") List<DataSegment> segments
   )
   {
-    super(dataSource, segments);
+    super(id, dataSource, segments);
   }
 
   @Override
DeleteTask.java
@@ -50,12 +50,13 @@ public class DeleteTask extends AbstractTask
 
   @JsonCreator
   public DeleteTask(
+      @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval
   )
   {
     super(
-        String.format(
+        id != null ? id : String.format(
             "delete_%s_%s_%s_%s",
             dataSource,
             interval.getStart(),
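The recurring `id != null ? id : String.format(...)` fallback is what makes serde stable: an id is minted only on first construction, and afterwards it travels inside the JSON instead of being re-derived from the clock on the receiving side. A sketch of the intended round trip, assuming `jsonMapper` is an ObjectMapper configured with the task subtypes:

    // First construction: no id supplied, so DeleteTask formats one from the
    // dataSource, the interval, and the current time.
    final Task task = new DeleteTask(null, "foo", new Interval("2010-01-01/P1D"));
    final String json = jsonMapper.writeValueAsString(task); // generated id is in the JSON

    // Deserialization hands the stored id back through @JsonProperty("id"),
    // so no new timestamp-based id is minted on this side.
    final Task task2 = jsonMapper.readValue(json, Task.class);
    Assert.assertEquals(task.getId(), task2.getId());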
HadoopIndexTask.java
@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -41,7 +42,7 @@ import java.util.List;
 
 public class HadoopIndexTask extends AbstractTask
 {
-  @JsonProperty
+  @JsonIgnore
   private final HadoopDruidIndexerConfig config;
 
   private static final Logger log = new Logger(HadoopIndexTask.class);
@@ -58,11 +59,12 @@ public class HadoopIndexTask extends AbstractTask
 
   @JsonCreator
   public HadoopIndexTask(
+      @JsonProperty("id") String id,
       @JsonProperty("config") HadoopDruidIndexerConfig config
   )
   {
     super(
-        String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
+        id != null ? id : String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
         config.getDataSource(),
         JodaUtils.umbrellaInterval(config.getIntervals())
     );
@@ -133,4 +135,10 @@ public class HadoopIndexTask extends AbstractTask
     }
 
   }
+
+  @JsonProperty
+  public HadoopDruidIndexerConfig getConfig()
+  {
+    return config;
+  }
 }
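Note the division of labor on `config`: the field flips from @JsonProperty to @JsonIgnore while a new @JsonProperty getter takes over serialization, so the wire format keeps its "config" key and only the access path changes. An illustrative round trip (the JSON shape in the comment is an assumption, not captured output):

    final HadoopIndexTask task = new HadoopIndexTask(null, indexerConfig);
    final String json = jsonMapper.writeValueAsString(task);
    // json resembles {"type":"...","id":"index_hadoop_foo_...","config":{...}},
    // with "config" now produced by getConfig() rather than read off the field.
    final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class);
    Assert.assertEquals(task.getConfig().getDataSource(), task2.getConfig().getDataSource());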
IndexDeterminePartitionsTask.java
@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -48,22 +49,23 @@ import java.util.Set;
 
 public class IndexDeterminePartitionsTask extends AbstractTask
 {
-  @JsonProperty
+  @JsonIgnore
   private final FirehoseFactory firehoseFactory;
 
-  @JsonProperty
+  @JsonIgnore
   private final Schema schema;
 
-  @JsonProperty
+  @JsonIgnore
   private final long targetPartitionSize;
 
-  @JsonProperty
+  @JsonIgnore
   private final int rowFlushBoundary;
 
   private static final Logger log = new Logger(IndexTask.class);
 
   @JsonCreator
   public IndexDeterminePartitionsTask(
+      @JsonProperty("id") String id,
       @JsonProperty("groupId") String groupId,
       @JsonProperty("interval") Interval interval,
       @JsonProperty("firehose") FirehoseFactory firehoseFactory,
@@ -73,7 +75,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
   )
   {
     super(
-        String.format(
+        id != null ? id : String.format(
             "%s_partitions_%s_%s",
             groupId,
             interval.getStart(),
@@ -243,6 +245,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
           public Task apply(ShardSpec shardSpec)
           {
             return new IndexGeneratorTask(
+                null,
                 getGroupId(),
                 getImplicitLockInterval().get(),
                 firehoseFactory,
@@ -262,4 +265,28 @@ public class IndexDeterminePartitionsTask extends AbstractTask
 
     return TaskStatus.success(getId());
   }
+
+  @JsonProperty
+  public FirehoseFactory getFirehoseFactory()
+  {
+    return firehoseFactory;
+  }
+
+  @JsonProperty
+  public Schema getSchema()
+  {
+    return schema;
+  }
+
+  @JsonProperty
+  public long getTargetPartitionSize()
+  {
+    return targetPartitionSize;
+  }
+
+  @JsonProperty
+  public int getRowFlushBoundary()
+  {
+    return rowFlushBoundary;
+  }
 }
IndexGeneratorTask.java
@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -52,19 +53,20 @@ import java.util.concurrent.CopyOnWriteArrayList;
 
 public class IndexGeneratorTask extends AbstractTask
 {
-  @JsonProperty
+  @JsonIgnore
   private final FirehoseFactory firehoseFactory;
 
-  @JsonProperty
+  @JsonIgnore
   private final Schema schema;
 
-  @JsonProperty
+  @JsonIgnore
   private final int rowFlushBoundary;
 
   private static final Logger log = new Logger(IndexTask.class);
 
   @JsonCreator
   public IndexGeneratorTask(
+      @JsonProperty("id") String id,
       @JsonProperty("groupId") String groupId,
       @JsonProperty("interval") Interval interval,
       @JsonProperty("firehose") FirehoseFactory firehoseFactory,
@@ -73,7 +75,7 @@ public class IndexGeneratorTask extends AbstractTask
   )
   {
     super(
-        String.format(
+        id != null ? id : String.format(
             "%s_generator_%s_%s_%s",
             groupId,
             interval.getStart(),
@@ -216,4 +218,22 @@ public class IndexGeneratorTask extends AbstractTask
 
     return schema.getShardSpec().isInChunk(eventDimensions);
   }
+
+  @JsonProperty
+  public FirehoseFactory getFirehoseFactory()
+  {
+    return firehoseFactory;
+  }
+
+  @JsonProperty
+  public Schema getSchema()
+  {
+    return schema;
+  }
+
+  @JsonProperty
+  public int getRowFlushBoundary()
+  {
+    return rowFlushBoundary;
+  }
 }
IndexTask.java
@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -40,28 +41,29 @@ import java.util.List;
 
 public class IndexTask extends AbstractTask
 {
-  @JsonProperty
+  @JsonIgnore
   private final GranularitySpec granularitySpec;
 
-  @JsonProperty
+  @JsonIgnore
   private final AggregatorFactory[] aggregators;
 
-  @JsonProperty
+  @JsonIgnore
   private final QueryGranularity indexGranularity;
 
-  @JsonProperty
+  @JsonIgnore
   private final long targetPartitionSize;
 
-  @JsonProperty
+  @JsonIgnore
   private final FirehoseFactory firehoseFactory;
 
-  @JsonProperty
+  @JsonIgnore
   private final int rowFlushBoundary;
 
   private static final Logger log = new Logger(IndexTask.class);
 
   @JsonCreator
   public IndexTask(
+      @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
       @JsonProperty("aggregators") AggregatorFactory[] aggregators,
@@ -73,7 +75,7 @@ public class IndexTask extends AbstractTask
   {
     super(
         // _not_ the version, just something uniqueish
-        String.format("index_%s_%s", dataSource, new DateTime().toString()),
+        id != null ? id : String.format("index_%s_%s", dataSource, new DateTime().toString()),
         dataSource,
         new Interval(
             granularitySpec.bucketIntervals().first().getStart(),
@@ -98,6 +100,7 @@ public class IndexTask extends AbstractTask
       // Need to do one pass over the data before indexing in order to determine good partitions
       retVal.add(
           new IndexDeterminePartitionsTask(
+              null,
              getGroupId(),
              interval,
              firehoseFactory,
@@ -115,6 +118,7 @@ public class IndexTask extends AbstractTask
       // Jump straight into indexing
       retVal.add(
           new IndexGeneratorTask(
+              null,
              getGroupId(),
              interval,
              firehoseFactory,
@@ -151,4 +155,41 @@ public class IndexTask extends AbstractTask
   {
     throw new IllegalStateException("IndexTasks should not be run!");
   }
+
+  @JsonProperty
+  public GranularitySpec getGranularitySpec()
+  {
+    return granularitySpec;
+  }
+
+  @JsonProperty
+  public AggregatorFactory[] getAggregators()
+  {
+    return aggregators;
+  }
+
+  @JsonProperty
+  public QueryGranularity getIndexGranularity()
+  {
+    return indexGranularity;
+  }
+
+  @JsonProperty
+  public long getTargetPartitionSize()
+  {
+    return targetPartitionSize;
+  }
+
+  @JsonProperty
+  public FirehoseFactory getFirehoseFactory()
+  {
+    return firehoseFactory;
+  }
+
+  @JsonProperty
+  public int getRowFlushBoundary()
+  {
+    return rowFlushBoundary;
+  }
+
 }
KillTask.java
@@ -45,12 +45,13 @@ public class KillTask extends AbstractTask
 
   @JsonCreator
   public KillTask(
+      @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval
   )
   {
     super(
-        String.format(
+        id != null ? id : String.format(
             "kill_%s_%s_%s_%s",
             dataSource,
             interval.getStart(),
MergeTask.java
@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
@@ -42,16 +43,18 @@ import java.util.Map;
 */
 public class MergeTask extends MergeTaskBase
 {
+  @JsonIgnore
   private final List<AggregatorFactory> aggregators;
 
   @JsonCreator
   public MergeTask(
+      @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segments") List<DataSegment> segments,
       @JsonProperty("aggregations") List<AggregatorFactory> aggregators
   )
   {
-    super(dataSource, segments);
+    super(id, dataSource, segments);
     this.aggregators = aggregators;
   }
 
@@ -86,4 +89,10 @@ public class MergeTask extends MergeTaskBase
   {
     return "merge";
   }
+
+  @JsonProperty("aggregations")
+  public List<AggregatorFactory> getAggregators()
+  {
+    return aggregators;
+  }
 }
MergeTaskBase.java
@@ -19,6 +19,7 @@
 
 package com.metamx.druid.merger.common.task;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Charsets;
 import com.google.common.base.Function;
@@ -56,15 +57,18 @@ import java.util.Set;
 */
 public abstract class MergeTaskBase extends AbstractTask
 {
+  @JsonIgnore
   private final List<DataSegment> segments;
 
   private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class);
 
-  protected MergeTaskBase(final String dataSource, final List<DataSegment> segments)
+  protected MergeTaskBase(final String id, final String dataSource, final List<DataSegment> segments)
   {
     super(
         // _not_ the version, just something uniqueish
-        String.format("merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()),
+        id != null ? id : String.format(
+            "merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()
+        ),
         dataSource,
         computeMergedInterval(segments)
     );
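MergeTaskBase shows how the pattern composes through an abstract base: each concrete subclass accepts the nullable "id" in its @JsonCreator constructor and simply forwards it, so the generate-if-null decision lives in one place per hierarchy. Condensed from the MergeTaskBase and AppendTask hunks above:

    // The base class decides once whether to mint an id...
    protected MergeTaskBase(final String id, final String dataSource, final List<DataSegment> segments)
    {
      super(
          id != null ? id : String.format(
              "merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()
          ),
          dataSource,
          computeMergedInterval(segments)
      );
    }

    // ...and subclasses just thread the possibly-null id through:
    @JsonCreator
    public AppendTask(
        @JsonProperty("id") String id,
        @JsonProperty("dataSource") String dataSource,
        @JsonProperty("segments") List<DataSegment> segments
    )
    {
      super(id, dataSource, segments);
    }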
VersionConverterTask.java
@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
@@ -52,6 +53,8 @@ public class VersionConverterTask extends AbstractTask
   private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID);
 
   private static final Logger log = new Logger(VersionConverterTask.class);
+
+  @JsonIgnore
   private final DataSegment segment;
 
   public static VersionConverterTask create(String dataSource, Interval interval)
@@ -172,6 +175,7 @@ public class VersionConverterTask extends AbstractTask
 
   public static class SubTask extends AbstractTask
   {
+    @JsonIgnore
     private final DataSegment segment;
 
     @JsonCreator
TestTask.java
@@ -35,7 +35,6 @@ import java.util.List;
 @JsonTypeName("test")
 public class TestTask extends MergeTask
 {
-  private final String id;
   private final TaskStatus status;
 
   @JsonCreator
@@ -47,19 +46,10 @@ public class TestTask extends MergeTask
       @JsonProperty("taskStatus") TaskStatus status
   )
   {
-    super(dataSource, segments, aggregators);
-
-    this.id = id;
+    super(id, dataSource, segments, aggregators);
     this.status = status;
   }
 
-  @Override
-  @JsonProperty
-  public String getId()
-  {
-    return id;
-  }
-
   @Override
   @JsonProperty
   public String getType()
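The TestTask cleanup is the payoff of the new scheme: once AbstractTask stores the id and serializes it through its own @JsonProperty getter, a subclass that used to duplicate the field and override getId() can drop both. A minimal sketch of the inherited behavior (BaseTask and ConcreteTask are hypothetical stand-ins, not druid classes):

    public abstract class BaseTask
    {
      @JsonIgnore
      private final String id;

      protected BaseTask(String id)
      {
        this.id = id;
      }

      // Inherited by every subclass; no override needed for serialization.
      @JsonProperty
      public String getId()
      {
        return id;
      }
    }

    public class ConcreteTask extends BaseTask
    {
      @JsonCreator
      public ConcreteTask(@JsonProperty("id") String id)
      {
        super(id);
      }
    }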
MergeTaskBaseTest.java
@@ -43,7 +43,7 @@ public class MergeTaskBaseTest
       .add(segmentBuilder.interval(new Interval("2012-01-03/2012-01-05")).build())
       .build();
 
-  final MergeTaskBase testMergeTaskBase = new MergeTaskBase("foo", segments)
+  final MergeTaskBase testMergeTaskBase = new MergeTaskBase(null, "foo", segments)
   {
     @Override
     protected File merge(Map<DataSegment, File> segments, File outDir) throws Exception
TaskSerdeTest.java
@@ -5,6 +5,7 @@ import com.google.common.collect.ImmutableList;
 import com.metamx.common.Granularity;
 import com.metamx.druid.QueryGranularity;
 import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.CountAggregatorFactory;
 import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
@@ -26,6 +27,7 @@ public class TaskSerdeTest
   public void testIndexTaskSerde() throws Exception
   {
     final Task task = new IndexTask(
+        null,
         "foo",
         new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))),
         new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
@@ -54,6 +56,7 @@ public class TaskSerdeTest
   public void testIndexGeneratorTaskSerde() throws Exception
   {
     final Task task = new IndexGeneratorTask(
+        null,
         "foo",
         new Interval("2010-01-01/P1D"),
         null,
@@ -68,6 +71,8 @@ public class TaskSerdeTest
 
     final ObjectMapper jsonMapper = new DefaultObjectMapper();
     final String json = jsonMapper.writeValueAsString(task);
+
+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
     final Task task2 = jsonMapper.readValue(json, Task.class);
 
     Assert.assertEquals("foo", task.getDataSource());
@@ -80,17 +85,23 @@ public class TaskSerdeTest
   }
 
   @Test
-  public void testAppendTaskSerde() throws Exception
+  public void testMergeTaskSerde() throws Exception
   {
-    final Task task = new AppendTask(
+    final Task task = new MergeTask(
+        null,
         "foo",
         ImmutableList.<DataSegment>of(
             DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
+        ),
+        ImmutableList.<AggregatorFactory>of(
+            new CountAggregatorFactory("cnt")
         )
     );
 
     final ObjectMapper jsonMapper = new DefaultObjectMapper();
     final String json = jsonMapper.writeValueAsString(task);
+
+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
     final Task task2 = jsonMapper.readValue(json, Task.class);
 
     Assert.assertEquals("foo", task.getDataSource());
@@ -100,20 +111,131 @@ public class TaskSerdeTest
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
+    Assert.assertEquals(((MergeTask) task).getSegments(), ((MergeTask) task2).getSegments());
+    Assert.assertEquals(
+        ((MergeTask) task).getAggregators().get(0).getName(),
+        ((MergeTask) task2).getAggregators().get(0).getName()
+    );
   }
 
   @Test
-  public void testDeleteTaskSerde() throws Exception
+  public void testKillTaskSerde() throws Exception
   {
-    final Task task = new DeleteTask(
+    final Task task = new KillTask(
+        null,
         "foo",
         new Interval("2010-01-01/P1D")
     );
 
     final ObjectMapper jsonMapper = new DefaultObjectMapper();
     final String json = jsonMapper.writeValueAsString(task);
+
+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
     final Task task2 = jsonMapper.readValue(json, Task.class);
 
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
+
+    Assert.assertEquals(task.getId(), task2.getId());
+    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
+    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
+  }
+
+  @Test
+  public void testVersionConverterTaskSerde() throws Exception
+  {
+    final Task task = VersionConverterTask.create(
+        DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
+    );
+
+    final ObjectMapper jsonMapper = new DefaultObjectMapper();
+    final String json = jsonMapper.writeValueAsString(task);
+
+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
+    final Task task2 = jsonMapper.readValue(json, Task.class);
+
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
+
+    Assert.assertEquals(task.getId(), task2.getId());
+    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
+    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
+    Assert.assertEquals(((VersionConverterTask) task).getSegment(), ((VersionConverterTask) task).getSegment());
+  }
+
+  @Test
+  public void testVersionConverterSubTaskSerde() throws Exception
+  {
+    final Task task = new VersionConverterTask.SubTask(
+        "myGroupId",
+        DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
+    );
+
+    final ObjectMapper jsonMapper = new DefaultObjectMapper();
+    final String json = jsonMapper.writeValueAsString(task);
+
+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
+    final Task task2 = jsonMapper.readValue(json, Task.class);
+
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
+    Assert.assertEquals("myGroupId", task.getGroupId());
+
+    Assert.assertEquals(task.getId(), task2.getId());
+    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
+    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
+    Assert.assertEquals(
+        ((VersionConverterTask.SubTask) task).getSegment(),
+        ((VersionConverterTask.SubTask) task).getSegment()
+    );
+  }
+
+  @Test
+  public void testDeleteTaskSerde() throws Exception
+  {
+    final Task task = new DeleteTask(
+        null,
+        "foo",
+        new Interval("2010-01-01/P1D")
+    );
+
+    final ObjectMapper jsonMapper = new DefaultObjectMapper();
+    final String json = jsonMapper.writeValueAsString(task);
+
+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
+    final Task task2 = jsonMapper.readValue(json, Task.class);
+
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
+
+    Assert.assertEquals(task.getId(), task2.getId());
+    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
+    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
+    Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get());
+  }
+
+
+  @Test
+  public void testDeleteTaskFromJson() throws Exception
+  {
+    final ObjectMapper jsonMapper = new DefaultObjectMapper();
+    final Task task = jsonMapper.readValue(
+        "{\"type\":\"delete\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}",
+        Task.class
+    );
+    final String json = jsonMapper.writeValueAsString(task);
+
+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
+    final Task task2 = jsonMapper.readValue(json, Task.class);
+
+    Assert.assertNotNull(task.getId());
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
+
     Assert.assertEquals(task.getId(), task2.getId());
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
@@ -121,10 +243,39 @@ public class TaskSerdeTest
     Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get());
   }
 
+  @Test
+  public void testAppendTaskSerde() throws Exception
+  {
+    final Task task = new AppendTask(
+        null,
+        "foo",
+        ImmutableList.of(
+            DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
+        )
+    );
+
+    final ObjectMapper jsonMapper = new DefaultObjectMapper();
+    final String json = jsonMapper.writeValueAsString(task);
+
+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
+    final Task task2 = jsonMapper.readValue(json, Task.class);
+
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
+
+    Assert.assertEquals(task.getId(), task2.getId());
+    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
+    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
+    Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get());
+    Assert.assertEquals(((AppendTask) task).getSegments(), ((AppendTask) task2).getSegments());
+  }
+
   @Test
   public void testHadoopIndexTaskSerde() throws Exception
   {
     final HadoopIndexTask task = new HadoopIndexTask(
+        null,
         new HadoopDruidIndexerConfig(
             null,
             "foo",
TaskLifecycleTest.java
@@ -184,6 +184,7 @@ public class TaskLifecycleTest
   public void testIndexTask() throws Exception
   {
     final Task indexTask = new IndexTask(
+        null,
         "foo",
         new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))),
         new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
@@ -226,6 +227,7 @@ public class TaskLifecycleTest
   public void testIndexTaskFailure() throws Exception
   {
     final Task indexTask = new IndexTask(
+        null,
         "foo",
         new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
         new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
@@ -249,7 +251,7 @@ public class TaskLifecycleTest
   {
     // This test doesn't actually do anything right now. We should actually put things into the Mocked coordinator
     // Such that this test can test things...
-    final Task killTask = new KillTask("foo", new Interval("2010-01-02/P2D"));
+    final Task killTask = new KillTask(null, "foo", new Interval("2010-01-02/P2D"));
 
     final TaskStatus status = runTask(killTask);
     Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode());