HADOOP-18313: AliyunOSSBlockOutputStream should not mark the temporary file for deletion (#4502)
HADOOP-18313: AliyunOSSBlockOutputStream should not mark the temporary file for deletion. Contributed by wujinhu.
(cherry picked from commit 3ec4b932c1
)
This commit is contained in:
parent
90b1e737d3
commit
690337bca9
|
@ -23,7 +23,8 @@ import java.io.IOException;
|
|||
import java.net.URI;
|
||||
|
||||
import com.aliyun.oss.common.auth.CredentialsProvider;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.util.Preconditions;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
|
@ -39,7 +40,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
|
|||
final public class AliyunOSSUtils {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AliyunOSSUtils.class);
|
||||
private static LocalDirAllocator directoryAllocator;
|
||||
private static volatile LocalDirAllocator directoryAllocator;
|
||||
|
||||
private AliyunOSSUtils() {
|
||||
}
|
||||
|
@ -171,21 +172,33 @@ final public class AliyunOSSUtils {
|
|||
|
||||
/**
|
||||
* Demand create the directory allocator, then create a temporary file.
|
||||
* @param path prefix for the temporary file
|
||||
* @param size the size of the file that is going to be written
|
||||
* @param conf the Configuration object
|
||||
* @return a unique temporary file
|
||||
* @throws IOException IO problems
|
||||
* This does not mark the file for deletion when a process exits.
|
||||
* {@link LocalDirAllocator#createTmpFileForWrite(
|
||||
* String, long, Configuration)}.
|
||||
* @param pathStr prefix for the temporary file
|
||||
* @param size the size of the file that is going to be written
|
||||
* @param conf the Configuration object
|
||||
* @return a unique temporary file
|
||||
* @throws IOException IO problems
|
||||
*/
|
||||
public static File createTmpFileForWrite(String path, long size,
|
||||
public static File createTmpFileForWrite(String pathStr, long size,
|
||||
Configuration conf) throws IOException {
|
||||
if (conf.get(BUFFER_DIR_KEY) == null) {
|
||||
conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
|
||||
}
|
||||
if (directoryAllocator == null) {
|
||||
directoryAllocator = new LocalDirAllocator(BUFFER_DIR_KEY);
|
||||
synchronized (AliyunOSSUtils.class) {
|
||||
if (directoryAllocator == null) {
|
||||
directoryAllocator = new LocalDirAllocator(BUFFER_DIR_KEY);
|
||||
}
|
||||
}
|
||||
}
|
||||
return directoryAllocator.createTmpFileForWrite(path, size, conf);
|
||||
Path path = directoryAllocator.getLocalPathForWrite(pathStr,
|
||||
size, conf);
|
||||
File dir = new File(path.getParent().toUri().getPath());
|
||||
String prefix = path.getName();
|
||||
// create a temp file on this directory
|
||||
return File.createTempFile(prefix, null, dir);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,19 +23,27 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashSet;
|
||||
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.BUFFER_DIR_KEY;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_KEY;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Tests regular and multi-part upload functionality for
|
||||
|
@ -201,4 +209,44 @@ public class TestAliyunOSSBlockOutputStream {
|
|||
// Temporary file should be deleted
|
||||
assertEquals(0, files.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDirectoryAllocator() throws Throwable {
|
||||
Configuration conf = fs.getConf();
|
||||
File tmp = AliyunOSSUtils.createTmpFileForWrite("out-", 1024, conf);
|
||||
assertTrue("not found: " + tmp, tmp.exists());
|
||||
tmp.delete();
|
||||
|
||||
// tmp should not in DeleteOnExitHook
|
||||
try {
|
||||
Class<?> c = Class.forName("java.io.DeleteOnExitHook");
|
||||
Field field = c.getDeclaredField("files");
|
||||
field.setAccessible(true);
|
||||
String name = field.getName();
|
||||
LinkedHashSet<String> files = (LinkedHashSet<String>)field.get(name);
|
||||
assertTrue("in DeleteOnExitHook", files.isEmpty());
|
||||
assertFalse("in DeleteOnExitHook",
|
||||
(new ArrayList<>(files)).contains(tmp.getPath()));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDirectoryAllocatorRR() throws Throwable {
|
||||
File dir1 = GenericTestUtils.getRandomizedTestDir();
|
||||
File dir2 = GenericTestUtils.getRandomizedTestDir();
|
||||
dir1.mkdirs();
|
||||
dir2.mkdirs();
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(BUFFER_DIR_KEY, dir1 + ", " + dir2);
|
||||
fs = AliyunOSSTestUtils.createTestFileSystem(conf);
|
||||
File tmp1 = AliyunOSSUtils.createTmpFileForWrite("out-", 1024, conf);
|
||||
tmp1.delete();
|
||||
File tmp2 = AliyunOSSUtils.createTmpFileForWrite("out-", 1024, conf);
|
||||
tmp2.delete();
|
||||
assertNotEquals("round robin not working",
|
||||
tmp1.getParent(), tmp2.getParent());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue