From 2d8282bb8248e6984878626c4cdc7148aa2e7202 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Tue, 14 May 2019 14:05:39 -0700 Subject: [PATCH] HADOOP-16306. AliyunOSS: Remove temporary files when upload small files to OSS. Contributed by wujinhu. --- .../oss/AliyunOSSBlockOutputStream.java | 10 +++++- .../oss/TestAliyunOSSBlockOutputStream.java | 32 +++++++++++++++++-- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java index 17f21cbc1c3..a48fde6eaa7 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java @@ -124,7 +124,7 @@ public synchronized void close() throws IOException { new ArrayList<>(partETags)); } } finally { - removePartFiles(); + removeTemporaryFiles(); closed = true; } } @@ -149,6 +149,14 @@ public synchronized void write(byte[] b, int off, int len) } } + private void removeTemporaryFiles() { + for (File file : blockFiles.values()) { + if (file != null && file.exists() && !file.delete()) { + LOG.warn("Failed to delete temporary file {}", file); + } + } + } + private void removePartFiles() throws IOException { for (ListenableFuture partETagFuture : partETagsFutures) { if (!partETagFuture.isDone()) { diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java index 14bd8e63284..835e5ffaef5 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.aliyun.oss; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; @@ -30,7 +31,9 @@ import java.io.IOException; +import static org.apache.hadoop.fs.aliyun.oss.Constants.BUFFER_DIR_KEY; import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT; +import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_KEY; import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; import static org.junit.Assert.assertEquals; @@ -49,9 +52,9 @@ public class TestAliyunOSSBlockOutputStream { @Before public void setUp() throws Exception { Configuration conf = new Configuration(); - conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024); + conf.setInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024); conf.setInt(IO_CHUNK_BUFFER_SIZE, - conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0)); + conf.getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 0)); conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20); fs = AliyunOSSTestUtils.createTestFileSystem(conf); } @@ -70,6 +73,7 @@ private Path getTestPath() { @Test public void testZeroByteUpload() throws IOException { ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0); + bufferDirShouldEmpty(); } @Test @@ -106,6 +110,7 @@ public void testRegularUpload() throws IOException { assertEquals(3 * size, statistics.getBytesRead()); assertEquals(10, statistics.getWriteOps()); assertEquals(3 * size, statistics.getBytesWritten()); + bufferDirShouldEmpty(); } @Test @@ -131,6 +136,7 @@ public void testMultiPartUpload() throws IOException { assertEquals(3 * size, statistics.getBytesRead()); assertEquals(25, statistics.getWriteOps()); assertEquals(3 * size, statistics.getBytesWritten()); + bufferDirShouldEmpty(); } @Test @@ -144,6 +150,7 @@ public void testMultiPartUploadConcurrent() throws IOException { assertEquals(size, statistics.getBytesRead()); assertEquals(52, statistics.getWriteOps()); assertEquals(size, statistics.getBytesWritten()); + bufferDirShouldEmpty(); } @Test @@ -154,6 +161,7 @@ public void testHugeUpload() throws IOException { MULTIPART_UPLOAD_PART_SIZE_DEFAULT); ContractTestUtils.createAndVerifyFile(fs, getTestPath(), MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1); + bufferDirShouldEmpty(); } @Test @@ -174,4 +182,24 @@ public void testMultiPartUploadLimit() throws IOException { assert(10001 * 100 * 1024 / partSize4 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT); } + + @Test + /** + * This test is used to verify HADOOP-16306. + * Test small file uploading so that oss fs will upload file directly + * instead of multi part upload. + */ + public void testSmallUpload() throws IOException { + long size = fs.getConf().getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1); + bufferDirShouldEmpty(); + } + + private void bufferDirShouldEmpty() throws IOException { + Path bufferPath = new Path(fs.getConf().get(BUFFER_DIR_KEY)); + FileStatus[] files = bufferPath.getFileSystem( + fs.getConf()).listStatus(bufferPath); + // Temporary file should be deleted + assertEquals(0, files.length); + } }