diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index 96db0b28210..a7fc9ff40a6 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -20,6 +20,8 @@ package io.druid.storage.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; @@ -53,7 +55,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher private final Configuration hadoopConfig; private final ObjectMapper jsonMapper; - private final String fullyQualifiedStorageDirectory; + + // We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA. + // Please see https://github.com/druid-io/druid/pull/5684 + private final Supplier fullyQualifiedStorageDirectory; @Inject public HdfsDataSegmentPusher( @@ -65,10 +70,19 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher this.hadoopConfig = hadoopConfig; this.jsonMapper = jsonMapper; Path storageDir = new Path(config.getStorageDirectory()); - this.fullyQualifiedStorageDirectory = FileSystem.newInstance(storageDir.toUri(), hadoopConfig) - .makeQualified(storageDir) - .toUri() - .toString(); + this.fullyQualifiedStorageDirectory = Suppliers.memoize( + () -> { + try { + return FileSystem.newInstance(storageDir.toUri(), hadoopConfig) + .makeQualified(storageDir) + .toUri() + .toString(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + ); log.info("Configured HDFS as deep storage"); } @@ -83,7 +97,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher @Override public String getPathForHadoop() { - return fullyQualifiedStorageDirectory; + return fullyQualifiedStorageDirectory.get(); } @Override @@ -94,13 +108,13 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher log.info( "Copying segment[%s] to HDFS at location[%s/%s]", segment.getIdentifier(), - fullyQualifiedStorageDirectory, + fullyQualifiedStorageDirectory.get(), storageDir ); Path tmpIndexFile = new Path(StringUtils.format( "%s/%s/%s/%s_index.zip", - fullyQualifiedStorageDirectory, + fullyQualifiedStorageDirectory.get(), segment.getDataSource(), UUIDUtils.generateUuid(), segment.getShardSpec().getPartitionNum() @@ -116,13 +130,13 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher size = CompressionUtils.zip(inDir, out); final Path outIndexFile = new Path(StringUtils.format( "%s/%s/%d_index.zip", - fullyQualifiedStorageDirectory, + fullyQualifiedStorageDirectory.get(), storageDir, segment.getShardSpec().getPartitionNum() )); final Path outDescriptorFile = new Path(StringUtils.format( "%s/%s/%d_descriptor.json", - fullyQualifiedStorageDirectory, + fullyQualifiedStorageDirectory.get(), storageDir, segment.getShardSpec().getPartitionNum() )); diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index e53d1ea3e0a..bd69c323318 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -107,7 +107,7 @@ public class HdfsDataSegmentPusherTest @Test public void testPushWithBadScheme() throws Exception { - expectedException.expect(IOException.class); + expectedException.expect(RuntimeException.class); expectedException.expectMessage("No FileSystem for scheme"); testUsingScheme("xyzzy");