diff --git a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java index 6d0ea322313..0fe72cf0e44 100644 --- a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java @@ -57,7 +57,9 @@ import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.shard.ShardSpec; import com.metamx.druid.utils.JodaUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapreduce.Job; import org.joda.time.DateTime; @@ -656,22 +658,33 @@ public class HadoopDruidIndexerConfig return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", ""))); } - public Path makeSegmentOutputPath(Bucket bucket) - { - final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get(); - - return new Path( - String.format( - "%s/%s/%s_%s/%s/%s", - getSegmentOutputDir(), - dataSource, - bucketInterval.getStart().toString(), - bucketInterval.getEnd().toString(), - getVersion(), - bucket.partitionNum - ) - ); - } + public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket) + { + final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get(); + if (fileSystem instanceof DistributedFileSystem) + { + return new Path( + String.format( + "%s/%s/%s_%s/%s/%s", + getSegmentOutputDir().replace(":", "_"), + dataSource.replace(":", "_"), + bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()), + bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()), + getVersion().replace(":", "_"), + bucket.partitionNum + )); + } + return new Path( + String.format( + "%s/%s/%s_%s/%s/%s", + getSegmentOutputDir(), + dataSource, + bucketInterval.getStart().toString(), + bucketInterval.getEnd().toString(), + getVersion(), + bucket.partitionNum + )); + } public Job addInputPaths(Job job) throws IOException { diff --git a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java index 45cbded7f1a..583cafcae4f 100644 --- a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java @@ -375,12 +375,14 @@ public class IndexGeneratorJob implements Jobby Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); int attemptNumber = context.getTaskAttemptID().getId(); - Path indexBasePath = config.makeSegmentOutputPath(bucket); - Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber)); - final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()); - final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration()); - outputFS.mkdirs(indexBasePath); + FileSystem fileSystem = FileSystem.get(context.getConfiguration()); + Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket); + Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber)); + final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()); + final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration()); + + outputFS.mkdirs(indexBasePath); Exception caughtException = null; ZipOutputStream out = null; diff --git a/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java index dcabe168e67..d0d2ee1d2c0 100644 --- a/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -27,7 +27,12 @@ import com.metamx.druid.indexer.partitions.PartitionsSpec; import com.metamx.druid.indexer.updater.DbUpdaterJobSpec; import com.metamx.druid.jackson.DefaultObjectMapper; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.joda.time.DateTime; import org.joda.time.Interval; +import org.joda.time.format.ISODateTimeFormat; import org.junit.Assert; import org.junit.Test; @@ -427,6 +432,65 @@ public class HadoopDruidIndexerConfigTest ); } + + @Test + public void shouldMakeHDFSCompliantSegmentOutputPath() { + final HadoopDruidIndexerConfig cfg; + + try { + cfg = jsonReadWriteRead( + "{" + + "\"dataSource\": \"the:data:source\"," + + " \"granularitySpec\":{" + + " \"type\":\"uniform\"," + + " \"gran\":\"hour\"," + + " \"intervals\":[\"2012-07-10/P1D\"]" + + " }," + + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\"" + + "}", + HadoopDruidIndexerConfig.class + ); + } catch(Exception e) { + throw Throwables.propagate(e); + } + + cfg.setVersion("some:brand:new:version"); + + Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); + Path path = cfg.makeSegmentOutputPath(new DistributedFileSystem(), bucket); + Assert.assertEquals("/tmp/dru_id/data_test/the_data_source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712", path.toString()); + + } + + @Test + public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() { + final HadoopDruidIndexerConfig cfg; + + try { + cfg = jsonReadWriteRead( + "{" + + "\"dataSource\": \"the:data:source\"," + + " \"granularitySpec\":{" + + " \"type\":\"uniform\"," + + " \"gran\":\"hour\"," + + " \"intervals\":[\"2012-07-10/P1D\"]" + + " }," + + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\"" + + "}", + HadoopDruidIndexerConfig.class + ); + } catch(Exception e) { + throw Throwables.propagate(e); + } + + cfg.setVersion("some:brand:new:version"); + + Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); + Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket); + Assert.assertEquals("/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", path.toString()); + + } + private T jsonReadWriteRead(String s, Class klass) { try {