Fix invalid segment path when using hdfs as the intermediate deepstore (#14984)

This PR fixes the invalid segment path that is produced when `druid_processing_intermediaryData_storage_type: "deepstore"` is enabled and HDFS is used as the deep store.
This commit is contained in:
Yuanli Han 2023-09-29 15:23:46 +08:00 committed by GitHub
parent db71e28808
commit 9a4433bbad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 6 deletions

View File

@@ -106,27 +106,38 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : ""; final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : "";
final String outIndexFilePathSuffix = StringUtils.format( final String outIndexFilePathSuffix = StringUtils.format(
"%s/%d_%sindex.zip", "%s/%s/%d_%sindex.zip",
fullyQualifiedStorageDirectory.get(),
storageDir, storageDir,
segment.getShardSpec().getPartitionNum(), segment.getShardSpec().getPartitionNum(),
uniquePrefix uniquePrefix
); );
return pushToPath(inDir, segment, outIndexFilePathSuffix); return pushToFilePath(inDir, segment, outIndexFilePathSuffix);
} }
@Override @Override
public DataSegment pushToPath(File inDir, DataSegment segment, String storageDirSuffix) throws IOException public DataSegment pushToPath(File inDir, DataSegment segment, String storageDirSuffix) throws IOException
{
String outIndexFilePath = StringUtils.format(
"%s/%s/%d_index.zip",
fullyQualifiedStorageDirectory.get(),
storageDirSuffix.replace(':', '_'),
segment.getShardSpec().getPartitionNum()
);
return pushToFilePath(inDir, segment, outIndexFilePath);
}
private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIndexFilePath) throws IOException
{ {
log.debug( log.debug(
"Copying segment[%s] to HDFS at location[%s/%s]", "Copying segment[%s] to HDFS at location[%s/%s]",
segment.getId(), segment.getId(),
fullyQualifiedStorageDirectory.get(), fullyQualifiedStorageDirectory.get(),
storageDirSuffix outIndexFilePath
); );
final String storageDir = StringUtils.format("%s/%s", fullyQualifiedStorageDirectory.get(), storageDirSuffix);
Path tmpIndexFile = new Path(StringUtils.format( Path tmpIndexFile = new Path(StringUtils.format(
"%s/%s/%s/%s_index.zip", "%s/%s/%s/%s_index.zip",
fullyQualifiedStorageDirectory.get(), fullyQualifiedStorageDirectory.get(),
@@ -145,7 +156,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
try (FSDataOutputStream out = fs.create(tmpIndexFile)) { try (FSDataOutputStream out = fs.create(tmpIndexFile)) {
size = CompressionUtils.zip(inDir, out); size = CompressionUtils.zip(inDir, out);
} }
final Path outIndexFile = new Path(storageDir); final Path outIndexFile = new Path(outIndexFilePath);
dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri())) dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri()))
.withSize(size) .withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)); .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir));

View File

@@ -170,6 +170,46 @@ public class HdfsDataSegmentPusherTest
); );
} }
// Verifies the fix in pushToPath(): ':' characters in the supplied
// storageDirSuffix (here, from ISO-8601 timestamps in a shuffle-data path)
// must be replaced with '_' in the resulting load-spec path — per the PR
// description, unsanitized colons produced an invalid segment path when
// HDFS was the intermediate deep store.
@Test
public void testPushToPath() throws Exception
{
Configuration conf = new Configuration(true);
// Create a mock segment on disk
File segmentDir = tempFolder.newFolder();
File tmp = new File(segmentDir, "version.bin");
final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
Files.write(data, tmp);
final long size = data.length;
// Pusher is backed by a local file:// storage directory, so no real HDFS
// cluster is needed for this test.
HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
final File storageDirectory = tempFolder.newFolder();
config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath()));
HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
// Minimal segment descriptor; NoneShardSpec gives partition number 0,
// which should appear in the pushed file name as "0_index.zip".
DataSegment segmentToPush = new DataSegment(
"foo",
Intervals.of("2015/2016"),
"0",
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
size
);
// Suffix deliberately contains ':' characters that must be sanitized.
final String storageDirSuffix = "shuffle-data/index_parallel_session_analysis_test_bkdhhedd_2023-09-08T03:18:21.121Z/2023-08-29T16:00:00.000Z/2023-08-29T17:00:00.000Z";
DataSegment segment = pusher.pushToPath(segmentDir, segmentToPush, storageDirSuffix);
// The load-spec path must end with the colon-sanitized suffix plus the
// per-partition index file name.
Assert.assertTrue(
segment.getLoadSpec().get("path").toString(),
segment.getLoadSpec().get("path").toString().endsWith(storageDirSuffix.replace(':', '_') + "/0_index.zip")
);
}
private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception
{ {
Configuration conf = new Configuration(true); Configuration conf = new Configuration(true);