In JobHelper.makeSegmentOutputPath(..) use DataSegmentPusherUtil to construct the segment storage path

Himanshu Gupta 2016-02-07 23:24:46 -06:00
parent 69a6bdcf03
commit 2faae9d0d1
4 changed files with 44 additions and 56 deletions
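
At its core the change collapses four loose arguments (dataSource, version, interval, partitionNum) into the DataSegment they came from. A minimal before/after sketch of the call shape, using the identifiers from the HadoopConverterJob hunk below:

    // Before: callers unpacked the segment into separate path components.
    Path out = JobHelper.makeSegmentOutputPath(
        baseOutputPath,
        outputFS,
        finalSegmentTemplate.getDataSource(),
        finalSegmentTemplate.getVersion(),
        finalSegmentTemplate.getInterval(),
        finalSegmentTemplate.getShardSpec().getPartitionNum()
    );

    // After: the segment is passed whole; the directory layout is delegated
    // to DataSegmentPusherUtil.
    Path out = JobHelper.makeSegmentOutputPath(baseOutputPath, outputFS, finalSegmentTemplate);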

IndexGeneratorJob.java

@@ -75,7 +75,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
@@ -689,18 +688,19 @@ public class IndexGeneratorJob implements Jobby
       }
       final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
           .getFileSystem(context.getConfiguration());
+      final DataSegment segmentTemplate = new DataSegment(
+          config.getDataSource(),
+          interval,
+          config.getSchema().getTuningConfig().getVersion(),
+          null,
+          ImmutableList.copyOf(allDimensionNames),
+          metricNames,
+          config.getShardSpec(bucket).getActualSpec(),
+          -1,
+          -1
+      );
       final DataSegment segment = JobHelper.serializeOutIndex(
-          new DataSegment(
-              config.getDataSource(),
-              interval,
-              config.getSchema().getTuningConfig().getVersion(),
-              null,
-              ImmutableList.copyOf(allDimensionNames),
-              metricNames,
-              config.getShardSpec(bucket).getActualSpec(),
-              -1,
-              -1
-          ),
+          segmentTemplate,
           context.getConfiguration(),
           context,
           context.getTaskAttemptID(),
@@ -708,10 +708,7 @@ public class IndexGeneratorJob implements Jobby
           JobHelper.makeSegmentOutputPath(
               new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
               outputFS,
-              config.getSchema().getDataSchema().getDataSource(),
-              config.getSchema().getTuningConfig().getVersion(),
-              config.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
-              bucket.partitionNum
+              segmentTemplate
           )
       );
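
The hoisted segmentTemplate now does double duty: it seeds the segment that serializeOutIndex writes out and, in the second hunk, drives the output path. Its trailing -1, -1 arguments are placeholder binaryVersion and size values; a sketch of the presumed finalization inside serializeOutIndex (DataSegment's with* copy-builders are real, the variable names are illustrative, and this is not the verbatim method body):

    // Illustrative only: once the merged index is pushed, the template's
    // placeholders are presumably swapped for real values.
    DataSegment finalSegment = segmentTemplate
        .withLoadSpec(loadSpec)             // where the pushed index actually lives
        .withBinaryVersion(binaryVersion)   // replaces the first -1
        .withSize(indexSize);               // replaces the second -1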

JobHelper.java

@@ -35,6 +35,7 @@ import com.metamx.common.logger.Logger;
 import io.druid.indexer.updater.HadoopDruidConverterConfig;
 import io.druid.segment.ProgressIndicator;
 import io.druid.segment.SegmentUtils;
+import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -48,8 +49,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
 import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.joda.time.format.ISODateTimeFormat;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -545,34 +544,13 @@ public class JobHelper
   public static Path makeSegmentOutputPath(
       Path basePath,
       FileSystem fileSystem,
-      String dataSource,
-      String version,
-      Interval interval,
-      int partitionNum
+      DataSegment segment
   )
   {
-    Path outputPath = new Path(prependFSIfNullScheme(fileSystem, basePath), "./" + dataSource);
-    if ("hdfs".equals(fileSystem.getScheme())) {
-      outputPath = new Path(
-          outputPath, String.format(
-          "./%s_%s",
-          interval.getStart().toString(ISODateTimeFormat.basicDateTime()),
-          interval.getEnd().toString(ISODateTimeFormat.basicDateTime())
-      )
-      );
-      outputPath = new Path(outputPath, version.replace(":", "_"));
-    } else {
-      outputPath = new Path(
-          outputPath, String.format(
-          "./%s_%s",
-          interval.getStart().toString(),
-          interval.getEnd().toString()
-      )
-      );
-      outputPath = new Path(outputPath, String.format("./%s", version));
-    }
-    outputPath = new Path(outputPath, Integer.toString(partitionNum));
-    return outputPath;
+    String segmentDir = "hdfs".equals(fileSystem.getScheme())
+                        ? DataSegmentPusherUtil.getHdfsStorageDir(segment)
+                        : DataSegmentPusherUtil.getStorageDir(segment);
+    return new Path(prependFSIfNullScheme(fileSystem, basePath), String.format("./%s", segmentDir));
   }
 
   /**
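
To see the new helper end to end, here is a hypothetical, self-contained caller. All segment values are invented for illustration, and it assumes the druid indexing-hadoop, hadoop-hdfs, and joda-time jars on the classpath; the expected output shape follows the HDFS layout asserted in HadoopDruidIndexerConfigTest below (basicDateTime interval, underscores for the version's colons):

    import java.io.IOException;
    import io.druid.indexer.JobHelper;
    import io.druid.timeline.DataSegment;
    import io.druid.timeline.partition.NumberedShardSpec;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.joda.time.Interval;

    public class SegmentPathDemo
    {
      public static void main(String[] args) throws IOException
      {
        // Hypothetical segment: only dataSource, interval, version, and shardSpec
        // matter for path construction; the rest are placeholders.
        DataSegment template = new DataSegment(
            "wikipedia",
            Interval.parse("2016-02-07T00:00:00.000Z/2016-02-08T00:00:00.000Z"),
            "2016-02-08T01:00:00.000Z",
            null, null, null,
            new NumberedShardSpec(0, 1),
            -1,                                 // binaryVersion placeholder
            -1                                  // size placeholder
        );
        Path basePath = new Path("hdfs://namenode:9000/druid/segments");
        FileSystem fs = basePath.getFileSystem(new Configuration()); // needs hadoop-hdfs on the classpath
        System.out.println(JobHelper.makeSegmentOutputPath(basePath, fs, template));
        // expect something like:
        // hdfs://namenode:9000/druid/segments/wikipedia/20160207T000000.000Z_20160208T000000.000Z/2016-02-08T01_00_00.000Z/0
      }
    }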

HadoopConverterJob.java

@@ -539,10 +539,7 @@ public class HadoopConverterJob
             JobHelper.makeSegmentOutputPath(
                 baseOutputPath,
                 outputFS,
-                finalSegmentTemplate.getDataSource(),
-                finalSegmentTemplate.getVersion(),
-                finalSegmentTemplate.getInterval(),
-                finalSegmentTemplate.getShardSpec().getPartitionNum()
+                finalSegmentTemplate
             )
         );
         context.progress();

HadoopDruidIndexerConfigTest.java

@@ -32,7 +32,9 @@ import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -106,10 +108,17 @@ public class HadoopDruidIndexerConfigTest
     Path path = JobHelper.makeSegmentOutputPath(
         new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
         new DistributedFileSystem(),
-        cfg.getSchema().getDataSchema().getDataSource(),
-        cfg.getSchema().getTuningConfig().getVersion(),
-        cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
-        bucket.partitionNum
+        new DataSegment(
+            cfg.getSchema().getDataSchema().getDataSource(),
+            cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
+            cfg.getSchema().getTuningConfig().getVersion(),
+            null,
+            null,
+            null,
+            new NumberedShardSpec(bucket.partitionNum, 5000),
+            -1,
+            -1
+        )
     );
     Assert.assertEquals(
         "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712",
@@ -159,10 +168,17 @@ public class HadoopDruidIndexerConfigTest
     Path path = JobHelper.makeSegmentOutputPath(
         new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
         new LocalFileSystem(),
-        cfg.getSchema().getDataSchema().getDataSource(),
-        cfg.getSchema().getTuningConfig().getVersion(),
-        cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
-        bucket.partitionNum
+        new DataSegment(
+            cfg.getSchema().getDataSchema().getDataSource(),
+            cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
+            cfg.getSchema().getTuningConfig().getVersion(),
+            null,
+            null,
+            null,
+            new NumberedShardSpec(bucket.partitionNum, 5000),
+            -1,
+            -1
+        )
     );
     Assert.assertEquals(
         "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712",