HdfsDataSegmentPusher: Properly include scheme, host in output path if necessary. (#3577)

Fixes #3576.
Gian Merlino authored 2016-10-17 09:37:18 -05:00; committed by Fangjin Yang
parent 472c409b99
commit 0ce33bc95f
2 changed files with 46 additions and 12 deletions
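
The underlying issue (#3576): when druid.storage.storageDirectory is a fully-qualified URI such as hdfs://namenode:8020/druid/segments, the old code built the load-spec path from Path.toUri().getPath(), which returns only the path component and silently drops the scheme and host. A minimal sketch of the difference, using a hypothetical namenode address:

import org.apache.hadoop.fs.Path;

public class SchemeDemo
{
  public static void main(String[] args)
  {
    // Hypothetical fully-qualified output file, as produced when the
    // configured storage directory includes a scheme and host.
    final Path outFile = new Path("hdfs://namenode:8020/druid/segments/foo/index.zip");

    // Path component only -- scheme and host are gone:
    System.out.println(outFile.toUri().getPath());    // /druid/segments/foo/index.zip

    // Full URI, which is what the load spec needs:
    System.out.println(outFile.toUri().toString());   // hdfs://namenode:8020/druid/segments/foo/index.zip
  }
}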

HdfsDataSegmentPusher.java

@@ -103,9 +103,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
     final DataSegment dataSegment;
     try (FSDataOutputStream out = fs.create(tmpFile)) {
       size = CompressionUtils.zip(inDir, out);
-      Path outDir = new Path(String.format("%s/%s", config.getStorageDirectory(), storageDir));
+      final Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
+      final Path outDir = outFile.getParent();
       dataSegment = createDescriptorFile(
-          segment.withLoadSpec(makeLoadSpec(new Path(String.format("%s/%s", outDir.toUri().getPath(), "index.zip"))))
+          segment.withLoadSpec(makeLoadSpec(outFile))
                  .withSize(size)
                  .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
           tmpFile.getParent(),
@@ -150,7 +151,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
   private ImmutableMap<String, Object> makeLoadSpec(Path outFile)
   {
-    return ImmutableMap.<String, Object>of("type", "hdfs", "path", outFile.toString());
+    return ImmutableMap.<String, Object>of("type", "hdfs", "path", outFile.toUri().toString());
   }

   private static class HdfsOutputStreamSupplier extends ByteSink
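
With both changes in place, makeLoadSpec receives a Path that still carries the configured scheme and host, and serializes the full URI rather than a bare path. A sketch of the resulting load spec, reusing the hypothetical outFile from above:

// outFile = hdfs://namenode:8020/druid/segments/foo/index.zip
ImmutableMap<String, Object> loadSpec =
    ImmutableMap.<String, Object>of("type", "hdfs", "path", outFile.toUri().toString());
// -> {type=hdfs, path=hdfs://namenode:8020/druid/segments/foo/index.zip}

Previously the caller stripped the scheme via outDir.toUri().getPath() before makeLoadSpec ever ran, so the stored path lost the scheme and host whenever the storage directory pointed at a non-default filesystem.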

HdfsDataSegmentPusherTest.java

@@ -28,10 +28,12 @@ import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;

 import java.io.File;
@@ -44,8 +46,33 @@ public class HdfsDataSegmentPusherTest
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();

+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   @Test
-  public void testPush() throws Exception
+  public void testPushWithScheme() throws Exception
+  {
+    testUsingScheme("file");
+  }
+
+  @Test
+  public void testPushWithBadScheme() throws Exception
+  {
+    expectedException.expect(IOException.class);
+    expectedException.expectMessage("No FileSystem for scheme: xyzzy");
+    testUsingScheme("xyzzy");
+
+    // Not reached
+    Assert.assertTrue(false);
+  }
+
+  @Test
+  public void testPushWithoutScheme() throws Exception
+  {
+    testUsingScheme(null);
+  }
+
+  private void testUsingScheme(final String scheme) throws Exception
   {
     Configuration conf = new Configuration(true);
@@ -58,8 +85,13 @@ public class HdfsDataSegmentPusherTest
     final long size = data.length;

     HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
-    config.setStorageDirectory(tempFolder.newFolder().getAbsolutePath());
+    final File storageDirectory = tempFolder.newFolder();
+    config.setStorageDirectory(
+        scheme != null
+        ? String.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
+        : storageDirectory.getAbsolutePath()
+    );
     HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());

     DataSegment segmentToPush = new DataSegment(
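
For reference, the ternary above yields storage directories like the following (the temp path is illustrative):

// scheme = "file"  ->  "file:///tmp/junit1234"
String.format("%s://%s", "file", "/tmp/junit1234");
// scheme = null    ->  plain "/tmp/junit1234", the pre-existing scheme-less case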
@@ -82,20 +114,21 @@ public class HdfsDataSegmentPusherTest
             "type",
             "hdfs",
             "path",
-            String.format("%s/%s/index.zip",
+            String.format(
+                "%s/%s/index.zip",
                 config.getStorageDirectory(),
                 DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush)
             )
         ), segment.getLoadSpec());

     // rename directory after push
-    final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
+    final String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(segment);

-    File indexFile = new File(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
+    File indexFile = new File(String.format("%s/%s/index.zip", storageDirectory, segmentPath));
     Assert.assertTrue(indexFile.exists());
-    File descriptorFile = new File(String.format("%s/%s/descriptor.json", config.getStorageDirectory(), storageDir));
+    File descriptorFile = new File(String.format("%s/%s/descriptor.json", storageDirectory, segmentPath));
     Assert.assertTrue(descriptorFile.exists());

     // push twice will fail and temp dir cleaned
-    File outDir = new File(String.format("%s/%s", config.getStorageDirectory(), storageDir));
+    File outDir = new File(String.format("%s/%s", config.getStorageDirectory(), segmentPath));
     outDir.setReadOnly();

     try {
       pusher.push(segmentDir, segmentToPush);
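
The bad-scheme test leans on Hadoop refusing to resolve a filesystem scheme that has no registered implementation. A standalone sketch of that failure mode (message wording per Hadoop 2.x):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BadSchemeDemo
{
  public static void main(String[] args) throws Exception
  {
    final Configuration conf = new Configuration(true);
    // "xyzzy" has no FileSystem implementation registered, so this throws
    // java.io.IOException: No FileSystem for scheme: xyzzy
    FileSystem.get(new Path("xyzzy://host/druid/segments").toUri(), conf);
  }
}

Because that exception escapes testUsingScheme before the push completes, the trailing Assert.assertTrue(false) in testPushWithBadScheme is never reached; it only documents that the ExpectedException rule is doing the work.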