From 017d4779d684e12917f73cb9fd323b5cc7cbc490 Mon Sep 17 00:00:00 2001
From: Deep Ganguli
Date: Wed, 23 Jan 2013 19:27:14 -0800
Subject: [PATCH] Implemented Hadoop Index Task which takes as input a
 HadoopDruidIndexerConfig and generates index segments.

The HadoopIndexTask run method wraps a HadoopDruidIndexerJob run method. The key
modifications to the HadoopDruidIndexerJob are as follows:

- The UpDaterJobSpec field of the config that is used to set up the indexer job
  is set to null. This ensures that the job does not push a list of published
  segments to the database, in order to allow the indexing service to handle
  this later.
- Set the version field of the config based on the TaskContext. Also changed the
  config.setVersion method to take a String (as opposed to a DateTime) as input,
  and propagated this change where necessary.
- Set the segmentOutputDir field of the config based on the TaskToolbox, to allow
  the indexing service to control where the segments are written to.
- Added a method to IndexGeneratorJob called getPublishedSegments, which simply
  returns a list of published segments without publishing this list to the
  database.
---
 .../metamx/druid/indexer/DbUpdaterJob.java          | 100 +++++-------------
 .../indexer/HadoopDruidIndexerAzkWrapper.java       |   2 +-
 .../indexer/HadoopDruidIndexerConfig.java           |  10 +-
 .../druid/indexer/HadoopDruidIndexerJob.java        |   9 +-
 .../druid/indexer/IndexGeneratorJob.java            |  39 ++++++-
 .../merger/common/task/HadoopIndexTask.java         |  91 ++++++++++++++++
 .../metamx/druid/merger/common/task/Task.java       |   9 +-
 .../metamx/druid/loading/S3SegmentPusher.java       |   5 +
 8 files changed, 183 insertions(+), 82 deletions(-)
 create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java

diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java
index 720242466e4..530042d289c 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/DbUpdaterJob.java
@@ -19,24 +19,18 @@
 
 package com.metamx.druid.indexer;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
+
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.db.DbConnector;
 import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
 import com.metamx.druid.jackson.DefaultObjectMapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
@@ -51,9 +45,6 @@ public class DbUpdaterJob implements Jobby
   private final DbUpdaterJobSpec spec;
   private final DBI dbi;
 
-  // Keep track of published segment identifiers, in case a client is interested.
-  private volatile ImmutableList<DataSegment> publishedSegments = null;
-
   public DbUpdaterJob(
       HadoopDruidIndexerConfig config
   )
@@ -66,75 +57,42 @@ public class DbUpdaterJob implements Jobby
   @Override
   public boolean run()
   {
-    final Configuration conf = new Configuration();
+    final List<DataSegment> segments = IndexGeneratorJob.getPublishedSegments(config);
 
-    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
+    for (final DataSegment segment : segments) {
 
-    for (String propName : System.getProperties().stringPropertyNames()) {
-      if (propName.startsWith("hadoop.")) {
-        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-      }
-    }
-
-    final Path descriptorInfoDir = config.makeDescriptorInfoDir();
-
-    try {
-      FileSystem fs = descriptorInfoDir.getFileSystem(conf);
-
-      for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
-        final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
-
-        dbi.withHandle(
-            new HandleCallback<Void>()
+      dbi.withHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle) throws Exception
             {
-              @Override
-              public Void withHandle(Handle handle) throws Exception
-              {
-                handle.createStatement(String.format(
-                    "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
-                    + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-                    spec.getSegmentTable()
-                ))
-                     .bind("id", segment.getIdentifier())
-                     .bind("dataSource", segment.getDataSource())
-                     .bind("created_date", new DateTime().toString())
-                     .bind("start", segment.getInterval().getStart().toString())
-                     .bind("end", segment.getInterval().getEnd().toString())
-                     .bind("partitioned", segment.getShardSpec().getPartitionNum())
-                     .bind("version", segment.getVersion())
-                     .bind("used", true)
-                     .bind("payload", jsonMapper.writeValueAsString(segment))
-                     .execute();
+              handle.createStatement(
+                  String.format(
+                      "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+                      + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+                      spec.getSegmentTable()
+                  )
+              )
+                    .bind("id", segment.getIdentifier())
+                    .bind("dataSource", segment.getDataSource())
+                    .bind("created_date", new DateTime().toString())
+                    .bind("start", segment.getInterval().getStart().toString())
+                    .bind("end", segment.getInterval().getEnd().toString())
+                    .bind("partitioned", segment.getShardSpec().getPartitionNum())
+                    .bind("version", segment.getVersion())
+                    .bind("used", true)
+                    .bind("payload", jsonMapper.writeValueAsString(segment))
+                    .execute();
 
-                return null;
-              }
+              return null;
             }
-        );
+          }
+      );
 
-        publishedSegmentsBuilder.add(segment);
-        log.info("Published %s", segment.getIdentifier());
-      }
+      log.info("Published %s", segment.getIdentifier());
     }
-    catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-
-    publishedSegments = publishedSegmentsBuilder.build();
-
     return true;
   }
 
-  /**
-   * Returns a list of segment identifiers published by the most recent call to run().
-   * Throws an IllegalStateException if run() has never been called.
-   */
-  public List<DataSegment> getPublishedSegments()
-  {
-    if (publishedSegments == null) {
-      log.error("getPublishedSegments called before run!");
-      throw new IllegalStateException("DbUpdaterJob has not run yet");
-    } else {
-      return publishedSegments;
-    }
-  }
 }

diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerAzkWrapper.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerAzkWrapper.java
index 8d073dd7297..5318d5cb600 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerAzkWrapper.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerAzkWrapper.java
@@ -83,7 +83,7 @@ public class HadoopDruidIndexerAzkWrapper
     final HadoopDruidIndexerConfig config = jsonMapper.convertValue(theMap, HadoopDruidIndexerConfig.class);
     config.setIntervals(dataInterval);
-    config.setVersion(new DateTime());
+    config.setVersion(new DateTime().toString());
 
     new HadoopDruidIndexerJob(config).run();
   }
 

diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
index 403484b9c61..7e91cba9de5 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
@@ -174,7 +174,7 @@ public class HadoopDruidIndexerConfig
   private volatile PathSpec pathSpec;
   private volatile String jobOutputDir;
   private volatile String segmentOutputDir;
-  private volatile DateTime version = new DateTime();
+  private volatile String version = new DateTime().toString();
   private volatile String partitionDimension;
   private volatile Long targetPartitionSize;
   private volatile boolean leaveIntermediate = false;
@@ -312,12 +312,12 @@
   }
 
   @JsonProperty
-  public DateTime getVersion()
+  public String getVersion()
   {
     return version;
   }
 
-  public void setVersion(DateTime version)
+  public void setVersion(String version)
   {
     this.version = version;
   }
@@ -544,7 +544,7 @@
    */
   public Path makeIntermediatePath()
   {
-    return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().toString().replace(":", "")));
+    return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().replace(":", "")));
   }
 
   public Path makeSegmentPartitionInfoPath(Bucket bucket)
@@ -581,7 +581,7 @@
             getSegmentOutputDir(),
             bucketInterval.getStart().toString(),
             bucketInterval.getEnd().toString(),
-            getVersion().toString(),
+            getVersion(),
             bucket.partitionNum
         )
     );

diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerJob.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerJob.java
index eae1bfa885d..aff1265782a 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerJob.java
@@ -47,6 +47,7 @@ public class HadoopDruidIndexerJob implements Jobby
   private final HadoopDruidIndexerConfig config;
   private final DbUpdaterJob dbUpdaterJob;
   private IndexGeneratorJob indexJob;
+  private volatile List<DataSegment> publishedSegments = null;
 
   public HadoopDruidIndexerJob(
       HadoopDruidIndexerConfig config
   )
@@ -102,6 +103,8 @@
       }
     }
 
+    publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
+
     if (!config.isLeaveIntermediate()) {
       if (failedMessage == null || config.isCleanupOnFailure()) {
        Path workingPath = config.makeIntermediatePath();
@@ -147,8 +150,10 @@
   }
 
   public List<DataSegment> getPublishedSegments()
   {
-    Preconditions.checkState(dbUpdaterJob != null, "No updaterJobSpec set, cannot get published segments");
-    return dbUpdaterJob.getPublishedSegments();
+    if(publishedSegments == null) {
+      throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
+    }
+    return publishedSegments;
   }
 
   public IndexGeneratorJob.IndexGeneratorStats getIndexJobStats()

diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
index 28dacd1ca9a..d9188586041 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
@@ -23,6 +23,7 @@
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -44,6 +45,7 @@
 import com.metamx.druid.index.v1.IndexIO;
 import com.metamx.druid.index.v1.IndexMerger;
 import com.metamx.druid.indexer.rollup.DataRollupSpec;
 import com.metamx.druid.input.MapBasedInputRow;
+import com.metamx.druid.jackson.DefaultObjectMapper;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -65,8 +67,11 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.tweak.HandleCallback;
 
 import javax.annotation.Nullable;
 import java.io.BufferedOutputStream;
@@ -159,6 +164,38 @@
     }
   }
 
+  public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) {
+
+    final Configuration conf = new Configuration();
+    final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
+
+    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
+
+    for (String propName : System.getProperties().stringPropertyNames()) {
+      if (propName.startsWith("hadoop.")) {
+        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
+      }
+    }
+
+    final Path descriptorInfoDir = config.makeDescriptorInfoDir();
+
+    try {
+      FileSystem fs = descriptorInfoDir.getFileSystem(conf);
+
+      for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
+        final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
+        publishedSegmentsBuilder.add(segment);
+        log.info("Published %s", segment.getIdentifier());
+      }
+    }
+    catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+    List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
+
+    return publishedSegments;
+  }
+
   public static class IndexGeneratorMapper extends Mapper
   {
     private HadoopDruidIndexerConfig config;
@@ -467,7 +504,7 @@
         DataSegment segment = new DataSegment(
             config.getDataSource(),
             interval,
-            config.getVersion().toString(),
+            config.getVersion(),
             loadSpec,
             dimensionNames,
             metricNames,

diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
new file mode 100644
index 00000000000..9dda89d4371
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
@@ -0,0 +1,91 @@
+package com.metamx.druid.merger.common.task;
+
+import com.google.common.collect.ImmutableList;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
+import com.metamx.druid.indexer.HadoopDruidIndexerJob;
+import com.metamx.druid.loading.S3SegmentPusher;
+import com.metamx.druid.merger.common.TaskStatus;
+import com.metamx.druid.merger.common.TaskToolbox;
+import com.metamx.druid.merger.coordinator.TaskContext;
+import com.metamx.druid.utils.JodaUtils;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.joda.time.DateTime;
+
+import java.util.List;
+
+public class HadoopIndexTask extends AbstractTask
+{
+  @JsonProperty
+  private static final Logger log = new Logger(HadoopIndexTask.class);
+
+  @JsonProperty
+  private final HadoopDruidIndexerConfig config;
+
+  /**
+   * @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters
+   *               for creating Druid index segments. It may be modified. Here, we will ensure that the
+   *               UpDaterJobSpec field of the config is set to null, such that the job does not push a
+   *               list of published segments to the database. Instead, we will use the method
+   *               IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
+   *               segments, and let the indexing service report these segments to the database.
+   */
+
+  @JsonCreator
+  public HadoopIndexTask(@JsonProperty("config") HadoopDruidIndexerConfig config)
+  {
+    super(
+        String.format("index_hadoop_%s_interval_%s", config.getDataSource(), new DateTime().now()),
+        config.getDataSource(),
+        JodaUtils.umbrellaInterval(config.getIntervals())
+    );
+
+    if (config.isUpdaterJobSpecSet()) {
+      throw new IllegalArgumentException("UpDaterJobSpec is defined");
+    }
+    this.config = config;
+  }
+
+  @Override
+  public Type getType()
+  {
+    return Type.INDEX;
+  }
+
+  @Override
+  public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
+  {
+    log.info("Setting version to: %s", context.getVersion());
+    config.setVersion(context.getVersion());
+
+    if(toolbox.getSegmentPusher() instanceof S3SegmentPusher) {
+      // Hack alert! Bypassing SegmentPusher...
+      S3SegmentPusher segmentPusher = (S3SegmentPusher) toolbox.getSegmentPusher();
+      String s3Path = String.format(
+          "s3://%s/%s/%s",
+          segmentPusher.getConfig().getBucket(),
+          segmentPusher.getConfig().getBaseKey(),
+          getDataSource()
+      );
+
+      log.info("Setting segment output path to: %s", s3Path);
+      config.setSegmentOutputDir(s3Path);
+    } else {
+      throw new IllegalStateException("Sorry, we only work with S3SegmentPushers! Bummer!");
+    }
+
+    HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
+    log.debug("Starting a hadoop index generator job...");
+
+    if (job.run()) {
+      List<DataSegment> publishedSegments = job.getPublishedSegments();
+      return TaskStatus.success(getId(), ImmutableList.copyOf(publishedSegments));
+
+    } else {
+      return TaskStatus.failure(getId());
+    }
+
+  }
+}

diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java
index b2059210b58..4fbdd9de2a0 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java
@@ -37,6 +37,7 @@ import org.joda.time.Interval;
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "append", value = AppendTask.class),
     @JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
+    @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
     @JsonSubTypes.Type(name = "index", value = IndexTask.class),
     @JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class),
     @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class)
@@ -64,12 +65,14 @@ public interface Task
   /**
    * Execute preflight checks for a task. This typically runs on the coordinator, and will be run while
-   * holding a lock on our dataSouce and interval. If this method throws an exception, the task should be
+   * holding a lock on our dataSource and interval. If this method throws an exception, the task should be
    * considered a failure.
    *
    * @param context Context for this task, gathered under indexer lock
+   *
    * @return Some kind of status (runnable means continue on to a worker, non-runnable means we completed without
-   *         using a worker).
+   * using a worker).
+   *
    * @throws Exception
    */
   public TaskStatus preflight(TaskContext context) throws Exception;
@@ -81,7 +84,9 @@
    *
    * @param context Context for this task, gathered under indexer lock
    * @param toolbox Toolbox for this task
+   *
    * @return Some kind of finished status (isRunnable must be false).
+   *
    * @throws Exception
    */
   public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception;

diff --git a/server/src/main/java/com/metamx/druid/loading/S3SegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/S3SegmentPusher.java
index f4099154d2d..1475e4dfe68 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3SegmentPusher.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3SegmentPusher.java
@@ -62,6 +62,11 @@ public class S3SegmentPusher implements SegmentPusher
     this.jsonMapper = jsonMapper;
   }
 
+  public S3SegmentPusherConfig getConfig()
+  {
+    return config;
+  }
+
   @Override
   public DataSegment push(File file, DataSegment segment) throws IOException
   {
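
Usage note (not part of the patch): the sketch below shows one way the new "index_hadoop"
task type registered in Task.java might be exercised once this lands. How the config JSON
is obtained and how a TaskContext/TaskToolbox are provided are assumptions about the
surrounding indexing service, not something this patch defines, so treat this as an
illustration rather than the actual submission path.

// Hypothetical usage sketch for HadoopIndexTask. TaskContext and TaskToolbox are
// supplied by the indexing service; the config must not carry an updaterJobSpec,
// otherwise the HadoopIndexTask constructor throws IllegalArgumentException.
import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.task.HadoopIndexTask;
import com.metamx.druid.merger.coordinator.TaskContext;

public class HadoopIndexTaskExample
{
  public static TaskStatus runHadoopIndexTask(String configJson, TaskContext context, TaskToolbox toolbox)
      throws Exception
  {
    // Deserialize the HadoopDruidIndexerConfig, as the "config" field of the task spec would be.
    HadoopDruidIndexerConfig config =
        new DefaultObjectMapper().readValue(configJson, HadoopDruidIndexerConfig.class);

    // run() fills in the version (from the TaskContext) and the segment output dir
    // (from the toolbox's S3SegmentPusher) before running the wrapped HadoopDruidIndexerJob,
    // and returns the published segments in the TaskStatus instead of writing them to the DB.
    HadoopIndexTask task = new HadoopIndexTask(config);
    return task.run(context, toolbox);
  }
}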