diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
index 8f1056122bb..4b7275fe5f8 100644
--- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -103,9 +103,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
     final DataSegment dataSegment;
     try (FSDataOutputStream out = fs.create(tmpFile)) {
       size = CompressionUtils.zip(inDir, out);
-      Path outDir = new Path(String.format("%s/%s", config.getStorageDirectory(), storageDir));
+      final Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
+      final Path outDir = outFile.getParent();
       dataSegment = createDescriptorFile(
-          segment.withLoadSpec(makeLoadSpec(new Path(String.format("%s/%s", outDir.toUri().getPath(), "index.zip"))))
+          segment.withLoadSpec(makeLoadSpec(outFile))
                  .withSize(size)
                  .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
           tmpFile.getParent(),
@@ -150,7 +151,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
 
   private ImmutableMap<String, Object> makeLoadSpec(Path outFile)
   {
-    return ImmutableMap.of("type", "hdfs", "path", outFile.toString());
+    return ImmutableMap.of("type", "hdfs", "path", outFile.toUri().toString());
   }
 
   private static class HdfsOutputStreamSupplier extends ByteSink
diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
index aa19581fb5b..f3b8e5c6604 100644
--- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
@@ -28,10 +28,12 @@ import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
@@ -44,8 +46,33 @@ public class HdfsDataSegmentPusherTest
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   @Test
-  public void testPush() throws Exception
+  public void testPushWithScheme() throws Exception
+  {
+    testUsingScheme("file");
+  }
+
+  @Test
+  public void testPushWithBadScheme() throws Exception
+  {
+    expectedException.expect(IOException.class);
+    expectedException.expectMessage("No FileSystem for scheme: xyzzy");
+    testUsingScheme("xyzzy");
+
+    // Not reached
+    Assert.assertTrue(false);
+  }
+
+  @Test
+  public void testPushWithoutScheme() throws Exception
+  {
+    testUsingScheme(null);
+  }
+
+  private void testUsingScheme(final String scheme) throws Exception
   {
     Configuration conf = new Configuration(true);
 
@@ -58,8 +85,13 @@ public class HdfsDataSegmentPusherTest
     final long size = data.length;
 
     HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
+    final File storageDirectory = tempFolder.newFolder();
 
-    config.setStorageDirectory(tempFolder.newFolder().getAbsolutePath());
+    config.setStorageDirectory(
+        scheme != null
+        ? String.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
+        : storageDirectory.getAbsolutePath()
+    );
     HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
 
     DataSegment segmentToPush = new DataSegment(
@@ -82,20 +114,21 @@ public class HdfsDataSegmentPusherTest
             "type",
             "hdfs",
             "path",
-            String.format("%s/%s/index.zip",
-                          config.getStorageDirectory(),
-                          DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush)
+            String.format(
+                "%s/%s/index.zip",
+                config.getStorageDirectory(),
+                DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush)
             )
         ), segment.getLoadSpec());
     // rename directory after push
-    final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
-    File indexFile = new File(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
+    final String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(segment);
+    File indexFile = new File(String.format("%s/%s/index.zip", storageDirectory, segmentPath));
     Assert.assertTrue(indexFile.exists());
-    File descriptorFile = new File(String.format("%s/%s/descriptor.json", config.getStorageDirectory(), storageDir));
+    File descriptorFile = new File(String.format("%s/%s/descriptor.json", storageDirectory, segmentPath));
     Assert.assertTrue(descriptorFile.exists());
 
     // push twice will fail and temp dir cleaned
-    File outDir = new File(String.format("%s/%s", config.getStorageDirectory(), storageDir));
+    File outDir = new File(String.format("%s/%s", config.getStorageDirectory(), segmentPath));
     outDir.setReadOnly();
     try {
       pusher.push(segmentDir, segmentToPush);
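
The behavioral heart of this change is in makeLoadSpec(): the old code built the load-spec path from outDir.toUri().getPath(), which keeps only the path component of the URI, so any scheme on the configured storage directory (e.g. hdfs://...) was silently dropped from the descriptor. Below is a minimal standalone sketch of the difference, assuming only hadoop-common on the classpath; the class name PathSchemeSketch and the namenode address are invented for illustration.

import org.apache.hadoop.fs.Path;

public class PathSchemeSketch
{
  public static void main(String[] args)
  {
    // A fully qualified segment location, as produced when
    // config.getStorageDirectory() carries a scheme.
    final Path outFile = new Path("hdfs://namenode:8020/druid/segments/index.zip");

    // Old behavior: toUri().getPath() keeps only the path component;
    // scheme and authority are lost, so the load spec no longer pointed
    // at the filesystem the segment was pushed to.
    System.out.println(outFile.toUri().getPath());
    // -> /druid/segments/index.zip

    // New behavior: toUri().toString() preserves the full URI.
    System.out.println(outFile.toUri().toString());
    // -> hdfs://namenode:8020/druid/segments/index.zip
  }
}

The test matrix mirrors this: "file" exercises a valid explicit scheme, null exercises the scheme-less default, and "xyzzy" confirms that an unknown scheme surfaces as an IOException from Hadoop's FileSystem lookup rather than a silently mangled path.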