diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index a7c65ab4ba9..c69c705cff8 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -75,7 +75,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
@@ -689,18 +688,19 @@ public class IndexGeneratorJob implements Jobby
       }
       final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
           .getFileSystem(context.getConfiguration());
+      final DataSegment segmentTemplate = new DataSegment(
+          config.getDataSource(),
+          interval,
+          config.getSchema().getTuningConfig().getVersion(),
+          null,
+          ImmutableList.copyOf(allDimensionNames),
+          metricNames,
+          config.getShardSpec(bucket).getActualSpec(),
+          -1,
+          -1
+      );
       final DataSegment segment = JobHelper.serializeOutIndex(
-          new DataSegment(
-              config.getDataSource(),
-              interval,
-              config.getSchema().getTuningConfig().getVersion(),
-              null,
-              ImmutableList.copyOf(allDimensionNames),
-              metricNames,
-              config.getShardSpec(bucket).getActualSpec(),
-              -1,
-              -1
-          ),
+          segmentTemplate,
           context.getConfiguration(),
           context,
           context.getTaskAttemptID(),
@@ -708,10 +708,7 @@ public class IndexGeneratorJob implements Jobby
           JobHelper.makeSegmentOutputPath(
               new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
               outputFS,
-              config.getSchema().getDataSchema().getDataSource(),
-              config.getSchema().getTuningConfig().getVersion(),
-              config.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
-              bucket.partitionNum
+              segmentTemplate
           )
       );

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index 882baa0b7a1..0c3912d4120 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -35,6 +35,7 @@ import com.metamx.common.logger.Logger;
 import io.druid.indexer.updater.HadoopDruidConverterConfig;
 import io.druid.segment.ProgressIndicator;
 import io.druid.segment.SegmentUtils;
+import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -48,8 +49,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
 import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.joda.time.format.ISODateTimeFormat;

 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -545,34 +544,13 @@ public class JobHelper
   public static Path makeSegmentOutputPath(
       Path basePath,
       FileSystem fileSystem,
-      String dataSource,
-      String version,
-      Interval interval,
-      int partitionNum
+      DataSegment segment
   )
   {
-    Path outputPath = new Path(prependFSIfNullScheme(fileSystem, basePath), "./" + dataSource);
-    if ("hdfs".equals(fileSystem.getScheme())) {
-      outputPath = new Path(
-          outputPath, String.format(
-          "./%s_%s",
-          interval.getStart().toString(ISODateTimeFormat.basicDateTime()),
-          interval.getEnd().toString(ISODateTimeFormat.basicDateTime())
-      )
-      );
-      outputPath = new Path(outputPath, version.replace(":", "_"));
-    } else {
-      outputPath = new Path(
-          outputPath, String.format(
-          "./%s_%s",
-          interval.getStart().toString(),
-          interval.getEnd().toString()
-      )
-      );
-      outputPath = new Path(outputPath, String.format("./%s", version));
-    }
-    outputPath = new Path(outputPath, Integer.toString(partitionNum));
-    return outputPath;
+    String segmentDir = "hdfs".equals(fileSystem.getScheme())
+                        ? DataSegmentPusherUtil.getHdfsStorageDir(segment)
+                        : DataSegmentPusherUtil.getStorageDir(segment);
+    return new Path(prependFSIfNullScheme(fileSystem, basePath), String.format("./%s", segmentDir));
   }

   /**
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
index 5983733f80c..451b1fa9561 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
@@ -539,10 +539,7 @@ public class HadoopConverterJob
             JobHelper.makeSegmentOutputPath(
                 baseOutputPath,
                 outputFS,
-                finalSegmentTemplate.getDataSource(),
-                finalSegmentTemplate.getVersion(),
-                finalSegmentTemplate.getInterval(),
-                finalSegmentTemplate.getShardSpec().getPartitionNum()
+                finalSegmentTemplate
             )
         );
         context.progress();
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
index 66a1d6a3049..10abc170dea 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -32,7 +32,9 @@ import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -106,10 +108,17 @@ public class HadoopDruidIndexerConfigTest
     Path path = JobHelper.makeSegmentOutputPath(
         new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
         new DistributedFileSystem(),
-        cfg.getSchema().getDataSchema().getDataSource(),
-        cfg.getSchema().getTuningConfig().getVersion(),
-        cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
-        bucket.partitionNum
+        new DataSegment(
+            cfg.getSchema().getDataSchema().getDataSource(),
+            cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
+            cfg.getSchema().getTuningConfig().getVersion(),
+            null,
+            null,
+            null,
+            new NumberedShardSpec(bucket.partitionNum, 5000),
+            -1,
+            -1
+        )
     );
     Assert.assertEquals(
         "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712",
@@ -159,10 +168,17 @@ public class HadoopDruidIndexerConfigTest
     Path path = JobHelper.makeSegmentOutputPath(
         new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
         new LocalFileSystem(),
-        cfg.getSchema().getDataSchema().getDataSource(),
-        cfg.getSchema().getTuningConfig().getVersion(),
-        cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
-        bucket.partitionNum
+        new DataSegment(
+            cfg.getSchema().getDataSchema().getDataSource(),
+            cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
+            cfg.getSchema().getTuningConfig().getVersion(),
+            null,
+            null,
+            null,
+            new NumberedShardSpec(bucket.partitionNum, 5000),
+            -1,
+            -1
+        )
     );
     Assert.assertEquals(
         "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712",