Avoid materializing list of segment files when finding a partition file during shuffle (#11903)

* Avoid materializing list of segment files (it can cause OOM/memory pressure) as well as looping over the files.

* Validate subTaskId
This commit is contained in:
Agustin Gonzalez 2021-11-11 10:51:52 -07:00 committed by GitHub
parent 223c5692a8
commit a13a96d5e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 4 additions and 9 deletions

View File

@ -373,24 +373,19 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId) public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
{ {
IdUtils.validateId("supervisorTaskId", supervisorTaskId); IdUtils.validateId("supervisorTaskId", supervisorTaskId);
IdUtils.validateId("subTaskId", subTaskId);
for (StorageLocation location : shuffleDataLocations) { for (StorageLocation location : shuffleDataLocations) {
final File partitionDir = new File(location.getPath(), getPartitionDirPath(supervisorTaskId, interval, bucketId)); final File partitionDir = new File(location.getPath(), getPartitionDirPath(supervisorTaskId, interval, bucketId));
if (partitionDir.exists()) { if (partitionDir.exists()) {
supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow()); supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
final File[] segmentFiles = partitionDir.listFiles(); final File segmentFile = new File(partitionDir, subTaskId);
if (segmentFiles == null) { if (segmentFile.exists()) {
return Optional.empty(); return Optional.of(Files.asByteSource(segmentFile));
} else { } else {
for (File segmentFile : segmentFiles) {
if (segmentFile.getName().equals(subTaskId)) {
return Optional.of(Files.asByteSource(segmentFile));
}
}
return Optional.empty(); return Optional.empty();
} }
} }
} }
return Optional.empty(); return Optional.empty();
} }