Implemented HadoopIndexTask, which takes a HadoopDruidIndexerConfig as input and generates index segments.

The HadoopIndexTask run method wraps the HadoopDruidIndexerJob run method; a rough usage sketch follows the list below. The key modifications to the Hadoop indexer code are as follows:

- The updaterJobSpec field of the config that is used to set up the indexer job
  is set to null. This ensures that the job does not push the list of published
  segments to the database, leaving that step to the indexing service.
- Set the version field of the config based on the TaskContext. Also changed
  the config.setVersion method to take a String (rather than a DateTime) as
  input, and propagated this change where necessary.
- Set the segmentOutputDir field of the config based on the TaskToolbox, so
  that the indexing service controls where the segments are written.
- Added a getPublishedSegments method to IndexGeneratorJob that simply returns
  the list of published segments without writing that list to the database.
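For context, here is a rough sketch of how the indexing service might invoke the new task. It is illustrative only: the method name runHadoopIndexing, the specMap parameter, and the assumption that the service hands in a ready-made ObjectMapper, TaskContext, and TaskToolbox are not part of this commit; only the HadoopDruidIndexerConfig, HadoopIndexTask, and TaskStatus calls come from the code below.

  // Illustrative sketch only (not part of this commit). TaskContext and TaskToolbox are
  // supplied by the indexing service at runtime; their construction is not shown here.
  public TaskStatus runHadoopIndexing(
      ObjectMapper jsonMapper,          // assumed to be available, as in HadoopDruidIndexerAzkWrapper
      Map<String, Object> specMap,      // assumed: the Hadoop indexer spec as a generic map
      TaskContext context,
      TaskToolbox toolbox
  ) throws Exception
  {
    // Build the indexer config the same way HadoopDruidIndexerAzkWrapper does.
    final HadoopDruidIndexerConfig config =
        jsonMapper.convertValue(specMap, HadoopDruidIndexerConfig.class);

    // The HadoopIndexTask constructor throws if an updaterJobSpec is set, because the
    // indexing service (not the Hadoop job) is now responsible for recording segments.
    final HadoopIndexTask task = new HadoopIndexTask(config);

    // run() copies the version from the TaskContext into the config, points the segment
    // output dir at the toolbox's S3SegmentPusher bucket/baseKey, runs the Hadoop job,
    // and on success returns a status carrying the published segments.
    return task.run(context, toolbox);
  }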
Author: Deep Ganguli
Date:   2013-01-23 19:27:14 -08:00
Parent: fc07bc315e
Commit: 017d4779d6
8 changed files with 183 additions and 82 deletions

DbUpdaterJob.java

@ -19,24 +19,18 @@
package com.metamx.druid.indexer;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
import java.util.List;
/**
@@ -51,9 +45,6 @@ public class DbUpdaterJob implements Jobby
   private final DbUpdaterJobSpec spec;
   private final DBI dbi;

-  // Keep track of published segment identifiers, in case a client is interested.
-  private volatile ImmutableList<DataSegment> publishedSegments = null;
-
   public DbUpdaterJob(
       HadoopDruidIndexerConfig config
   )
@@ -66,75 +57,42 @@ public class DbUpdaterJob implements Jobby
   @Override
   public boolean run()
   {
-    final Configuration conf = new Configuration();
-
-    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
-
-    for (String propName : System.getProperties().stringPropertyNames()) {
-      if (propName.startsWith("hadoop.")) {
-        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-      }
-    }
-
-    final Path descriptorInfoDir = config.makeDescriptorInfoDir();
-
-    try {
-      FileSystem fs = descriptorInfoDir.getFileSystem(conf);
-
-      for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
-        final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
-
-        dbi.withHandle(
-            new HandleCallback<Void>()
-            {
-              @Override
-              public Void withHandle(Handle handle) throws Exception
-              {
-                handle.createStatement(String.format(
-                    "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
-                    + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-                    spec.getSegmentTable()
-                ))
-                      .bind("id", segment.getIdentifier())
-                      .bind("dataSource", segment.getDataSource())
-                      .bind("created_date", new DateTime().toString())
-                      .bind("start", segment.getInterval().getStart().toString())
-                      .bind("end", segment.getInterval().getEnd().toString())
-                      .bind("partitioned", segment.getShardSpec().getPartitionNum())
-                      .bind("version", segment.getVersion())
-                      .bind("used", true)
-                      .bind("payload", jsonMapper.writeValueAsString(segment))
-                      .execute();
-
-                return null;
-              }
-            }
-        );
-
-        publishedSegmentsBuilder.add(segment);
-        log.info("Published %s", segment.getIdentifier());
-      }
-    }
-    catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-
-    publishedSegments = publishedSegmentsBuilder.build();
+    final List<DataSegment> segments = IndexGeneratorJob.getPublishedSegments(config);
+
+    for (final DataSegment segment : segments) {
+      dbi.withHandle(
+          new HandleCallback<Void>()
+          {
+            @Override
+            public Void withHandle(Handle handle) throws Exception
+            {
+              handle.createStatement(
+                  String.format(
+                      "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+                      + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+                      spec.getSegmentTable()
+                  )
+              )
+                    .bind("id", segment.getIdentifier())
+                    .bind("dataSource", segment.getDataSource())
+                    .bind("created_date", new DateTime().toString())
+                    .bind("start", segment.getInterval().getStart().toString())
+                    .bind("end", segment.getInterval().getEnd().toString())
+                    .bind("partitioned", segment.getShardSpec().getPartitionNum())
+                    .bind("version", segment.getVersion())
+                    .bind("used", true)
+                    .bind("payload", jsonMapper.writeValueAsString(segment))
+                    .execute();
+
+              return null;
+            }
+          }
+      );
+
+      log.info("Published %s", segment.getIdentifier());
+    }

     return true;
   }

-  /**
-   * Returns a list of segment identifiers published by the most recent call to run().
-   * Throws an IllegalStateException if run() has never been called.
-   */
-  public List<DataSegment> getPublishedSegments()
-  {
-    if (publishedSegments == null) {
-      log.error("getPublishedSegments called before run!");
-      throw new IllegalStateException("DbUpdaterJob has not run yet");
-    } else {
-      return publishedSegments;
-    }
-  }
 }

HadoopDruidIndexerAzkWrapper.java

@ -83,7 +83,7 @@ public class HadoopDruidIndexerAzkWrapper
final HadoopDruidIndexerConfig config = jsonMapper.convertValue(theMap, HadoopDruidIndexerConfig.class);
config.setIntervals(dataInterval);
config.setVersion(new DateTime());
config.setVersion(new DateTime().toString());
new HadoopDruidIndexerJob(config).run();
}

HadoopDruidIndexerConfig.java

@ -174,7 +174,7 @@ public class HadoopDruidIndexerConfig
private volatile PathSpec pathSpec;
private volatile String jobOutputDir;
private volatile String segmentOutputDir;
private volatile DateTime version = new DateTime();
private volatile String version = new DateTime().toString();
private volatile String partitionDimension;
private volatile Long targetPartitionSize;
private volatile boolean leaveIntermediate = false;
@ -312,12 +312,12 @@ public class HadoopDruidIndexerConfig
}
@JsonProperty
public DateTime getVersion()
public String getVersion()
{
return version;
}
public void setVersion(DateTime version)
public void setVersion(String version)
{
this.version = version;
}
@ -544,7 +544,7 @@ public class HadoopDruidIndexerConfig
*/
public Path makeIntermediatePath()
{
return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().toString().replace(":", "")));
return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().replace(":", "")));
}
public Path makeSegmentPartitionInfoPath(Bucket bucket)
@ -581,7 +581,7 @@ public class HadoopDruidIndexerConfig
getSegmentOutputDir(),
bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(),
getVersion().toString(),
getVersion(),
bucket.partitionNum
)
);

HadoopDruidIndexerJob.java

@ -47,6 +47,7 @@ public class HadoopDruidIndexerJob implements Jobby
private final HadoopDruidIndexerConfig config;
private final DbUpdaterJob dbUpdaterJob;
private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null;
public HadoopDruidIndexerJob(
HadoopDruidIndexerConfig config
@ -102,6 +103,8 @@ public class HadoopDruidIndexerJob implements Jobby
}
}
publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
if (!config.isLeaveIntermediate()) {
if (failedMessage == null || config.isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath();
@ -147,8 +150,10 @@ public class HadoopDruidIndexerJob implements Jobby
}
public List<DataSegment> getPublishedSegments() {
Preconditions.checkState(dbUpdaterJob != null, "No updaterJobSpec set, cannot get published segments");
return dbUpdaterJob.getPublishedSegments();
if(publishedSegments == null) {
throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
}
return publishedSegments;
}
public IndexGeneratorJob.IndexGeneratorStats getIndexJobStats()

IndexGeneratorJob.java

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -44,6 +45,7 @@ import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -65,8 +67,11 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.annotation.Nullable;
import java.io.BufferedOutputStream;
@ -159,6 +164,38 @@ public class IndexGeneratorJob implements Jobby
}
}
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) {
final Configuration conf = new Configuration();
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
final Path descriptorInfoDir = config.makeDescriptorInfoDir();
try {
FileSystem fs = descriptorInfoDir.getFileSystem(conf);
for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
publishedSegmentsBuilder.add(segment);
log.info("Published %s", segment.getIdentifier());
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
return publishedSegments;
}
public static class IndexGeneratorMapper extends Mapper<LongWritable, Text, BytesWritable, Text>
{
private HadoopDruidIndexerConfig config;
@ -467,7 +504,7 @@ public class IndexGeneratorJob implements Jobby
DataSegment segment = new DataSegment(
config.getDataSource(),
interval,
config.getVersion().toString(),
config.getVersion(),
loadSpec,
dimensionNames,
metricNames,

HadoopIndexTask.java (new file)

@ -0,0 +1,91 @@
package com.metamx.druid.merger.common.task;
import com.google.common.collect.ImmutableList;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexer.HadoopDruidIndexerConfig;
import com.metamx.druid.indexer.HadoopDruidIndexerJob;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.coordinator.TaskContext;
import com.metamx.druid.utils.JodaUtils;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.DateTime;
import java.util.List;
public class HadoopIndexTask extends AbstractTask
{
@JsonProperty
private static final Logger log = new Logger(HadoopIndexTask.class);
@JsonProperty
private final HadoopDruidIndexerConfig config;
/**
* @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified. Here, we will ensure that the
* UpDaterJobSpec field of the config is set to null, such that the job does not push a
list of published segments to the database. Instead, we will use the method
* IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
* segments, and let the indexing service report these segments to the database.
*/
@JsonCreator
public HadoopIndexTask(@JsonProperty("config") HadoopDruidIndexerConfig config)
{
super(
String.format("index_hadoop_%s_interval_%s", config.getDataSource(), new DateTime().now()),
config.getDataSource(),
JodaUtils.umbrellaInterval(config.getIntervals())
);
if (config.isUpdaterJobSpecSet()) {
throw new IllegalArgumentException("UpDaterJobSpec is defined");
}
this.config = config;
}
@Override
public Type getType()
{
return Type.INDEX;
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
{
log.info("Setting version to: %s", context.getVersion());
config.setVersion(context.getVersion());
if(toolbox.getSegmentPusher() instanceof S3SegmentPusher) {
// Hack alert! Bypassing SegmentPusher...
S3SegmentPusher segmentPusher = (S3SegmentPusher) toolbox.getSegmentPusher();
String s3Path = String.format(
"s3://%s/%s/%s",
segmentPusher.getConfig().getBucket(),
segmentPusher.getConfig().getBaseKey(),
getDataSource()
);
log.info("Setting segment output path to: %s", s3Path);
config.setSegmentOutputDir(s3Path);
} else {
throw new IllegalStateException("Sorry, we only work with S3SegmentPushers! Bummer!");
}
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
log.debug("Starting a hadoop index generator job...");
if (job.run()) {
List<DataSegment> publishedSegments = job.getPublishedSegments();
return TaskStatus.success(getId(), ImmutableList.copyOf(publishedSegments));
} else {
return TaskStatus.failure(getId());
}
}
}

Task.java

@ -37,6 +37,7 @@ import org.joda.time.Interval;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "append", value = AppendTask.class),
@JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class),
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class)
@ -64,12 +65,14 @@ public interface Task
/**
* Execute preflight checks for a task. This typically runs on the coordinator, and will be run while
* holding a lock on our dataSouce and interval. If this method throws an exception, the task should be
* holding a lock on our dataSource and interval. If this method throws an exception, the task should be
* considered a failure.
*
* @param context Context for this task, gathered under indexer lock
*
* @return Some kind of status (runnable means continue on to a worker, non-runnable means we completed without
* using a worker).
* using a worker).
*
* @throws Exception
*/
public TaskStatus preflight(TaskContext context) throws Exception;
@ -81,7 +84,9 @@ public interface Task
*
* @param context Context for this task, gathered under indexer lock
* @param toolbox Toolbox for this task
*
* @return Some kind of finished status (isRunnable must be false).
*
* @throws Exception
*/
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception;

S3SegmentPusher.java

@ -62,6 +62,11 @@ public class S3SegmentPusher implements SegmentPusher
this.jsonMapper = jsonMapper;
}
public S3SegmentPusherConfig getConfig()
{
return config;
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{