Merge branch 'master' into realtime-index-task

Gian Merlino 2013-03-13 22:43:01 -07:00
commit df8e4d4061
43 changed files with 654 additions and 108 deletions

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <dependencies>

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <dependencies>

View File

@@ -38,6 +38,7 @@ public class ConfigManager
   private final ScheduledExecutorService exec;
   private final ConcurrentMap<String, ConfigHolder> watchedConfigs;
   private final String selectStatement;
+  private final String insertStatement;

   private volatile ConfigManager.PollingCallable poller;
@@ -49,6 +50,10 @@ public class ConfigManager
     this.exec = ScheduledExecutors.fixed(1, "config-manager-%s");
     this.watchedConfigs = Maps.newConcurrentMap();
     this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", config.getConfigTable());
+    insertStatement = String.format(
+        "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
+        config.getConfigTable()
+    );
   }

   @LifecycleStart
@@ -192,9 +197,7 @@ public class ConfigManager
           @Override
           public Void withHandle(Handle handle) throws Exception
           {
-            handle.createStatement(
-                "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload"
-            )
+            handle.createStatement(insertStatement)
                   .bind("name", key)
                   .bind("payload", newBytes)
                   .execute();
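
A minimal usage sketch (not part of this commit) of the pattern above: the MySQL upsert SQL is built once and reused through JDBI. The table name druid_config and the wrapper class are assumptions for illustration.

import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;

public class ConfigUpsertSketch
{
  private static final String INSERT_STATEMENT =
      "INSERT INTO druid_config (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload";

  public static void upsert(final DBI dbi, final String key, final byte[] payload)
  {
    dbi.withHandle(
        new HandleCallback<Void>()
        {
          @Override
          public Void withHandle(Handle handle) throws Exception
          {
            // Building the SQL once (as ConfigManager now does) avoids re-formatting it on every write.
            handle.createStatement(INSERT_STATEMENT)
                  .bind("name", key)
                  .bind("payload", payload)
                  .execute();
            return null;
          }
        }
    );
  }
}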

View File

@@ -24,11 +24,11 @@
     <artifactId>druid-services</artifactId>
     <name>druid-services</name>
     <description>druid-services</description>
-    <version>0.3.19-SNAPSHOT</version>
+    <version>0.3.22-SNAPSHOT</version>
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <dependencies>

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <modules>

View File

@@ -9,7 +9,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid-examples</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <dependencies>

View File

@@ -9,7 +9,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid-examples</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <dependencies>

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <dependencies>

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <dependencies>

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <dependencies>

View File

@@ -3,6 +3,7 @@ package com.metamx.druid.merger.common.actions;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
+import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.merger.common.RetryPolicy;
 import com.metamx.druid.merger.common.RetryPolicyFactory;
@@ -106,6 +107,10 @@ public class RemoteTaskActionClient implements TaskActionClient
       final int port;
       final String path = "/mmx/merger/v1/action";

+      if (instance == null) {
+        throw new ISE("Cannot find instance of indexer to talk to!");
+      }
+
       host = instance.getAddress();
       if (instance.getSslPort() != null && instance.getSslPort() > 0) {

View File

@@ -19,6 +19,7 @@
 package com.metamx.druid.merger.common.task;

+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
@@ -35,9 +36,16 @@ public abstract class AbstractTask implements Task
 {
   private static final Joiner ID_JOINER = Joiner.on("_");

+  @JsonIgnore
   private final String id;
+
+  @JsonIgnore
   private final String groupId;
+
+  @JsonIgnore
   private final String dataSource;
+
+  @JsonIgnore
   private final Optional<Interval> interval;

   protected AbstractTask(String id, String dataSource, Interval interval)
@@ -119,4 +127,32 @@ public abstract class AbstractTask implements Task
   {
     return TaskStatus.success(getId());
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    AbstractTask that = (AbstractTask) o;
+
+    if (dataSource != null ? !dataSource.equals(that.dataSource) : that.dataSource != null) {
+      return false;
+    }
+    if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) {
+      return false;
+    }
+    if (id != null ? !id.equals(that.id) : that.id != null) {
+      return false;
+    }
+    if (interval != null ? !interval.equals(that.interval) : that.interval != null) {
+      return false;
+    }
+
+    return true;
+  }
 }
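
A minimal sketch (not part of this commit) of what the new equals() buys: serde tests can assert on whole tasks after a Jackson round trip instead of comparing ids and intervals field by field, in the style of the new VersionConverterTaskTest further down.

import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.task.KillTask;
import com.metamx.druid.merger.common.task.Task;
import junit.framework.Assert;
import org.joda.time.Interval;

public class TaskEqualsSketch
{
  public static void main(String[] args) throws Exception
  {
    final DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
    final Task task = new KillTask(null, "foo", new Interval("2010-01-02/P2D"));

    // Round trip through JSON; id, groupId, dataSource and interval all survive.
    final Task task2 = jsonMapper.readValue(jsonMapper.writeValueAsString(task), Task.class);

    // One assertion via the field-by-field equals() defined above.
    Assert.assertEquals(task, task2);
  }
}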

View File

@@ -48,11 +48,12 @@ public class AppendTask extends MergeTaskBase
 {
   @JsonCreator
   public AppendTask(
+      @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segments") List<DataSegment> segments
   )
   {
-    super(dataSource, segments);
+    super(id, dataSource, segments);
   }

   @Override

View File

@@ -50,12 +50,13 @@ public class DeleteTask extends AbstractTask
   @JsonCreator
   public DeleteTask(
+      @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval
   )
   {
     super(
-        String.format(
+        id != null ? id : String.format(
             "delete_%s_%s_%s_%s",
             dataSource,
             interval.getStart(),
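
A minimal sketch (not from the commit) of the nullable-id pattern the task constructors now share: when "id" is absent from the JSON, the task generates one; when present, it is honored. The first JSON spec is taken from testDeleteTaskFromJson later in this diff; the explicit id in the second is a hypothetical value.

import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.task.Task;

public class NullableTaskIdSketch
{
  public static void main(String[] args) throws Exception
  {
    final DefaultObjectMapper jsonMapper = new DefaultObjectMapper();

    // No "id" field: DeleteTask falls back to String.format("delete_%s_%s_%s_%s", ...).
    final Task generated = jsonMapper.readValue(
        "{\"type\":\"delete\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}",
        Task.class
    );

    // Explicit "id" field (hypothetical value): used verbatim, which keeps ids stable across re-submission.
    final Task explicit = jsonMapper.readValue(
        "{\"type\":\"delete\",\"id\":\"delete_foo_manual\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}",
        Task.class
    );

    System.out.println(generated.getId()); // generated delete_foo_... id
    System.out.println(explicit.getId());  // delete_foo_manual
  }
}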

View File

@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;

 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -41,7 +42,7 @@ import java.util.List;
 public class HadoopIndexTask extends AbstractTask
 {
-  @JsonProperty
+  @JsonIgnore
   private final HadoopDruidIndexerConfig config;

   private static final Logger log = new Logger(HadoopIndexTask.class);
@@ -58,11 +59,12 @@ public class HadoopIndexTask extends AbstractTask
   @JsonCreator
   public HadoopIndexTask(
+      @JsonProperty("id") String id,
       @JsonProperty("config") HadoopDruidIndexerConfig config
   )
   {
     super(
-        String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
+        id != null ? id : String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
         config.getDataSource(),
         JodaUtils.umbrellaInterval(config.getIntervals())
     );
@@ -133,4 +135,10 @@
     }
   }
+
+  @JsonProperty
+  public HadoopDruidIndexerConfig getConfig()
+  {
+    return config;
+  }
 }

View File

@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;

 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -48,22 +49,23 @@ import java.util.Set;
 public class IndexDeterminePartitionsTask extends AbstractTask
 {
-  @JsonProperty
+  @JsonIgnore
   private final FirehoseFactory firehoseFactory;

-  @JsonProperty
+  @JsonIgnore
   private final Schema schema;

-  @JsonProperty
+  @JsonIgnore
   private final long targetPartitionSize;

-  @JsonProperty
+  @JsonIgnore
   private final int rowFlushBoundary;

   private static final Logger log = new Logger(IndexTask.class);

   @JsonCreator
   public IndexDeterminePartitionsTask(
+      @JsonProperty("id") String id,
       @JsonProperty("groupId") String groupId,
       @JsonProperty("interval") Interval interval,
       @JsonProperty("firehose") FirehoseFactory firehoseFactory,
@@ -73,7 +75,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
   )
   {
     super(
-        String.format(
+        id != null ? id : String.format(
             "%s_partitions_%s_%s",
             groupId,
             interval.getStart(),
@@ -243,6 +245,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
       public Task apply(ShardSpec shardSpec)
       {
         return new IndexGeneratorTask(
+            null,
             getGroupId(),
             getImplicitLockInterval().get(),
             firehoseFactory,
@@ -262,4 +265,28 @@ public class IndexDeterminePartitionsTask extends AbstractTask
     return TaskStatus.success(getId());
   }
@JsonProperty
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@JsonProperty
public Schema getSchema()
{
return schema;
}
@JsonProperty
public long getTargetPartitionSize()
{
return targetPartitionSize;
}
@JsonProperty
public int getRowFlushBoundary()
{
return rowFlushBoundary;
}
 }
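
A minimal sketch (not part of the commit) of the serialization pattern the task classes now follow: fields are @JsonIgnore, a @JsonCreator constructor reads the JSON properties, and @JsonProperty getters write them back out. The class and property names below are illustrative.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class GetterSerdeSketch
{
  public static class ExampleSpec
  {
    @JsonIgnore
    private final int rowFlushBoundary;

    @JsonCreator
    public ExampleSpec(@JsonProperty("rowFlushBoundary") int rowFlushBoundary)
    {
      this.rowFlushBoundary = rowFlushBoundary;
    }

    @JsonProperty
    public int getRowFlushBoundary()
    {
      return rowFlushBoundary;
    }
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final String json = mapper.writeValueAsString(new ExampleSpec(500));
    System.out.println(json); // {"rowFlushBoundary":500} -- written via the annotated getter, not the field
    System.out.println(mapper.readValue(json, ExampleSpec.class).getRowFlushBoundary()); // 500
  }
}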

View File

@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;

 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -52,19 +53,20 @@ import java.util.concurrent.CopyOnWriteArrayList;
 public class IndexGeneratorTask extends AbstractTask
 {
-  @JsonProperty
+  @JsonIgnore
   private final FirehoseFactory firehoseFactory;

-  @JsonProperty
+  @JsonIgnore
   private final Schema schema;

-  @JsonProperty
+  @JsonIgnore
   private final int rowFlushBoundary;

   private static final Logger log = new Logger(IndexTask.class);

   @JsonCreator
   public IndexGeneratorTask(
+      @JsonProperty("id") String id,
       @JsonProperty("groupId") String groupId,
       @JsonProperty("interval") Interval interval,
       @JsonProperty("firehose") FirehoseFactory firehoseFactory,
@@ -73,7 +75,7 @@ public class IndexGeneratorTask extends AbstractTask
   )
   {
     super(
-        String.format(
+        id != null ? id : String.format(
             "%s_generator_%s_%s_%s",
             groupId,
             interval.getStart(),
@@ -216,4 +218,22 @@ public class IndexGeneratorTask extends AbstractTask
     return schema.getShardSpec().isInChunk(eventDimensions);
   }
@JsonProperty
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@JsonProperty
public Schema getSchema()
{
return schema;
}
@JsonProperty
public int getRowFlushBoundary()
{
return rowFlushBoundary;
}
 }

View File

@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;

 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -40,28 +41,29 @@ import java.util.List;
 public class IndexTask extends AbstractTask
 {
-  @JsonProperty
+  @JsonIgnore
   private final GranularitySpec granularitySpec;

-  @JsonProperty
+  @JsonIgnore
   private final AggregatorFactory[] aggregators;

-  @JsonProperty
+  @JsonIgnore
   private final QueryGranularity indexGranularity;

-  @JsonProperty
+  @JsonIgnore
   private final long targetPartitionSize;

-  @JsonProperty
+  @JsonIgnore
   private final FirehoseFactory firehoseFactory;

-  @JsonProperty
+  @JsonIgnore
   private final int rowFlushBoundary;

   private static final Logger log = new Logger(IndexTask.class);

   @JsonCreator
   public IndexTask(
+      @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
       @JsonProperty("aggregators") AggregatorFactory[] aggregators,
@@ -73,7 +75,7 @@ public class IndexTask extends AbstractTask
   {
     super(
         // _not_ the version, just something uniqueish
-        String.format("index_%s_%s", dataSource, new DateTime().toString()),
+        id != null ? id : String.format("index_%s_%s", dataSource, new DateTime().toString()),
         dataSource,
         new Interval(
             granularitySpec.bucketIntervals().first().getStart(),
@@ -98,6 +100,7 @@ public class IndexTask extends AbstractTask
       // Need to do one pass over the data before indexing in order to determine good partitions
       retVal.add(
           new IndexDeterminePartitionsTask(
+              null,
               getGroupId(),
               interval,
               firehoseFactory,
@@ -115,6 +118,7 @@ public class IndexTask extends AbstractTask
       // Jump straight into indexing
       retVal.add(
           new IndexGeneratorTask(
+              null,
               getGroupId(),
               interval,
               firehoseFactory,
@@ -151,4 +155,41 @@ public class IndexTask extends AbstractTask
   {
     throw new IllegalStateException("IndexTasks should not be run!");
   }
@JsonProperty
public GranularitySpec getGranularitySpec()
{
return granularitySpec;
}
@JsonProperty
public AggregatorFactory[] getAggregators()
{
return aggregators;
}
@JsonProperty
public QueryGranularity getIndexGranularity()
{
return indexGranularity;
}
@JsonProperty
public long getTargetPartitionSize()
{
return targetPartitionSize;
}
@JsonProperty
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@JsonProperty
public int getRowFlushBoundary()
{
return rowFlushBoundary;
}
 }

View File

@@ -45,12 +45,13 @@ public class KillTask extends AbstractTask
   @JsonCreator
   public KillTask(
+      @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval
   )
   {
     super(
-        String.format(
+        id != null ? id : String.format(
             "kill_%s_%s_%s_%s",
             dataSource,
             interval.getStart(),

View File

@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.task;

 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
@@ -42,16 +43,18 @@ import java.util.Map;
 */
 public class MergeTask extends MergeTaskBase
 {
+  @JsonIgnore
   private final List<AggregatorFactory> aggregators;

   @JsonCreator
   public MergeTask(
+      @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segments") List<DataSegment> segments,
       @JsonProperty("aggregations") List<AggregatorFactory> aggregators
   )
   {
-    super(dataSource, segments);
+    super(id, dataSource, segments);
     this.aggregators = aggregators;
   }
@@ -86,4 +89,10 @@ public class MergeTask extends MergeTaskBase
   {
     return "merge";
   }
+
+  @JsonProperty("aggregations")
+  public List<AggregatorFactory> getAggregators()
+  {
+    return aggregators;
+  }
 }
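
A small sketch (not part of the commit) of the updated MergeTask constructor and the "aggregations" JSON name used by the new getter; the segment and aggregator values below mirror the testMergeTaskSerde test later in this diff.

import com.google.common.collect.ImmutableList;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.task.MergeTask;
import org.joda.time.Interval;

public class MergeTaskJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    final MergeTask task = new MergeTask(
        null, // let the task derive an id from its segments
        "foo",
        ImmutableList.<DataSegment>of(
            DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
        ),
        ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("cnt"))
    );

    final String json = new DefaultObjectMapper().writeValueAsString(task);
    // The aggregator list appears under "aggregations", matching the @JsonCreator parameter above.
    System.out.println(json.contains("\"aggregations\""));
  }
}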

View File

@@ -19,6 +19,7 @@
 package com.metamx.druid.merger.common.task;

+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Charsets;
 import com.google.common.base.Function;
@@ -58,15 +59,18 @@ import java.util.Set;
 */
 public abstract class MergeTaskBase extends AbstractTask
 {
+  @JsonIgnore
   private final List<DataSegment> segments;

   private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class);

-  protected MergeTaskBase(final String dataSource, final List<DataSegment> segments)
+  protected MergeTaskBase(final String id, final String dataSource, final List<DataSegment> segments)
   {
     super(
         // _not_ the version, just something uniqueish
-        String.format("merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()),
+        id != null ? id : String.format(
+            "merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()
+        ),
         dataSource,
         computeMergedInterval(segments)
     );

View File

@@ -19,6 +19,8 @@
 package com.metamx.druid.merger.common.task;

+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
@@ -51,16 +53,41 @@ public class VersionConverterTask extends AbstractTask
   private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID);

   private static final Logger log = new Logger(VersionConverterTask.class);

+  @JsonIgnore
   private final DataSegment segment;

-  public VersionConverterTask(
+  public static VersionConverterTask create(String dataSource, Interval interval)
+  {
+    final String id = makeId(dataSource, interval);
+    return new VersionConverterTask(id, id, dataSource, interval, null);
+  }
+
+  public static VersionConverterTask create(DataSegment segment)
+  {
+    final Interval interval = segment.getInterval();
+    final String dataSource = segment.getDataSource();
+    final String id = makeId(dataSource, interval);
+    return new VersionConverterTask(id, id, dataSource, interval, segment);
+  }
+
+  private static String makeId(String dataSource, Interval interval)
+  {
+    return joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime());
+  }
+
+  @JsonCreator
+  private VersionConverterTask(
+      @JsonProperty("id") String id,
+      @JsonProperty("groupId") String groupId,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
       @JsonProperty("segment") DataSegment segment
   )
   {
     super(
-        joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime()),
+        id,
+        groupId,
         dataSource,
         interval
     );
@@ -74,6 +101,12 @@
     return TYPE;
   }

+  @JsonProperty
+  public DataSegment getSegment()
+  {
+    return segment;
+  }
+
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
@@ -121,11 +154,32 @@
     return TaskStatus.success(getId());
   }

+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    VersionConverterTask that = (VersionConverterTask) o;
+
+    if (segment != null ? !segment.equals(that.segment) : that.segment != null) {
+      return false;
+    }
+
+    return super.equals(o);
+  }
+
   public static class SubTask extends AbstractTask
   {
+    @JsonIgnore
     private final DataSegment segment;

-    protected SubTask(
+    @JsonCreator
+    public SubTask(
        @JsonProperty("groupId") String groupId,
        @JsonProperty("segment") DataSegment segment
    )
@@ -145,6 +199,12 @@ public class VersionConverterTask extends AbstractTask
      this.segment = segment;
    }

+    @JsonProperty
+    public DataSegment getSegment()
+    {
+      return segment;
+    }
+
    @Override
    public String getType()
    {
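
A small usage sketch (not from the commit): tasks are now built through the static factories, while the private @JsonCreator constructor is reserved for Jackson. The segment values below are illustrative and match the builder style used in the tests later in this diff.

import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.task.VersionConverterTask;
import org.joda.time.Interval;

public class VersionConverterTaskUsageSketch
{
  public static void main(String[] args)
  {
    // Whole-datasource conversion: no segment attached, the generated id doubles as the groupId.
    VersionConverterTask forDataSource =
        VersionConverterTask.create("billy", new Interval("2013-01-01/2013-01-02"));

    // Single-segment conversion: dataSource and interval are derived from the segment itself.
    DataSegment segment = DataSegment.builder()
                                     .dataSource("billy")
                                     .interval(new Interval("2013-01-01/2013-01-02"))
                                     .version("2013-01-02T00:00:00.000Z")
                                     .build();
    VersionConverterTask forSegment = VersionConverterTask.create(segment);

    System.out.println(forDataSource.getId());
    System.out.println(forSegment.getSegment());
  }
}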

View File

@@ -36,6 +36,7 @@ import com.metamx.druid.merger.common.TaskLock;
 import com.metamx.druid.merger.common.task.Task;
 import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
+import com.metamx.emitter.EmittingLogger;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
@@ -52,7 +53,7 @@ public class DbTaskStorage implements TaskStorage
   private final IndexerDbConnectorConfig dbConnectorConfig;
   private final DBI dbi;

-  private static final Logger log = new Logger(DbTaskStorage.class);
+  private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);

   public DbTaskStorage(ObjectMapper jsonMapper, IndexerDbConnectorConfig dbConnectorConfig, DBI dbi)
   {
@@ -203,18 +204,18 @@ public class DbTaskStorage implements TaskStorage
   }

   @Override
-  public List<Task> getRunningTasks()
+  public List<String> getRunningTaskIds()
   {
     return dbi.withHandle(
-        new HandleCallback<List<Task>>()
+        new HandleCallback<List<String>>()
         {
           @Override
-          public List<Task> withHandle(Handle handle) throws Exception
+          public List<String> withHandle(Handle handle) throws Exception
           {
             final List<Map<String, Object>> dbTasks =
                 handle.createQuery(
                     String.format(
-                        "SELECT payload FROM %s WHERE status_code = :status_code",
+                        "SELECT id FROM %s WHERE status_code = :status_code",
                         dbConnectorConfig.getTaskTable()
                     )
                 )
@@ -222,16 +223,12 @@ public class DbTaskStorage implements TaskStorage
                 .list();

             return Lists.transform(
-                dbTasks, new Function<Map<String, Object>, Task>()
+                dbTasks, new Function<Map<String, Object>, String>()
                 {
                   @Override
-                  public Task apply(Map<String, Object> row)
+                  public String apply(Map<String, Object> row)
                   {
-                    try {
-                      return jsonMapper.readValue(row.get("payload").toString(), Task.class);
-                    } catch(Exception e) {
-                      throw Throwables.propagate(e);
-                    }
+                    return row.get("id").toString();
                   }
                 }
             );

View File

@@ -128,15 +128,15 @@ public class HeapMemoryTaskStorage implements TaskStorage
   }

   @Override
-  public List<Task> getRunningTasks()
+  public List<String> getRunningTaskIds()
   {
     giant.lock();

     try {
-      final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
+      final ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
       for(final TaskStuff taskStuff : tasks.values()) {
         if(taskStuff.getStatus().isRunnable()) {
-          listBuilder.add(taskStuff.getTask());
+          listBuilder.add(taskStuff.getTask().getId());
         }
       }

View File

@@ -571,4 +571,9 @@ public class RemoteTaskRunner implements TaskRunner
       throw Throwables.propagate(e);
     }
   }
+
+  public static void main(String[] args)
+  {
+    System.out.println("2013-03-11".compareTo("0"));
+  }
 }

View File

@@ -19,6 +19,7 @@
 package com.metamx.druid.merger.coordinator;

+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -28,8 +29,8 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.TaskLock;
+import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.task.Task;
 import com.metamx.emitter.EmittingLogger;
@@ -89,18 +90,32 @@ public class TaskQueue
       queue.clear();
       taskLockbox.clear();

-      // Add running tasks to the queue
-      final List<Task> runningTasks = taskStorage.getRunningTasks();
-      for(final Task task : runningTasks) {
-        queue.add(task);
-      }
-
-      // Get all locks, along with which tasks they belong to
+      // Get all running tasks and their locks
       final Multimap<TaskLock, Task> tasksByLock = ArrayListMultimap.create();
-      for(final Task runningTask : runningTasks) {
-        for(final TaskLock taskLock : taskStorage.getLocks(runningTask.getId())) {
-          tasksByLock.put(taskLock, runningTask);
+
+      for (final String taskId : taskStorage.getRunningTaskIds()) {
+        try {
+          // .get since TaskStorage semantics should mean this task is always found
+          final Task task = taskStorage.getTask(taskId).get();
+          final List<TaskLock> taskLocks = taskStorage.getLocks(task.getId());
+
+          queue.add(task);
+
+          for (final TaskLock taskLock : taskLocks) {
+            tasksByLock.put(taskLock, task);
+          }
+        }
+        catch (Exception e) {
+          log.makeAlert("Failed to bootstrap task").addData("task", taskId).emit();
+
+          // A bit goofy to special-case JsonProcessingException, but we don't want to suppress bootstrap problems on
+          // any old Exception or even IOException...
+          if (e instanceof JsonProcessingException || e.getCause() instanceof JsonProcessingException) {
+            // Mark this task a failure, and continue bootstrapping
+            taskStorage.setStatus(TaskStatus.failure(taskId));
+          } else {
+            throw Throwables.propagate(e);
+          }
         }
       }
@@ -150,7 +165,7 @@ public class TaskQueue
         }
       }

-      log.info("Bootstrapped %,d tasks. Ready to go!", runningTasks.size());
+      log.info("Bootstrapped %,d tasks with %,d locks. Ready to go!", queue.size(), tasksByLock.keySet().size());
     } finally {
       giant.unlock();
     }
@@ -214,7 +229,7 @@ public class TaskQueue
     // insert the task into our queue.
     try {
       taskStorage.insert(task, TaskStatus.running(task.getId()));
-    } catch(TaskExistsException e) {
+    } catch (TaskExistsException e) {
       log.warn("Attempt to add task twice: %s", task.getId());
       throw Throwables.propagate(e);
     }

View File

@@ -20,9 +20,9 @@
 package com.metamx.druid.merger.coordinator;

 import com.google.common.base.Optional;
+import com.metamx.druid.merger.common.TaskLock;
 import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.actions.TaskAction;
-import com.metamx.druid.merger.common.TaskLock;
 import com.metamx.druid.merger.common.task.Task;

 import java.util.List;
@@ -77,9 +77,9 @@
   public List<TaskAction> getAuditLogs(String taskid);

   /**
-   * Returns a list of currently-running tasks as stored in the storage facility, in no particular order.
+   * Returns a list of currently-running task IDs as stored in the storage facility, in no particular order.
   */
-  public List<Task> getRunningTasks();
+  public List<String> getRunningTaskIds();

   /**
   * Returns a list of locks for a particular task.
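
A minimal consumer sketch (not from the commit), mirroring the TaskQueue bootstrap earlier in this diff: ids are listed cheaply, and each payload is only deserialized when the individual task is fetched.

import com.google.common.base.Optional;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskStorage;

public class RunningTaskScanSketch
{
  public static void printRunning(TaskStorage taskStorage)
  {
    for (final String taskId : taskStorage.getRunningTaskIds()) {
      // Resolve each id individually; TaskQueue wraps this step in a try/catch so one bad payload
      // does not abort the whole bootstrap.
      final Optional<Task> task = taskStorage.getTask(taskId);
      System.out.println(taskId + " -> " + (task.isPresent() ? task.get().getType() : "<not found>"));
    }
  }
}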

View File

@@ -177,6 +177,9 @@ public class IndexerCoordinatorResource
     if (!configManager.set(WorkerSetupData.CONFIG_KEY, workerSetupData)) {
       return Response.status(Response.Status.BAD_REQUEST).build();
     }
+
+    log.info("Updating Worker Setup configs: %s", workerSetupData);
+
     return Response.ok().build();
   }

View File

@@ -42,7 +42,6 @@ public class AutoScalingData<T>
     return nodeIds;
   }

-  @JsonProperty
   public List<T> getNodes()
   {
     return nodes;

View File

@@ -102,18 +102,22 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
         }
       }
     } else {
-      Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
+      Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
+
+      log.info(
+          "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker. Current wait time: %s",
+          currentlyProvisioning,
+          durSinceLastProvision
+      );

       if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
-        log.makeAlert("Worker node provisioning taking too long")
+        log.makeAlert("Worker node provisioning taking too long!")
            .addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
            .addData("provisioningCount", currentlyProvisioning.size())
            .emit();
-      }

-      log.info(
-          "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
-          currentlyProvisioning
-      );
+        currentlyProvisioning.clear();
+      }
     }

     return false;
@@ -203,18 +207,21 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
         return true;
       }
     } else {
-      Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
+      Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());
-      if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
-        log.makeAlert("Worker node termination taking too long")
-           .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
-           .addData("terminatingCount", currentlyTerminating.size())
-           .emit();
-      }

       log.info(
           "%s still terminating. Wait for all nodes to terminate before trying again.",
           currentlyTerminating
       );
+
+      if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
+        log.makeAlert("Worker node termination taking too long!")
+           .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
+           .addData("terminatingCount", currentlyTerminating.size())
+           .emit();
+
+        currentlyTerminating.clear();
+      }
     }

     return false;
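
A small sketch (not from the commit) of why the swapped argument order matters: Joda-Time's Duration(ReadableInstant start, ReadableInstant end) measures start -> end, so passing "now" first yields a negative duration and isLongerThan() could never trigger the scaling alert.

import org.joda.time.DateTime;
import org.joda.time.Duration;

public class DurationOrderSketch
{
  public static void main(String[] args)
  {
    final DateTime lastProvisionTime = new DateTime().minusMinutes(30);

    final Duration wrongOrder = new Duration(new DateTime(), lastProvisionTime); // about -30 minutes
    final Duration rightOrder = new Duration(lastProvisionTime, new DateTime()); // about +30 minutes

    System.out.println(wrongOrder.isLongerThan(Duration.standardMinutes(10))); // false, alert never fires
    System.out.println(rightOrder.isLongerThan(Duration.standardMinutes(10))); // true
  }
}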

View File

@@ -91,4 +91,17 @@ public class EC2NodeData
   {
     return keyName;
   }
+
+  @Override
+  public String toString()
+  {
+    return "EC2NodeData{" +
+           "amiId='" + amiId + '\'' +
+           ", instanceType='" + instanceType + '\'' +
+           ", minInstances=" + minInstances +
+           ", maxInstances=" + maxInstances +
+           ", securityGroupIds=" + securityGroupIds +
+           ", keyName='" + keyName + '\'' +
+           '}';
+  }
 }

View File

@@ -60,4 +60,14 @@ public class GalaxyUserData
   {
     return type;
   }
+
+  @Override
+  public String toString()
+  {
+    return "GalaxyUserData{" +
+           "env='" + env + '\'' +
+           ", version='" + version + '\'' +
+           ", type='" + type + '\'' +
+           '}';
+  }
 }

View File

@@ -75,4 +75,15 @@ public class WorkerSetupData
   {
     return userData;
   }
+
+  @Override
+  public String toString()
+  {
+    return "WorkerSetupData{" +
+           "minVersion='" + minVersion + '\'' +
+           ", minNumWorkers=" + minNumWorkers +
+           ", nodeData=" + nodeData +
+           ", userData=" + userData +
+           '}';
+  }
 }

View File

@@ -35,7 +35,6 @@ import java.util.List;
 @JsonTypeName("test")
 public class TestTask extends MergeTask
 {
-  private final String id;
   private final TaskStatus status;

   @JsonCreator
@@ -47,19 +46,10 @@ public class TestTask extends MergeTask
       @JsonProperty("taskStatus") TaskStatus status
   )
   {
-    super(dataSource, segments, aggregators);
-    this.id = id;
+    super(id, dataSource, segments, aggregators);
     this.status = status;
   }

-  @Override
-  @JsonProperty
-  public String getId()
-  {
-    return id;
-  }
-
   @Override
   @JsonProperty
   public String getType()

View File

@@ -43,7 +43,7 @@ public class MergeTaskBaseTest
       .add(segmentBuilder.interval(new Interval("2012-01-03/2012-01-05")).build())
       .build();

-  final MergeTaskBase testMergeTaskBase = new MergeTaskBase("foo", segments)
+  final MergeTaskBase testMergeTaskBase = new MergeTaskBase(null, "foo", segments)
   {
     @Override
     protected File merge(Map<DataSegment, File> segments, File outDir) throws Exception

View File

@@ -5,6 +5,7 @@ import com.google.common.collect.ImmutableList;
 import com.metamx.common.Granularity;
 import com.metamx.druid.QueryGranularity;
 import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.aggregation.CountAggregatorFactory;
 import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
@@ -26,6 +27,7 @@ public class TaskSerdeTest
   public void testIndexTaskSerde() throws Exception
   {
     final Task task = new IndexTask(
+        null,
         "foo",
         new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))),
         new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
@@ -54,6 +56,7 @@
   public void testIndexGeneratorTaskSerde() throws Exception
   {
     final Task task = new IndexGeneratorTask(
+        null,
         "foo",
         new Interval("2010-01-01/P1D"),
         null,
@@ -68,6 +71,8 @@
     final ObjectMapper jsonMapper = new DefaultObjectMapper();
     final String json = jsonMapper.writeValueAsString(task);

+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
+
     final Task task2 = jsonMapper.readValue(json, Task.class);

     Assert.assertEquals("foo", task.getDataSource());
@@ -80,17 +85,23 @@
   }

   @Test
-  public void testAppendTaskSerde() throws Exception
+  public void testMergeTaskSerde() throws Exception
   {
-    final Task task = new AppendTask(
+    final Task task = new MergeTask(
+        null,
         "foo",
         ImmutableList.<DataSegment>of(
             DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
+        ),
+        ImmutableList.<AggregatorFactory>of(
+            new CountAggregatorFactory("cnt")
         )
     );

     final ObjectMapper jsonMapper = new DefaultObjectMapper();
     final String json = jsonMapper.writeValueAsString(task);

+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
+
     final Task task2 = jsonMapper.readValue(json, Task.class);

     Assert.assertEquals("foo", task.getDataSource());
@@ -100,20 +111,131 @@
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
+    Assert.assertEquals(((MergeTask) task).getSegments(), ((MergeTask) task2).getSegments());
+    Assert.assertEquals(
+        ((MergeTask) task).getAggregators().get(0).getName(),
+        ((MergeTask) task2).getAggregators().get(0).getName()
+    );
   }

   @Test
-  public void testDeleteTaskSerde() throws Exception
+  public void testKillTaskSerde() throws Exception
   {
-    final Task task = new DeleteTask(
+    final Task task = new KillTask(
+        null,
         "foo",
         new Interval("2010-01-01/P1D")
     );

     final ObjectMapper jsonMapper = new DefaultObjectMapper();
     final String json = jsonMapper.writeValueAsString(task);

+    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
+
     final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
}
@Test
public void testVersionConverterTaskSerde() throws Exception
{
final Task task = VersionConverterTask.create(
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
Assert.assertEquals(((VersionConverterTask) task).getSegment(), ((VersionConverterTask) task).getSegment());
}
@Test
public void testVersionConverterSubTaskSerde() throws Exception
{
final Task task = new VersionConverterTask.SubTask(
"myGroupId",
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
Assert.assertEquals("myGroupId", task.getGroupId());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
Assert.assertEquals(
((VersionConverterTask.SubTask) task).getSegment(),
((VersionConverterTask.SubTask) task).getSegment()
);
}
@Test
public void testDeleteTaskSerde() throws Exception
{
final Task task = new DeleteTask(
null,
"foo",
new Interval("2010-01-01/P1D")
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get());
}
@Test
public void testDeleteTaskFromJson() throws Exception
{
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final Task task = jsonMapper.readValue(
"{\"type\":\"delete\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}",
Task.class
);
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertNotNull(task.getId());
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
     Assert.assertEquals(task.getId(), task2.getId());
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
@@ -121,10 +243,39 @@
     Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get());
   }
@Test
public void testAppendTaskSerde() throws Exception
{
final Task task = new AppendTask(
null,
"foo",
ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
)
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get());
Assert.assertEquals(((AppendTask) task).getSegments(), ((AppendTask) task2).getSegments());
}
   @Test
   public void testHadoopIndexTaskSerde() throws Exception
   {
     final HadoopIndexTask task = new HadoopIndexTask(
+        null,
         new HadoopDruidIndexerConfig(
             null,
             "foo",

View File

@@ -0,0 +1,66 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.task;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
/**
*/
public class VersionConverterTaskTest
{
@Test
public void testSerializationSimple() throws Exception
{
final String dataSource = "billy";
final Interval interval = new Interval(new DateTime().minus(1000), new DateTime());
DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
VersionConverterTask task = VersionConverterTask.create(dataSource, interval);
Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
Assert.assertEquals(task, task2);
DataSegment segment = new DataSegment(
dataSource,
interval,
new DateTime().toString(),
ImmutableMap.<String, Object>of(),
ImmutableList.<String>of(),
ImmutableList.<String>of(),
new NoneShardSpec(),
9,
102937
);
task = VersionConverterTask.create(segment);
task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
Assert.assertEquals(task, task2);
}
}

View File

@@ -186,6 +186,7 @@ public class TaskLifecycleTest
   public void testIndexTask() throws Exception
   {
     final Task indexTask = new IndexTask(
+        null,
         "foo",
         new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))),
         new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
@@ -228,6 +229,7 @@ public class TaskLifecycleTest
   public void testIndexTaskFailure() throws Exception
   {
     final Task indexTask = new IndexTask(
+        null,
         "foo",
         new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
         new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
@@ -251,7 +253,7 @@ public class TaskLifecycleTest
   {
     // This test doesn't actually do anything right now. We should actually put things into the Mocked coordinator
     // Such that this test can test things...
-    final Task killTask = new KillTask("foo", new Interval("2010-01-02/P2D"));
+    final Task killTask = new KillTask(null, "foo", new Interval("2010-01-02/P2D"));

     final TaskStatus status = runTask(killTask);
     Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode());

View File

@@ -30,6 +30,10 @@ import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
 import com.metamx.druid.merger.coordinator.ZkWorker;
 import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
 import com.metamx.druid.merger.worker.Worker;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.Event;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.emitter.service.ServiceEventBuilder;
 import junit.framework.Assert;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
@@ -90,7 +94,7 @@ public class SimpleResourceManagementStrategyTest
       @Override
       public Duration getMaxScalingDuration()
       {
-        return null;
+        return new Duration(1000);
       }

       @Override
@@ -184,6 +188,62 @@ public class SimpleResourceManagementStrategyTest
     EasyMock.verify(autoScalingStrategy);
   }
@Test
public void testProvisionAlert() throws Exception
{
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
EasyMock.expectLastCall();
EasyMock.replay(emitter);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"), Lists.newArrayList("faker"))
);
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
Assert.assertTrue(provisionedSomething);
Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1);
DateTime createdTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
Thread.sleep(2000);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
);
Assert.assertFalse(provisionedSomething);
Assert.assertTrue(
simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
);
DateTime anotherCreatedTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp();
Assert.assertTrue(
createdTime.equals(anotherCreatedTime)
);
EasyMock.verify(autoScalingStrategy);
EasyMock.verify(emitter);
}
   @Test
   public void testDoSuccessfulTerminate() throws Exception
   {

View File

@@ -23,7 +23,7 @@
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
     <packaging>pom</packaging>
-    <version>0.3.19-SNAPSHOT</version>
+    <version>0.3.22-SNAPSHOT</version>
     <name>druid</name>
     <description>druid</description>
     <scm>

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <properties>

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.3.19-SNAPSHOT</version>
+        <version>0.3.22-SNAPSHOT</version>
     </parent>

     <dependencies>

View File

@@ -526,6 +526,7 @@ public class DruidMaster
         final Integer binaryVersion = dataSegment.getBinaryVersion();

         if (binaryVersion == null || binaryVersion < IndexIO.CURRENT_VERSION_ID) {
+          log.info("Upgrading version on segment[%s]", dataSegment.getIdentifier());
           indexingServiceClient.upgradeSegment(dataSegment);
         }
       }