Merge pull request #64 from metamx/hadoop-index-task

Hadoop index task
This commit is contained in:
cheddar 2013-03-01 10:02:46 -08:00
commit e0bf3187b2
14 changed files with 354 additions and 96 deletions

View File

@ -66,15 +66,38 @@ public class JodaUtils
return retVal; return retVal;
} }
public static boolean overlaps(final Interval i, Iterable<Interval> intervals) { public static Interval umbrellaInterval(Iterable<Interval> intervals)
return Iterables.any(intervals, new Predicate<Interval>() {
ArrayList<DateTime> startDates = Lists.newArrayList();
ArrayList<DateTime> endDates = Lists.newArrayList();
for (Interval interval : intervals) {
startDates.add(interval.getStart());
endDates.add(interval.getEnd());
}
DateTime minStart = minDateTime(startDates.toArray(new DateTime[]{}));
DateTime maxEnd = maxDateTime(endDates.toArray(new DateTime[]{}));
if (minStart == null || maxEnd == null) {
throw new IllegalArgumentException("Empty list of intervals");
}
return new Interval(minStart, maxEnd);
}
public static boolean overlaps(final Interval i, Iterable<Interval> intervals)
{
return Iterables.any(
intervals, new Predicate<Interval>()
{ {
@Override @Override
public boolean apply(@Nullable Interval input) public boolean apply(@Nullable Interval input)
{ {
return input.overlaps(i); return input.overlaps(i);
} }
}); }
);
} }
public static DateTime minDateTime(DateTime... times) public static DateTime minDateTime(DateTime... times)
@ -84,8 +107,10 @@ public class JodaUtils
} }
switch (times.length) { switch (times.length) {
case 0: return null; case 0:
case 1: return times[0]; return null;
case 1:
return times[0];
default: default:
DateTime min = times[0]; DateTime min = times[0];
for (int i = 1; i < times.length; ++i) { for (int i = 1; i < times.length; ++i) {
@ -102,8 +127,10 @@ public class JodaUtils
} }
switch (times.length) { switch (times.length) {
case 0: return null; case 0:
case 1: return times[0]; return null;
case 1:
return times[0];
default: default:
DateTime max = times[0]; DateTime max = times[0];
for (int i = 1; i < times.length; ++i) { for (int i = 1; i < times.length; ++i) {

View File

@ -31,6 +31,40 @@ import java.util.List;
*/ */
public class JodaUtilsTest public class JodaUtilsTest
{ {
@Test
public void testUmbrellaIntervalsSimple() throws Exception
{
List<Interval> intervals = Arrays.asList(
new Interval("2011-03-03/2011-03-04"),
new Interval("2011-01-01/2011-01-02"),
new Interval("2011-02-01/2011-02-05"),
new Interval("2011-02-03/2011-02-08"),
new Interval("2011-01-01/2011-01-03"),
new Interval("2011-03-01/2011-03-02"),
new Interval("2011-03-05/2011-03-06"),
new Interval("2011-02-01/2011-02-02")
);
Assert.assertEquals(
new Interval("2011-01-01/2011-03-06"),
JodaUtils.umbrellaInterval(intervals)
);
}
@Test
public void testUmbrellaIntervalsNull() throws Exception
{
List<Interval> intervals = Arrays.asList();
Throwable thrown = null;
try {
Interval res = JodaUtils.umbrellaInterval(intervals);
}
catch (IllegalArgumentException e) {
thrown = e;
}
Assert.assertNotNull("Empty list of intervals", thrown);
}
@Test @Test
public void testCondenseIntervalsSimple() throws Exception public void testCondenseIntervalsSimple() throws Exception
{ {

View File

@ -20,24 +20,18 @@
package com.metamx.druid.indexer; package com.metamx.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableList;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnector;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec; import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
import java.util.List; import java.util.List;
/** /**
@ -52,9 +46,6 @@ public class DbUpdaterJob implements Jobby
private final DbUpdaterJobSpec spec; private final DbUpdaterJobSpec spec;
private final DBI dbi; private final DBI dbi;
// Keep track of published segment identifiers, in case a client is interested.
private volatile ImmutableList<DataSegment> publishedSegments = null;
public DbUpdaterJob( public DbUpdaterJob(
HadoopDruidIndexerConfig config HadoopDruidIndexerConfig config
) )
@ -67,23 +58,7 @@ public class DbUpdaterJob implements Jobby
@Override @Override
public boolean run() public boolean run()
{ {
final Configuration conf = new Configuration(); final List<DataSegment> segments = IndexGeneratorJob.getPublishedSegments(config);
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
final Path descriptorInfoDir = config.makeDescriptorInfoDir();
try {
FileSystem fs = descriptorInfoDir.getFileSystem(conf);
for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
dbi.withHandle( dbi.withHandle(
new HandleCallback<Void>() new HandleCallback<Void>()
@ -91,51 +66,40 @@ public class DbUpdaterJob implements Jobby
@Override @Override
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
handle.createStatement(String.format( final PreparedBatch batch = handle.prepareBatch(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
spec.getSegmentTable() spec.getSegmentTable()
)) )
.bind("id", segment.getIdentifier()) );
.bind("dataSource", segment.getDataSource()) for (final DataSegment segment : segments) {
.bind("created_date", new DateTime().toString())
.bind("start", segment.getInterval().getStart().toString()) batch.add(
.bind("end", segment.getInterval().getEnd().toString()) new ImmutableMap.Builder()
.bind("partitioned", segment.getShardSpec().getPartitionNum()) .put("id", segment.getIdentifier())
.bind("version", segment.getVersion()) .put("dataSource", segment.getDataSource())
.bind("used", true) .put("created_date", new DateTime().toString())
.bind("payload", jsonMapper.writeValueAsString(segment)) .put("start", segment.getInterval().getStart().toString())
.execute(); .put("end", segment.getInterval().getEnd().toString())
.put("partitioned", segment.getShardSpec().getPartitionNum())
.put("version", segment.getVersion())
.put("used", true)
.put("payload", jsonMapper.writeValueAsString(segment))
.build()
);
log.info("Published %s", segment.getIdentifier());
}
batch.execute();
return null; return null;
} }
} }
); );
publishedSegmentsBuilder.add(segment);
log.info("Published %s", segment.getIdentifier());
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
publishedSegments = publishedSegmentsBuilder.build();
return true; return true;
} }
/**
* Returns a list of segment identifiers published by the most recent call to run().
* Throws an IllegalStateException if run() has never been called.
*/
public List<DataSegment> getPublishedSegments()
{
if (publishedSegments == null) {
log.error("getPublishedSegments called before run!");
throw new IllegalStateException("DbUpdaterJob has not run yet");
} else {
return publishedSegments;
}
}
} }

View File

@ -83,7 +83,7 @@ public class HadoopDruidIndexerAzkWrapper
final HadoopDruidIndexerConfig config = jsonMapper.convertValue(theMap, HadoopDruidIndexerConfig.class); final HadoopDruidIndexerConfig config = jsonMapper.convertValue(theMap, HadoopDruidIndexerConfig.class);
config.setIntervals(dataInterval); config.setIntervals(dataInterval);
config.setVersion(new DateTime()); config.setVersion(new DateTime().toString());
new HadoopDruidIndexerJob(config).run(); new HadoopDruidIndexerJob(config).run();
} }

View File

@ -175,7 +175,7 @@ public class HadoopDruidIndexerConfig
private volatile PathSpec pathSpec; private volatile PathSpec pathSpec;
private volatile String jobOutputDir; private volatile String jobOutputDir;
private volatile String segmentOutputDir; private volatile String segmentOutputDir;
private volatile DateTime version = new DateTime(); private volatile String version = new DateTime().toString();
private volatile PartitionsSpec partitionsSpec; private volatile PartitionsSpec partitionsSpec;
private volatile boolean leaveIntermediate = false; private volatile boolean leaveIntermediate = false;
private volatile boolean cleanupOnFailure = true; private volatile boolean cleanupOnFailure = true;
@ -198,7 +198,7 @@ public class HadoopDruidIndexerConfig
final @JsonProperty("pathSpec") PathSpec pathSpec, final @JsonProperty("pathSpec") PathSpec pathSpec,
final @JsonProperty("workingPath") String jobOutputDir, final @JsonProperty("workingPath") String jobOutputDir,
final @JsonProperty("segmentOutputPath") String segmentOutputDir, final @JsonProperty("segmentOutputPath") String segmentOutputDir,
final @JsonProperty("version") DateTime version, final @JsonProperty("version") String version,
final @JsonProperty("partitionDimension") String partitionDimension, final @JsonProperty("partitionDimension") String partitionDimension,
final @JsonProperty("targetPartitionSize") Long targetPartitionSize, final @JsonProperty("targetPartitionSize") Long targetPartitionSize,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
@ -220,7 +220,7 @@ public class HadoopDruidIndexerConfig
this.pathSpec = pathSpec; this.pathSpec = pathSpec;
this.jobOutputDir = jobOutputDir; this.jobOutputDir = jobOutputDir;
this.segmentOutputDir = segmentOutputDir; this.segmentOutputDir = segmentOutputDir;
this.version = version == null ? new DateTime() : version; this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec; this.partitionsSpec = partitionsSpec;
this.leaveIntermediate = leaveIntermediate; this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure; this.cleanupOnFailure = cleanupOnFailure;
@ -410,12 +410,12 @@ public class HadoopDruidIndexerConfig
} }
@JsonProperty @JsonProperty
public DateTime getVersion() public String getVersion()
{ {
return version; return version;
} }
public void setVersion(DateTime version) public void setVersion(String version)
{ {
this.version = version; this.version = version;
} }
@ -624,7 +624,7 @@ public class HadoopDruidIndexerConfig
*/ */
public Path makeIntermediatePath() public Path makeIntermediatePath()
{ {
return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().toString().replace(":", ""))); return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().replace(":", "")));
} }
public Path makeSegmentPartitionInfoPath(Bucket bucket) public Path makeSegmentPartitionInfoPath(Bucket bucket)
@ -667,7 +667,7 @@ public class HadoopDruidIndexerConfig
dataSource, dataSource,
bucketInterval.getStart().toString(), bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(), bucketInterval.getEnd().toString(),
getVersion().toString(), getVersion(),
bucket.partitionNum bucket.partitionNum
) )
); );

View File

@ -47,6 +47,7 @@ public class HadoopDruidIndexerJob implements Jobby
private final HadoopDruidIndexerConfig config; private final HadoopDruidIndexerConfig config;
private final DbUpdaterJob dbUpdaterJob; private final DbUpdaterJob dbUpdaterJob;
private IndexGeneratorJob indexJob; private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null;
public HadoopDruidIndexerJob( public HadoopDruidIndexerJob(
HadoopDruidIndexerConfig config HadoopDruidIndexerConfig config
@ -102,6 +103,8 @@ public class HadoopDruidIndexerJob implements Jobby
} }
} }
publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
if (!config.isLeaveIntermediate()) { if (!config.isLeaveIntermediate()) {
if (failedMessage == null || config.isCleanupOnFailure()) { if (failedMessage == null || config.isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath(); Path workingPath = config.makeIntermediatePath();
@ -147,8 +150,10 @@ public class HadoopDruidIndexerJob implements Jobby
} }
public List<DataSegment> getPublishedSegments() { public List<DataSegment> getPublishedSegments() {
Preconditions.checkState(dbUpdaterJob != null, "No updaterJobSpec set, cannot get published segments"); if(publishedSegments == null) {
return dbUpdaterJob.getPublishedSegments(); throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
}
return publishedSegments;
} }
public IndexGeneratorJob.IndexGeneratorStats getIndexJobStats() public IndexGeneratorJob.IndexGeneratorStats getIndexJobStats()

View File

@ -21,6 +21,7 @@ package com.metamx.druid.indexer;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -57,6 +58,7 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -149,7 +151,40 @@ public class IndexGeneratorJob implements Jobby
} }
} }
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) {
final Configuration conf = new Configuration();
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
final Path descriptorInfoDir = config.makeDescriptorInfoDir();
try {
FileSystem fs = descriptorInfoDir.getFileSystem(conf);
for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
publishedSegmentsBuilder.add(segment);
log.info("Adding segment %s to the list of published segments", segment.getIdentifier());
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
return publishedSegments;
}
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text> public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
{ {
@Override @Override
protected void innerMap( protected void innerMap(
@ -389,7 +424,7 @@ public class IndexGeneratorJob implements Jobby
DataSegment segment = new DataSegment( DataSegment segment = new DataSegment(
config.getDataSource(), config.getDataSource(),
interval, interval,
config.getVersion().toString(), config.getVersion(),
loadSpec, loadSpec,
dimensionNames, dimensionNames,
metricNames, metricNames,

View File

@ -15,6 +15,9 @@ public abstract class TaskConfig
@Default("500000") @Default("500000")
public abstract int getDefaultRowFlushBoundary(); public abstract int getDefaultRowFlushBoundary();
@Config("druid.merger.hadoopWorkingPath")
public abstract String getHadoopWorkingPath();
public File getTaskDir(final Task task) { public File getTaskDir(final Task task) {
return new File(getBaseTaskDir(), task.getId()); return new File(getBaseTaskDir(), task.getId());
} }

View File

@ -0,0 +1,117 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
import com.metamx.druid.indexer.HadoopDruidIndexerJob;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.utils.JodaUtils;
import org.joda.time.DateTime;
import java.util.List;
public class HadoopIndexTask extends AbstractTask
{
@JsonProperty
private final HadoopDruidIndexerConfig config;
private static final Logger log = new Logger(HadoopIndexTask.class);
/**
* @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
* <p/>
* Here, we will ensure that the UpdaterJobSpec field of the config is set to null, such that the
* job does not push a list of published segments the database. Instead, we will use the method
* IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
* segments, and let the indexing service report these segments to the database.
*/
@JsonCreator
public HadoopIndexTask(
@JsonProperty("config") HadoopDruidIndexerConfig config
)
{
super(
String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
config.getDataSource(),
JodaUtils.umbrellaInterval(config.getIntervals())
);
// Some HadoopDruidIndexerConfig stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(config.getSegmentOutputDir() == null, "segmentOutputPath must be absent");
Preconditions.checkArgument(config.getJobOutputDir() == null, "workingPath must be absent");
Preconditions.checkArgument(!config.isUpdaterJobSpecSet(), "updaterJobSpec must be absent");
this.config = config;
}
@Override
public String getType()
{
return "index_hadoop";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Copy config so we don't needlessly modify our provided one
// Also necessary to make constructor validations work upon serde-after-run
final HadoopDruidIndexerConfig configCopy = toolbox.getObjectMapper()
.readValue(
toolbox.getObjectMapper().writeValueAsBytes(config),
HadoopDruidIndexerConfig.class
);
// We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this)));
log.info("Setting version to: %s", myLock.getVersion());
configCopy.setVersion(myLock.getVersion());
// Set workingPath to some reasonable default
configCopy.setJobOutputDir(toolbox.getConfig().getHadoopWorkingPath());
if (toolbox.getSegmentPusher() instanceof S3DataSegmentPusher) {
// Hack alert! Bypassing DataSegmentPusher...
S3DataSegmentPusher segmentPusher = (S3DataSegmentPusher) toolbox.getSegmentPusher();
String s3Path = String.format(
"s3n://%s/%s/%s",
segmentPusher.getConfig().getBucket(),
segmentPusher.getConfig().getBaseKey(),
getDataSource()
);
log.info("Setting segment output path to: %s", s3Path);
configCopy.setSegmentOutputDir(s3Path);
} else {
throw new IllegalStateException("Sorry, we only work with S3DataSegmentPushers! Bummer!");
}
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(configCopy);
configCopy.verify();
log.info("Starting a hadoop index generator job...");
if (job.run()) {
List<DataSegment> publishedSegments = job.getPublishedSegments();
// Request segment pushes
toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.copyOf(publishedSegments)));
// Done
return TaskStatus.success(getId());
} else {
return TaskStatus.failure(getId());
}
}
}

View File

@ -37,7 +37,8 @@ import org.joda.time.Interval;
@JsonSubTypes.Type(name = "kill", value = KillTask.class), @JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class), @JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class), @JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class),
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class) @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class)
}) })
public interface Task public interface Task
{ {

View File

@ -1,12 +1,17 @@
package com.metamx.druid.merger.common.task; package com.metamx.druid.merger.common.task;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.druid.QueryGranularity; import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
import com.metamx.druid.indexer.data.JSONDataSpec;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec; import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexer.path.StaticPathSpec;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.NoneShardSpec;
@ -32,13 +37,17 @@ public class TaskSerdeTest
final ObjectMapper jsonMapper = new DefaultObjectMapper(); final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task); final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final Task task2 = jsonMapper.readValue(json, Task.class); final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P2D")), task.getFixedInterval());
Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource()); Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval()); Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
} }
@Test @Test
@ -61,11 +70,13 @@ public class TaskSerdeTest
final String json = jsonMapper.writeValueAsString(task); final String json = jsonMapper.writeValueAsString(task);
final Task task2 = jsonMapper.readValue(json, Task.class); final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());
Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource()); Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval()); Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
} }
@Test @Test
@ -82,11 +93,13 @@ public class TaskSerdeTest
final String json = jsonMapper.writeValueAsString(task); final String json = jsonMapper.writeValueAsString(task);
final Task task2 = jsonMapper.readValue(json, Task.class); final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());
Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource()); Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval()); Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
} }
@Test @Test
@ -99,7 +112,6 @@ public class TaskSerdeTest
final ObjectMapper jsonMapper = new DefaultObjectMapper(); final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task); final String json = jsonMapper.writeValueAsString(task);
System.out.println(json);
final Task task2 = jsonMapper.readValue(json, Task.class); final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getId(), task2.getId());
@ -108,4 +120,47 @@ public class TaskSerdeTest
Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval()); Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get()); Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
} }
@Test
public void testHadoopIndexTaskSerde() throws Exception
{
final HadoopIndexTask task = new HadoopIndexTask(
new HadoopDruidIndexerConfig(
null,
"foo",
"timestamp",
"auto",
new JSONDataSpec(ImmutableList.of("foo")),
null,
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
new StaticPathSpec("bar"),
null,
null,
null,
null,
null,
null,
false,
true,
null,
false,
new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
null,
false,
ImmutableList.<String>of()
)
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
}
} }

View File

@ -300,6 +300,12 @@ public class RemoteTaskRunnerTest
{ {
return 0; return 0;
} }
@Override
public String getHadoopWorkingPath()
{
return null;
}
}, null, null, null, null, null, jsonMapper }, null, null, null, null, null, jsonMapper
), ),
Executors.newSingleThreadExecutor() Executors.newSingleThreadExecutor()

View File

@ -110,6 +110,12 @@ public class TaskLifecycleTest
{ {
return 50000; return 50000;
} }
@Override
public String getHadoopWorkingPath()
{
return null;
}
}, },
new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())), new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())),
newMockEmitter(), newMockEmitter(),

View File

@ -57,6 +57,11 @@ public class S3DataSegmentPusher implements DataSegmentPusher
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
} }
public S3DataSegmentPusherConfig getConfig()
{
return config;
}
@Override @Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
{ {