Merge pull request #2431 from himanshug/job_helper_refactor

In JobHelper.makeSegmentOutputPath(..) use DataSegmentPusherUtils
This commit is contained in:
Fangjin Yang 2016-02-10 14:42:08 -08:00
commit 2b35e90985
4 changed files with 44 additions and 56 deletions

View File

@ -75,7 +75,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@ -689,18 +688,19 @@ public class IndexGeneratorJob implements Jobby
}
final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
.getFileSystem(context.getConfiguration());
final DataSegment segmentTemplate = new DataSegment(
config.getDataSource(),
interval,
config.getSchema().getTuningConfig().getVersion(),
null,
ImmutableList.copyOf(allDimensionNames),
metricNames,
config.getShardSpec(bucket).getActualSpec(),
-1,
-1
);
final DataSegment segment = JobHelper.serializeOutIndex(
new DataSegment(
config.getDataSource(),
interval,
config.getSchema().getTuningConfig().getVersion(),
null,
ImmutableList.copyOf(allDimensionNames),
metricNames,
config.getShardSpec(bucket).getActualSpec(),
-1,
-1
),
segmentTemplate,
context.getConfiguration(),
context,
context.getTaskAttemptID(),
@ -708,10 +708,7 @@ public class IndexGeneratorJob implements Jobby
JobHelper.makeSegmentOutputPath(
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
config.getSchema().getDataSchema().getDataSource(),
config.getSchema().getTuningConfig().getVersion(),
config.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
bucket.partitionNum
segmentTemplate
)
);

View File

@ -35,6 +35,7 @@ import com.metamx.common.logger.Logger;
import io.druid.indexer.updater.HadoopDruidConverterConfig;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -48,8 +49,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import java.io.BufferedOutputStream;
import java.io.File;
@ -545,34 +544,13 @@ public class JobHelper
public static Path makeSegmentOutputPath(
Path basePath,
FileSystem fileSystem,
String dataSource,
String version,
Interval interval,
int partitionNum
DataSegment segment
)
{
Path outputPath = new Path(prependFSIfNullScheme(fileSystem, basePath), "./" + dataSource);
if ("hdfs".equals(fileSystem.getScheme())) {
outputPath = new Path(
outputPath, String.format(
"./%s_%s",
interval.getStart().toString(ISODateTimeFormat.basicDateTime()),
interval.getEnd().toString(ISODateTimeFormat.basicDateTime())
)
);
outputPath = new Path(outputPath, version.replace(":", "_"));
} else {
outputPath = new Path(
outputPath, String.format(
"./%s_%s",
interval.getStart().toString(),
interval.getEnd().toString()
)
);
outputPath = new Path(outputPath, String.format("./%s", version));
}
outputPath = new Path(outputPath, Integer.toString(partitionNum));
return outputPath;
String segmentDir = "hdfs".equals(fileSystem.getScheme())
? DataSegmentPusherUtil.getHdfsStorageDir(segment)
: DataSegmentPusherUtil.getStorageDir(segment);
return new Path(prependFSIfNullScheme(fileSystem, basePath), String.format("./%s", segmentDir));
}
/**

View File

@ -539,10 +539,7 @@ public class HadoopConverterJob
JobHelper.makeSegmentOutputPath(
baseOutputPath,
outputFS,
finalSegmentTemplate.getDataSource(),
finalSegmentTemplate.getVersion(),
finalSegmentTemplate.getInterval(),
finalSegmentTemplate.getShardSpec().getPartitionNum()
finalSegmentTemplate
)
);
context.progress();

View File

@ -32,7 +32,9 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -106,10 +108,17 @@ public class HadoopDruidIndexerConfigTest
Path path = JobHelper.makeSegmentOutputPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getTuningConfig().getVersion(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
bucket.partitionNum
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
)
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712",
@ -159,10 +168,17 @@ public class HadoopDruidIndexerConfigTest
Path path = JobHelper.makeSegmentOutputPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getTuningConfig().getVersion(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
bucket.partitionNum
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
)
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712",