HADOOP-16306. AliyunOSS: Remove temporary files when upload small files to OSS. Contributed by wujinhu.
(cherry picked from commit 2d8282bb82
)
This commit is contained in:
parent
f4ee38df29
commit
26eb9f52fb
|
@ -124,7 +124,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
|
||||||
new ArrayList<>(partETags));
|
new ArrayList<>(partETags));
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
removePartFiles();
|
removeTemporaryFiles();
|
||||||
closed = true;
|
closed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -149,6 +149,14 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void removeTemporaryFiles() {
|
||||||
|
for (File file : blockFiles.values()) {
|
||||||
|
if (file != null && file.exists() && !file.delete()) {
|
||||||
|
LOG.warn("Failed to delete temporary file {}", file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void removePartFiles() throws IOException {
|
private void removePartFiles() throws IOException {
|
||||||
for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) {
|
for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) {
|
||||||
if (!partETagFuture.isDone()) {
|
if (!partETagFuture.isDone()) {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.fs.aliyun.oss;
|
package org.apache.hadoop.fs.aliyun.oss;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
|
@ -30,7 +31,9 @@ import org.junit.rules.Timeout;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.aliyun.oss.Constants.BUFFER_DIR_KEY;
|
||||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
|
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_KEY;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
@ -49,9 +52,9 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
|
conf.setInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
|
||||||
conf.setInt(IO_CHUNK_BUFFER_SIZE,
|
conf.setInt(IO_CHUNK_BUFFER_SIZE,
|
||||||
conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
|
conf.getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
|
||||||
conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
|
conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
|
||||||
fs = AliyunOSSTestUtils.createTestFileSystem(conf);
|
fs = AliyunOSSTestUtils.createTestFileSystem(conf);
|
||||||
}
|
}
|
||||||
|
@ -70,6 +73,7 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
@Test
|
@Test
|
||||||
public void testZeroByteUpload() throws IOException {
|
public void testZeroByteUpload() throws IOException {
|
||||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0);
|
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0);
|
||||||
|
bufferDirShouldEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -106,6 +110,7 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
assertEquals(3 * size, statistics.getBytesRead());
|
assertEquals(3 * size, statistics.getBytesRead());
|
||||||
assertEquals(10, statistics.getWriteOps());
|
assertEquals(10, statistics.getWriteOps());
|
||||||
assertEquals(3 * size, statistics.getBytesWritten());
|
assertEquals(3 * size, statistics.getBytesWritten());
|
||||||
|
bufferDirShouldEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -131,6 +136,7 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
assertEquals(3 * size, statistics.getBytesRead());
|
assertEquals(3 * size, statistics.getBytesRead());
|
||||||
assertEquals(25, statistics.getWriteOps());
|
assertEquals(25, statistics.getWriteOps());
|
||||||
assertEquals(3 * size, statistics.getBytesWritten());
|
assertEquals(3 * size, statistics.getBytesWritten());
|
||||||
|
bufferDirShouldEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -144,6 +150,7 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
assertEquals(size, statistics.getBytesRead());
|
assertEquals(size, statistics.getBytesRead());
|
||||||
assertEquals(52, statistics.getWriteOps());
|
assertEquals(52, statistics.getWriteOps());
|
||||||
assertEquals(size, statistics.getBytesWritten());
|
assertEquals(size, statistics.getBytesWritten());
|
||||||
|
bufferDirShouldEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -154,6 +161,7 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
|
MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
|
||||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
|
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
|
||||||
MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
|
MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
|
||||||
|
bufferDirShouldEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -174,4 +182,24 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
assert(10001 * 100 * 1024 / partSize4
|
assert(10001 * 100 * 1024 / partSize4
|
||||||
< Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
|
< Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* This test is used to verify HADOOP-16306.
|
||||||
|
* Test small file uploading so that oss fs will upload file directly
|
||||||
|
* instead of multi part upload.
|
||||||
|
*/
|
||||||
|
public void testSmallUpload() throws IOException {
|
||||||
|
long size = fs.getConf().getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024);
|
||||||
|
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
|
||||||
|
bufferDirShouldEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void bufferDirShouldEmpty() throws IOException {
|
||||||
|
Path bufferPath = new Path(fs.getConf().get(BUFFER_DIR_KEY));
|
||||||
|
FileStatus[] files = bufferPath.getFileSystem(
|
||||||
|
fs.getConf()).listStatus(bufferPath);
|
||||||
|
// Temporary file should be deleted
|
||||||
|
assertEquals(0, files.length);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue