diff --git a/common/src/main/java/com/metamx/druid/utils/JodaUtils.java b/common/src/main/java/com/metamx/druid/utils/JodaUtils.java
index c7feeadc44c..9368a8af111 100644
--- a/common/src/main/java/com/metamx/druid/utils/JodaUtils.java
+++ b/common/src/main/java/com/metamx/druid/utils/JodaUtils.java
@@ -44,7 +44,7 @@ public class JodaUtils
     for (Interval interval : intervals) {
       sortedIntervals.add(interval);
     }
-
+
     if (sortedIntervals.isEmpty()) {
       return Lists.newArrayList();
     }
@@ -66,15 +66,38 @@ public class JodaUtils
     return retVal;
   }
 
-  public static boolean overlaps(final Interval i, Iterable<Interval> intervals) {
-    return Iterables.any(intervals, new Predicate<Interval>()
+  public static Interval umbrellaInterval(Iterable<Interval> intervals)
+  {
+    ArrayList<DateTime> startDates = Lists.newArrayList();
+    ArrayList<DateTime> endDates = Lists.newArrayList();
+
+    for (Interval interval : intervals) {
+      startDates.add(interval.getStart());
+      endDates.add(interval.getEnd());
+    }
+
+    DateTime minStart = minDateTime(startDates.toArray(new DateTime[]{}));
+    DateTime maxEnd = maxDateTime(endDates.toArray(new DateTime[]{}));
+
+    if (minStart == null || maxEnd == null) {
+      throw new IllegalArgumentException("Empty list of intervals");
+    }
+    return new Interval(minStart, maxEnd);
+  }
+
+  public static boolean overlaps(final Interval i, Iterable<Interval> intervals)
+  {
+    return Iterables.any(
+        intervals, new Predicate<Interval>()
     {
       @Override
       public boolean apply(@Nullable Interval input)
       {
         return input.overlaps(i);
       }
-    });
+    }
+    );
+
   }
 
   public static DateTime minDateTime(DateTime... times)
@@ -84,8 +107,10 @@ public class JodaUtils
     }
 
     switch (times.length) {
-      case 0: return null;
-      case 1: return times[0];
+      case 0:
+        return null;
+      case 1:
+        return times[0];
       default:
         DateTime min = times[0];
         for (int i = 1; i < times.length; ++i) {
@@ -102,8 +127,10 @@ public class JodaUtils
     }
 
     switch (times.length) {
-      case 0: return null;
-      case 1: return times[0];
+      case 0:
+        return null;
+      case 1:
+        return times[0];
       default:
         DateTime max = times[0];
         for (int i = 1; i < times.length; ++i) {
diff --git a/common/src/test/java/com/metamx/druid/utils/JodaUtilsTest.java b/common/src/test/java/com/metamx/druid/utils/JodaUtilsTest.java
index 6abaf485b68..f0248295dfd 100644
--- a/common/src/test/java/com/metamx/druid/utils/JodaUtilsTest.java
+++ b/common/src/test/java/com/metamx/druid/utils/JodaUtilsTest.java
@@ -31,6 +31,40 @@ import java.util.List;
  */
 public class JodaUtilsTest
 {
+  @Test
+  public void testUmbrellaIntervalsSimple() throws Exception
+  {
+    List<Interval> intervals = Arrays.asList(
+        new Interval("2011-03-03/2011-03-04"),
+        new Interval("2011-01-01/2011-01-02"),
+        new Interval("2011-02-01/2011-02-05"),
+        new Interval("2011-02-03/2011-02-08"),
+        new Interval("2011-01-01/2011-01-03"),
+        new Interval("2011-03-01/2011-03-02"),
+        new Interval("2011-03-05/2011-03-06"),
+        new Interval("2011-02-01/2011-02-02")
+    );
+
+    Assert.assertEquals(
+        new Interval("2011-01-01/2011-03-06"),
+        JodaUtils.umbrellaInterval(intervals)
+    );
+  }
+
+  @Test
+  public void testUmbrellaIntervalsNull() throws Exception
+  {
+    List<Interval> intervals = Arrays.asList();
+    Throwable thrown = null;
+    try {
+      Interval res = JodaUtils.umbrellaInterval(intervals);
+    }
+    catch (IllegalArgumentException e) {
+      thrown = e;
+    }
+    Assert.assertNotNull("Empty list of intervals", thrown);
+  }
+
   @Test
   public void testCondenseIntervalsSimple() throws Exception
   {
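A note on the new helper: umbrellaInterval returns the single interval [min(start), max(end)] over its inputs, so unlike condenseIntervals it also covers any gaps between them, and it rejects an empty iterable. A minimal usage sketch, assuming the same imports as the test above (the interval values are illustrative, not part of the patch):

    // Two disjoint intervals with a gap in between.
    List<Interval> intervals = Arrays.asList(
        new Interval("2011-01-01/2011-01-02"),
        new Interval("2011-03-01/2011-03-02")
    );
    // Yields 2011-01-01/2011-03-02, spanning the February gap as well.
    Interval umbrella = JodaUtils.umbrellaInterval(intervals);
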
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java
index a7dbee1f4e2..3025df982e8 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java
@@ -20,24 +20,18 @@ package com.metamx.druid.indexer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.db.DbConnector;
 import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
 import com.metamx.druid.jackson.DefaultObjectMapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
@@ -52,9 +46,6 @@ public class DbUpdaterJob implements Jobby
   private final DbUpdaterJobSpec spec;
   private final DBI dbi;
 
-  // Keep track of published segment identifiers, in case a client is interested.
-  private volatile ImmutableList<DataSegment> publishedSegments = null;
-
   public DbUpdaterJob(
       HadoopDruidIndexerConfig config
   )
@@ -67,75 +58,48 @@ public class DbUpdaterJob implements Jobby
   @Override
   public boolean run()
   {
-    final Configuration conf = new Configuration();
+    final List<DataSegment> segments = IndexGeneratorJob.getPublishedSegments(config);
 
-    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
+    dbi.withHandle(
+        new HandleCallback<Void>()
+        {
+          @Override
+          public Void withHandle(Handle handle) throws Exception
+          {
+            final PreparedBatch batch = handle.prepareBatch(
+                String.format(
+                    "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+                    + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+                    spec.getSegmentTable()
+                )
+            );
+            for (final DataSegment segment : segments) {
 
-    for (String propName : System.getProperties().stringPropertyNames()) {
-      if (propName.startsWith("hadoop.")) {
-        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-      }
-    }
+              batch.add(
+                  new ImmutableMap.Builder<String, Object>()
+                      .put("id", segment.getIdentifier())
+                      .put("dataSource", segment.getDataSource())
+                      .put("created_date", new DateTime().toString())
+                      .put("start", segment.getInterval().getStart().toString())
+                      .put("end", segment.getInterval().getEnd().toString())
+                      .put("partitioned", segment.getShardSpec().getPartitionNum())
+                      .put("version", segment.getVersion())
+                      .put("used", true)
+                      .put("payload", jsonMapper.writeValueAsString(segment))
+                      .build()
+              );
 
-    final Path descriptorInfoDir = config.makeDescriptorInfoDir();
+              log.info("Published %s", segment.getIdentifier());
 
-    try {
-      FileSystem fs = descriptorInfoDir.getFileSystem(conf);
-
-      for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
-        final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
-
-        dbi.withHandle(
-            new HandleCallback<Void>()
-            {
-              @Override
-              public Void withHandle(Handle handle) throws Exception
-              {
-                handle.createStatement(String.format(
-                    "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
-                    + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-                    spec.getSegmentTable()
-                ))
.bind("id", segment.getIdentifier()) - .bind("dataSource", segment.getDataSource()) - .bind("created_date", new DateTime().toString()) - .bind("start", segment.getInterval().getStart().toString()) - .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", segment.getShardSpec().getPartitionNum()) - .bind("version", segment.getVersion()) - .bind("used", true) - .bind("payload", jsonMapper.writeValueAsString(segment)) - .execute(); - - return null; - } } - ); + batch.execute(); - publishedSegmentsBuilder.add(segment); - log.info("Published %s", segment.getIdentifier()); - } - } - catch (IOException e) { - throw Throwables.propagate(e); - } - - publishedSegments = publishedSegmentsBuilder.build(); + return null; + } + } + ); return true; } - /** - * Returns a list of segment identifiers published by the most recent call to run(). - * Throws an IllegalStateException if run() has never been called. - */ - public List getPublishedSegments() - { - if (publishedSegments == null) { - log.error("getPublishedSegments called before run!"); - throw new IllegalStateException("DbUpdaterJob has not run yet"); - } else { - return publishedSegments; - } - } } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerAzkWrapper.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerAzkWrapper.java index 8d073dd7297..5318d5cb600 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerAzkWrapper.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerAzkWrapper.java @@ -83,7 +83,7 @@ public class HadoopDruidIndexerAzkWrapper final HadoopDruidIndexerConfig config = jsonMapper.convertValue(theMap, HadoopDruidIndexerConfig.class); config.setIntervals(dataInterval); - config.setVersion(new DateTime()); + config.setVersion(new DateTime().toString()); new HadoopDruidIndexerJob(config).run(); } diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java index 979e2d989a4..5dc45d19484 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java @@ -175,7 +175,7 @@ public class HadoopDruidIndexerConfig private volatile PathSpec pathSpec; private volatile String jobOutputDir; private volatile String segmentOutputDir; - private volatile DateTime version = new DateTime(); + private volatile String version = new DateTime().toString(); private volatile PartitionsSpec partitionsSpec; private volatile boolean leaveIntermediate = false; private volatile boolean cleanupOnFailure = true; @@ -198,7 +198,7 @@ public class HadoopDruidIndexerConfig final @JsonProperty("pathSpec") PathSpec pathSpec, final @JsonProperty("workingPath") String jobOutputDir, final @JsonProperty("segmentOutputPath") String segmentOutputDir, - final @JsonProperty("version") DateTime version, + final @JsonProperty("version") String version, final @JsonProperty("partitionDimension") String partitionDimension, final @JsonProperty("targetPartitionSize") Long targetPartitionSize, final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, @@ -220,7 +220,7 @@ public class HadoopDruidIndexerConfig this.pathSpec = pathSpec; this.jobOutputDir = jobOutputDir; this.segmentOutputDir = segmentOutputDir; - this.version = version == null ? new DateTime() : version; + this.version = version == null ? 
+    this.version = version == null ? new DateTime().toString() : version;
     this.partitionsSpec = partitionsSpec;
     this.leaveIntermediate = leaveIntermediate;
     this.cleanupOnFailure = cleanupOnFailure;
@@ -410,12 +410,12 @@ public class HadoopDruidIndexerConfig
   }
 
   @JsonProperty
-  public DateTime getVersion()
+  public String getVersion()
   {
     return version;
   }
 
-  public void setVersion(DateTime version)
+  public void setVersion(String version)
   {
     this.version = version;
   }
@@ -624,7 +624,7 @@ public class HadoopDruidIndexerConfig
    */
  public Path makeIntermediatePath()
  {
-    return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().toString().replace(":", "")));
+    return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().replace(":", "")));
  }
 
  public Path makeSegmentPartitionInfoPath(Bucket bucket)
@@ -667,7 +667,7 @@ public class HadoopDruidIndexerConfig
             dataSource,
             bucketInterval.getStart().toString(),
             bucketInterval.getEnd().toString(),
-            getVersion().toString(),
+            getVersion(),
             bucket.partitionNum
         )
     );
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerJob.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerJob.java
index eae1bfa885d..aff1265782a 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerJob.java
@@ -47,6 +47,7 @@ public class HadoopDruidIndexerJob implements Jobby
   private final HadoopDruidIndexerConfig config;
   private final DbUpdaterJob dbUpdaterJob;
   private IndexGeneratorJob indexJob;
+  private volatile List<DataSegment> publishedSegments = null;
 
   public HadoopDruidIndexerJob(
       HadoopDruidIndexerConfig config
@@ -102,6 +103,8 @@ public class HadoopDruidIndexerJob implements Jobby
       }
     }
 
+    publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
+
     if (!config.isLeaveIntermediate()) {
       if (failedMessage == null || config.isCleanupOnFailure()) {
         Path workingPath = config.makeIntermediatePath();
@@ -147,8 +150,10 @@ public class HadoopDruidIndexerJob implements Jobby
   }
 
   public List<DataSegment> getPublishedSegments() {
-    Preconditions.checkState(dbUpdaterJob != null, "No updaterJobSpec set, cannot get published segments");
-    return dbUpdaterJob.getPublishedSegments();
+    if (publishedSegments == null) {
+      throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
+    }
+    return publishedSegments;
   }
 
   public IndexGeneratorJob.IndexGeneratorStats getIndexJobStats()
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
index 0620ba2bc85..9359933b645 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
@@ -21,6 +21,7 @@ package com.metamx.druid.indexer;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -57,6 +58,7 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -149,7 +151,40 @@ public class IndexGeneratorJob implements Jobby
     }
   }
 
+  public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) {
+
+    final Configuration conf = new Configuration();
+    final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
+
+    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
+
+    for (String propName : System.getProperties().stringPropertyNames()) {
+      if (propName.startsWith("hadoop.")) {
+        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
+      }
+    }
+
+    final Path descriptorInfoDir = config.makeDescriptorInfoDir();
+
+    try {
+      FileSystem fs = descriptorInfoDir.getFileSystem(conf);
+
+      for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
+        final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
+        publishedSegmentsBuilder.add(segment);
+        log.info("Adding segment %s to the list of published segments", segment.getIdentifier());
+      }
+    }
+    catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+    List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
+
+    return publishedSegments;
+  }
+
   public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper
+
   {
     @Override
     protected void innerMap(
@@ -389,7 +424,7 @@ public class IndexGeneratorJob implements Jobby
       DataSegment segment = new DataSegment(
           config.getDataSource(),
           interval,
-          config.getVersion().toString(),
+          config.getVersion(),
           loadSpec,
           dimensionNames,
           metricNames,
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java b/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java
index 5b7609bd042..5918f0627c6 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java
@@ -15,6 +15,9 @@ public abstract class TaskConfig
   @Default("500000")
   public abstract int getDefaultRowFlushBoundary();
 
+  @Config("druid.merger.hadoopWorkingPath")
+  public abstract String getHadoopWorkingPath();
+
   public File getTaskDir(final Task task) {
     return new File(getBaseTaskDir(), task.getId());
   }
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
new file mode 100644
index 00000000000..29c0c517f17
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
@@ -0,0 +1,117 @@
+package com.metamx.druid.merger.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
+import com.metamx.druid.indexer.HadoopDruidIndexerJob;
+import com.metamx.druid.loading.S3DataSegmentPusher;
+import com.metamx.druid.merger.common.TaskLock;
+import com.metamx.druid.merger.common.TaskStatus;
+import com.metamx.druid.merger.common.TaskToolbox;
+import com.metamx.druid.merger.common.actions.LockListAction;
+import com.metamx.druid.merger.common.actions.SegmentInsertAction;
+import com.metamx.druid.utils.JodaUtils;
+import org.joda.time.DateTime;
+
+import java.util.List;
+
+public class HadoopIndexTask extends AbstractTask
+{
+  @JsonProperty
+  private final HadoopDruidIndexerConfig config;
+
+  private static final Logger log = new Logger(HadoopIndexTask.class);
+
+  /**
+   * @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters
+   *               for creating Druid index segments. It may be modified.
+   *               <p/>
+   *               Here, we will ensure that the UpdaterJobSpec field of the config is set to null, such that the
+   *               job does not push a list of published segments to the database. Instead, we will use the method
+   *               IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
+   *               segments, and let the indexing service report these segments to the database.
+   */
+
+  @JsonCreator
+  public HadoopIndexTask(
+      @JsonProperty("config") HadoopDruidIndexerConfig config
+  )
+  {
+    super(
+        String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
+        config.getDataSource(),
+        JodaUtils.umbrellaInterval(config.getIntervals())
+    );
+
+    // Some HadoopDruidIndexerConfig stuff doesn't make sense in the context of the indexing service
+    Preconditions.checkArgument(config.getSegmentOutputDir() == null, "segmentOutputPath must be absent");
+    Preconditions.checkArgument(config.getJobOutputDir() == null, "workingPath must be absent");
+    Preconditions.checkArgument(!config.isUpdaterJobSpecSet(), "updaterJobSpec must be absent");
+
+    this.config = config;
+  }
+
+  @Override
+  public String getType()
+  {
+    return "index_hadoop";
+  }
+
+  @Override
+  public TaskStatus run(TaskToolbox toolbox) throws Exception
+  {
+    // Copy config so we don't needlessly modify our provided one
+    // Also necessary to make constructor validations work upon serde-after-run
+    final HadoopDruidIndexerConfig configCopy = toolbox.getObjectMapper()
+                                                       .readValue(
+                                                           toolbox.getObjectMapper().writeValueAsBytes(config),
+                                                           HadoopDruidIndexerConfig.class
+                                                       );
+
+    // We should have a lock from before we started running
+    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this)));
+    log.info("Setting version to: %s", myLock.getVersion());
+    configCopy.setVersion(myLock.getVersion());
+
+    // Set workingPath to some reasonable default
+    configCopy.setJobOutputDir(toolbox.getConfig().getHadoopWorkingPath());
+
+    if (toolbox.getSegmentPusher() instanceof S3DataSegmentPusher) {
+      // Hack alert! Bypassing DataSegmentPusher...
+      S3DataSegmentPusher segmentPusher = (S3DataSegmentPusher) toolbox.getSegmentPusher();
+      String s3Path = String.format(
+          "s3n://%s/%s/%s",
+          segmentPusher.getConfig().getBucket(),
+          segmentPusher.getConfig().getBaseKey(),
+          getDataSource()
+      );
+
+      log.info("Setting segment output path to: %s", s3Path);
+      configCopy.setSegmentOutputDir(s3Path);
+    } else {
Bummer!"); + } + + HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(configCopy); + configCopy.verify(); + + log.info("Starting a hadoop index generator job..."); + if (job.run()) { + List publishedSegments = job.getPublishedSegments(); + + // Request segment pushes + toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.copyOf(publishedSegments))); + + // Done + return TaskStatus.success(getId()); + } else { + return TaskStatus.failure(getId()); + } + + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java index 5ac4dcf71df..60a265564da 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java @@ -37,7 +37,8 @@ import org.joda.time.Interval; @JsonSubTypes.Type(name = "kill", value = KillTask.class), @JsonSubTypes.Type(name = "index", value = IndexTask.class), @JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class), - @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class) + @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class), + @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class) }) public interface Task { diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java index 51310bb2ef0..213bec34421 100644 --- a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java @@ -1,12 +1,17 @@ package com.metamx.druid.merger.common.task; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.metamx.common.Granularity; import com.metamx.druid.QueryGranularity; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.indexer.HadoopDruidIndexerConfig; +import com.metamx.druid.indexer.data.JSONDataSpec; import com.metamx.druid.indexer.granularity.UniformGranularitySpec; +import com.metamx.druid.indexer.path.StaticPathSpec; +import com.metamx.druid.indexer.rollup.DataRollupSpec; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.realtime.Schema; import com.metamx.druid.shard.NoneShardSpec; @@ -32,13 +37,17 @@ public class TaskSerdeTest final ObjectMapper jsonMapper = new DefaultObjectMapper(); final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change final Task task2 = jsonMapper.readValue(json, Task.class); + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P2D")), task.getFixedInterval()); + Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getDataSource(), task2.getDataSource()); Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval()); - Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get()); } @Test @@ -61,11 +70,13 @@ public class TaskSerdeTest final String json = jsonMapper.writeValueAsString(task); final Task task2 = jsonMapper.readValue(json, Task.class); + Assert.assertEquals("foo", 
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());
+
     Assert.assertEquals(task.getId(), task2.getId());
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
-    Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
   }
 
   @Test
@@ -82,11 +93,13 @@ public class TaskSerdeTest
     final String json = jsonMapper.writeValueAsString(task);
     final Task task2 = jsonMapper.readValue(json, Task.class);
 
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());
+
     Assert.assertEquals(task.getId(), task2.getId());
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
-    Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
   }
 
   @Test
@@ -99,7 +112,6 @@ public class TaskSerdeTest
 
     final ObjectMapper jsonMapper = new DefaultObjectMapper();
     final String json = jsonMapper.writeValueAsString(task);
-    System.out.println(json);
     final Task task2 = jsonMapper.readValue(json, Task.class);
 
     Assert.assertEquals(task.getId(), task2.getId());
@@ -108,4 +120,47 @@ public class TaskSerdeTest
     Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
     Assert.assertEquals(task.getFixedInterval().get(), task2.getFixedInterval().get());
   }
+
+  @Test
+  public void testHadoopIndexTaskSerde() throws Exception
+  {
+    final HadoopIndexTask task = new HadoopIndexTask(
+        new HadoopDruidIndexerConfig(
+            null,
+            "foo",
+            "timestamp",
+            "auto",
+            new JSONDataSpec(ImmutableList.of("foo")),
+            null,
+            new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
+            new StaticPathSpec("bar"),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false,
+            true,
+            null,
+            false,
+            new DataRollupSpec(ImmutableList.of(), QueryGranularity.NONE),
+            null,
+            false,
+            ImmutableList.of()
+        )
+    );
+
+    final ObjectMapper jsonMapper = new DefaultObjectMapper();
+    final String json = jsonMapper.writeValueAsString(task);
+    final Task task2 = jsonMapper.readValue(json, Task.class);
+
+    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getFixedInterval());
+
+    Assert.assertEquals(task.getId(), task2.getId());
+    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
+    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(task.getFixedInterval(), task2.getFixedInterval());
+  }
 }
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
index 60309f1e368..6d48ce17df9 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
@@ -300,6 +300,12 @@ public class RemoteTaskRunnerTest
               {
                 return 0;
               }
+
+              @Override
+              public String getHadoopWorkingPath()
+              {
+                return null;
+              }
             }, null, null, null, null, null, jsonMapper
         ),
         Executors.newSingleThreadExecutor()
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
index 69a364e1900..ae5b46fdfc1 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
@@ -110,6 +110,12 @@ public class TaskLifecycleTest
               {
                 return 50000;
               }
+
+              @Override
+              public String getHadoopWorkingPath()
+              {
+                return null;
+              }
             },
             new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())),
             newMockEmitter(),
diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java
index 273a07d36f3..dd5ce951695 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java
@@ -57,6 +57,11 @@ public class S3DataSegmentPusher implements DataSegmentPusher
     this.jsonMapper = jsonMapper;
   }
 
+  public S3DataSegmentPusherConfig getConfig()
+  {
+    return config;
+  }
+
   @Override
   public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
   {
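With DbUpdaterJob no longer exposing getPublishedSegments(), callers outside the indexing service now read the published segments from the job itself once run() has completed; before that, the getter throws IllegalStateException. A minimal sketch of that flow, assuming a fully populated HadoopDruidIndexerConfig named config built elsewhere (hypothetical here, e.g. deserialized from a spec file):

    import com.metamx.druid.client.DataSegment;
    import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
    import com.metamx.druid.indexer.HadoopDruidIndexerJob;

    import java.util.List;

    // config is assumed to be a verified HadoopDruidIndexerConfig.
    HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
    if (job.run()) {
      // Only valid after a successful run(); run() populates the list via
      // IndexGeneratorJob.getPublishedSegments(config).
      List<DataSegment> segments = job.getPublishedSegments();
      for (DataSegment segment : segments) {
        System.out.println(segment.getIdentifier());
      }
    }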