HDFS-13936. Multipart upload to HDFS to support 0 byte upload. Contributed by Ewan Higgs.

This commit is contained in:
Ewan Higgs 2018-10-02 14:03:28 +02:00
parent a383ac47ca
commit 6fab6886f6
2 changed files with 41 additions and 6 deletions

View File

@ -100,6 +100,14 @@ public class FileSystemMultipartUploader extends MultipartUploader {
return fs.getPathHandle(status);
}
private long totalPartsLen(List<Path> partHandles) throws IOException {
long totalLen = 0;
for (Path p: partHandles) {
totalLen += fs.getFileStatus(p).getLen();
}
return totalLen;
}
@Override
@SuppressWarnings("deprecation") // rename w/ OVERWRITE
public PathHandle complete(Path filePath,
@ -127,12 +135,17 @@ public class FileSystemMultipartUploader extends MultipartUploader {
.collect(Collectors.toList());
Path collectorPath = createCollectorPath(filePath);
Path filePathInsideCollector = mergePaths(collectorPath,
new Path(Path.SEPARATOR + filePath.getName()));
fs.create(filePathInsideCollector).close();
fs.concat(filePathInsideCollector,
partHandles.toArray(new Path[handles.size()]));
fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE);
boolean emptyFile = totalPartsLen(partHandles) == 0;
if (emptyFile) {
fs.create(filePath).close();
} else {
Path filePathInsideCollector = mergePaths(collectorPath,
new Path(Path.SEPARATOR + filePath.getName()));
fs.create(filePathInsideCollector).close();
fs.concat(filePathInsideCollector,
partHandles.toArray(new Path[handles.size()]));
fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE);
}
fs.delete(collectorPath, true);
return getPathHandle(filePath);
}

View File

@ -164,6 +164,28 @@ public abstract class AbstractContractMultipartUploaderTest extends
payloadCount * partSizeInBytes());
}
/**
* Assert that a multipart upload is successful when a single empty part is
* uploaded.
* @throws Exception failure
*/
@Test
public void testMultipartUploadEmptyPart() throws Exception {
FileSystem fs = getFileSystem();
Path file = path("testMultipartUpload");
MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
UploadHandle uploadHandle = mpu.initialize(file);
List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
byte[] payload = new byte[0];
origDigest.update(payload);
InputStream is = new ByteArrayInputStream(payload);
PartHandle partHandle = mpu.putPart(file, is, 0, uploadHandle,
payload.length);
partHandles.add(Pair.of(0, partHandle));
completeUpload(file, mpu, uploadHandle, partHandles, origDigest, 0);
}
/**
* Assert that a multipart upload is successful even when the parts are
* given in the reverse order.