Using fully qualified hdfs path. (#3705)

* Using fully qualified hdfs path.

* Review changes.

* Remove unused imports.

* Variable name change.
This commit is contained in:
Akash Dwivedi 2017-01-17 12:40:22 -08:00 committed by Himanshu
parent 558dc365a4
commit e550d48772
2 changed files with 21 additions and 10 deletions

View File

@ -50,17 +50,20 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
private final HdfsDataSegmentPusherConfig config; private final HdfsDataSegmentPusherConfig config;
private final Configuration hadoopConfig; private final Configuration hadoopConfig;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final String fullyQualifiedStorageDirectory;
@Inject @Inject
public HdfsDataSegmentPusher( public HdfsDataSegmentPusher(
HdfsDataSegmentPusherConfig config, HdfsDataSegmentPusherConfig config,
Configuration hadoopConfig, Configuration hadoopConfig,
ObjectMapper jsonMapper ObjectMapper jsonMapper
) ) throws IOException
{ {
this.config = config; this.config = config;
this.hadoopConfig = hadoopConfig; this.hadoopConfig = hadoopConfig;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.fullyQualifiedStorageDirectory = FileSystem.newInstance(hadoopConfig).makeQualified(new Path(config.getStorageDirectory()))
.toUri().toString();
log.info("Configured HDFS as deep storage"); log.info("Configured HDFS as deep storage");
} }
@ -75,7 +78,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
@Override @Override
public String getPathForHadoop() public String getPathForHadoop()
{ {
return new Path(config.getStorageDirectory()).toUri().toString(); return fullyQualifiedStorageDirectory;
} }
@Override @Override
@ -86,13 +89,13 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
log.info( log.info(
"Copying segment[%s] to HDFS at location[%s/%s]", "Copying segment[%s] to HDFS at location[%s/%s]",
segment.getIdentifier(), segment.getIdentifier(),
config.getStorageDirectory(), fullyQualifiedStorageDirectory,
storageDir storageDir
); );
Path tmpFile = new Path(String.format( Path tmpFile = new Path(String.format(
"%s/%s/index.zip", "%s/%s/index.zip",
config.getStorageDirectory(), fullyQualifiedStorageDirectory,
UUIDUtils.generateUuid() UUIDUtils.generateUuid()
)); ));
FileSystem fs = tmpFile.getFileSystem(hadoopConfig); FileSystem fs = tmpFile.getFileSystem(hadoopConfig);
@ -104,7 +107,11 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
final DataSegment dataSegment; final DataSegment dataSegment;
try (FSDataOutputStream out = fs.create(tmpFile)) { try (FSDataOutputStream out = fs.create(tmpFile)) {
size = CompressionUtils.zip(inDir, out); size = CompressionUtils.zip(inDir, out);
final Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir)); final Path outFile = new Path(String.format(
"%s/%s/index.zip",
fullyQualifiedStorageDirectory,
storageDir
));
final Path outDir = outFile.getParent(); final Path outDir = outFile.getParent();
dataSegment = createDescriptorFile( dataSegment = createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outFile)) segment.withLoadSpec(makeLoadSpec(outFile))
@ -131,12 +138,14 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
)); ));
} }
} }
} finally { }
finally {
try { try {
if (fs.exists(tmpFile.getParent()) && !fs.delete(tmpFile.getParent(), true)) { if (fs.exists(tmpFile.getParent()) && !fs.delete(tmpFile.getParent(), true)) {
log.error("Failed to delete temp directory[%s]", tmpFile.getParent()); log.error("Failed to delete temp directory[%s]", tmpFile.getParent());
} }
} catch(IOException ex) { }
catch (IOException ex) {
log.error(ex, "Failed to delete temp directory[%s]", tmpFile.getParent()); log.error(ex, "Failed to delete temp directory[%s]", tmpFile.getParent());
} }
} }

View File

@ -28,6 +28,8 @@ import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
@ -57,8 +59,8 @@ public class HdfsDataSegmentPusherTest
@Test @Test
public void testPushWithBadScheme() throws Exception public void testPushWithBadScheme() throws Exception
{ {
expectedException.expect(IOException.class); expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("No FileSystem for scheme: xyzzy"); expectedException.expectMessage("Wrong FS");
testUsingScheme("xyzzy"); testUsingScheme("xyzzy");
// Not reached // Not reached
@ -115,7 +117,7 @@ public class HdfsDataSegmentPusherTest
"path", "path",
String.format( String.format(
"%s/%s/index.zip", "%s/%s/index.zip",
config.getStorageDirectory(), FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush) DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush)
) )
), segment.getLoadSpec()); ), segment.getLoadSpec());