Lazy init of fullyQualifiedStorageDirectory in HDFS pusher (#5684)

* Lazy init of fullyQualifiedStorageDirectory in HDFS pusher

* Comment

* Fix test

* PR comments
This commit is contained in:
Jonathan Wei 2018-04-28 21:07:39 -07:00 committed by Slim Bouguerra
parent 4db9e39a71
commit 513fab77d9
2 changed files with 25 additions and 11 deletions

View File

@ -20,6 +20,8 @@
package io.druid.storage.hdfs; package io.druid.storage.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSink; import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
@ -53,7 +55,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
private final Configuration hadoopConfig; private final Configuration hadoopConfig;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final String fullyQualifiedStorageDirectory;
// We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA.
// Please see https://github.com/druid-io/druid/pull/5684
private final Supplier<String> fullyQualifiedStorageDirectory;
@Inject @Inject
public HdfsDataSegmentPusher( public HdfsDataSegmentPusher(
@ -65,10 +70,19 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
this.hadoopConfig = hadoopConfig; this.hadoopConfig = hadoopConfig;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
Path storageDir = new Path(config.getStorageDirectory()); Path storageDir = new Path(config.getStorageDirectory());
this.fullyQualifiedStorageDirectory = FileSystem.newInstance(storageDir.toUri(), hadoopConfig) this.fullyQualifiedStorageDirectory = Suppliers.memoize(
.makeQualified(storageDir) () -> {
.toUri() try {
.toString(); return FileSystem.newInstance(storageDir.toUri(), hadoopConfig)
.makeQualified(storageDir)
.toUri()
.toString();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
);
log.info("Configured HDFS as deep storage"); log.info("Configured HDFS as deep storage");
} }
@ -83,7 +97,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
@Override @Override
public String getPathForHadoop() public String getPathForHadoop()
{ {
return fullyQualifiedStorageDirectory; return fullyQualifiedStorageDirectory.get();
} }
@Override @Override
@ -94,13 +108,13 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
log.info( log.info(
"Copying segment[%s] to HDFS at location[%s/%s]", "Copying segment[%s] to HDFS at location[%s/%s]",
segment.getIdentifier(), segment.getIdentifier(),
fullyQualifiedStorageDirectory, fullyQualifiedStorageDirectory.get(),
storageDir storageDir
); );
Path tmpIndexFile = new Path(StringUtils.format( Path tmpIndexFile = new Path(StringUtils.format(
"%s/%s/%s/%s_index.zip", "%s/%s/%s/%s_index.zip",
fullyQualifiedStorageDirectory, fullyQualifiedStorageDirectory.get(),
segment.getDataSource(), segment.getDataSource(),
UUIDUtils.generateUuid(), UUIDUtils.generateUuid(),
segment.getShardSpec().getPartitionNum() segment.getShardSpec().getPartitionNum()
@ -116,13 +130,13 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
size = CompressionUtils.zip(inDir, out); size = CompressionUtils.zip(inDir, out);
final Path outIndexFile = new Path(StringUtils.format( final Path outIndexFile = new Path(StringUtils.format(
"%s/%s/%d_index.zip", "%s/%s/%d_index.zip",
fullyQualifiedStorageDirectory, fullyQualifiedStorageDirectory.get(),
storageDir, storageDir,
segment.getShardSpec().getPartitionNum() segment.getShardSpec().getPartitionNum()
)); ));
final Path outDescriptorFile = new Path(StringUtils.format( final Path outDescriptorFile = new Path(StringUtils.format(
"%s/%s/%d_descriptor.json", "%s/%s/%d_descriptor.json",
fullyQualifiedStorageDirectory, fullyQualifiedStorageDirectory.get(),
storageDir, storageDir,
segment.getShardSpec().getPartitionNum() segment.getShardSpec().getPartitionNum()
)); ));

View File

@ -107,7 +107,7 @@ public class HdfsDataSegmentPusherTest
@Test @Test
public void testPushWithBadScheme() throws Exception public void testPushWithBadScheme() throws Exception
{ {
expectedException.expect(IOException.class); expectedException.expect(RuntimeException.class);
expectedException.expectMessage("No FileSystem for scheme"); expectedException.expectMessage("No FileSystem for scheme");
testUsingScheme("xyzzy"); testUsingScheme("xyzzy");