HADOOP-18313: AliyunOSSBlockOutputStream should not mark the temporary file for deletion (#4502)
HADOOP-18313: AliyunOSSBlockOutputStream should not mark the temporary file for deletion. Contributed by wujinhu.
This commit is contained in:
parent
a432925f74
commit
3ec4b932c1
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
import com.aliyun.oss.common.auth.CredentialsProvider;
|
import com.aliyun.oss.common.auth.CredentialsProvider;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.util.Preconditions;
|
import org.apache.hadoop.util.Preconditions;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -39,7 +40,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
|
||||||
final public class AliyunOSSUtils {
|
final public class AliyunOSSUtils {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(AliyunOSSUtils.class);
|
LoggerFactory.getLogger(AliyunOSSUtils.class);
|
||||||
private static LocalDirAllocator directoryAllocator;
|
private static volatile LocalDirAllocator directoryAllocator;
|
||||||
|
|
||||||
private AliyunOSSUtils() {
|
private AliyunOSSUtils() {
|
||||||
}
|
}
|
||||||
|
@ -171,21 +172,33 @@ final public class AliyunOSSUtils {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Demand create the directory allocator, then create a temporary file.
|
* Demand create the directory allocator, then create a temporary file.
|
||||||
* @param path prefix for the temporary file
|
* This does not mark the file for deletion when a process exits.
|
||||||
* @param size the size of the file that is going to be written
|
* {@link LocalDirAllocator#createTmpFileForWrite(
|
||||||
* @param conf the Configuration object
|
* String, long, Configuration)}.
|
||||||
* @return a unique temporary file
|
* @param pathStr prefix for the temporary file
|
||||||
* @throws IOException IO problems
|
* @param size the size of the file that is going to be written
|
||||||
|
* @param conf the Configuration object
|
||||||
|
* @return a unique temporary file
|
||||||
|
* @throws IOException IO problems
|
||||||
*/
|
*/
|
||||||
public static File createTmpFileForWrite(String path, long size,
|
public static File createTmpFileForWrite(String pathStr, long size,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
if (conf.get(BUFFER_DIR_KEY) == null) {
|
if (conf.get(BUFFER_DIR_KEY) == null) {
|
||||||
conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
|
conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
|
||||||
}
|
}
|
||||||
if (directoryAllocator == null) {
|
if (directoryAllocator == null) {
|
||||||
directoryAllocator = new LocalDirAllocator(BUFFER_DIR_KEY);
|
synchronized (AliyunOSSUtils.class) {
|
||||||
|
if (directoryAllocator == null) {
|
||||||
|
directoryAllocator = new LocalDirAllocator(BUFFER_DIR_KEY);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return directoryAllocator.createTmpFileForWrite(path, size, conf);
|
Path path = directoryAllocator.getLocalPathForWrite(pathStr,
|
||||||
|
size, conf);
|
||||||
|
File dir = new File(path.getParent().toUri().getPath());
|
||||||
|
String prefix = path.getName();
|
||||||
|
// create a temp file on this directory
|
||||||
|
return File.createTempFile(prefix, null, dir);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -23,19 +23,27 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.BUFFER_DIR_KEY;
|
import static org.apache.hadoop.fs.aliyun.oss.Constants.BUFFER_DIR_KEY;
|
||||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
|
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_KEY;
|
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_KEY;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests regular and multi-part upload functionality for
|
* Tests regular and multi-part upload functionality for
|
||||||
|
@ -201,4 +209,44 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
// Temporary file should be deleted
|
// Temporary file should be deleted
|
||||||
assertEquals(0, files.length);
|
assertEquals(0, files.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDirectoryAllocator() throws Throwable {
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
File tmp = AliyunOSSUtils.createTmpFileForWrite("out-", 1024, conf);
|
||||||
|
assertTrue("not found: " + tmp, tmp.exists());
|
||||||
|
tmp.delete();
|
||||||
|
|
||||||
|
// tmp should not in DeleteOnExitHook
|
||||||
|
try {
|
||||||
|
Class<?> c = Class.forName("java.io.DeleteOnExitHook");
|
||||||
|
Field field = c.getDeclaredField("files");
|
||||||
|
field.setAccessible(true);
|
||||||
|
String name = field.getName();
|
||||||
|
LinkedHashSet<String> files = (LinkedHashSet<String>)field.get(name);
|
||||||
|
assertTrue("in DeleteOnExitHook", files.isEmpty());
|
||||||
|
assertFalse("in DeleteOnExitHook",
|
||||||
|
(new ArrayList<>(files)).contains(tmp.getPath()));
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDirectoryAllocatorRR() throws Throwable {
|
||||||
|
File dir1 = GenericTestUtils.getRandomizedTestDir();
|
||||||
|
File dir2 = GenericTestUtils.getRandomizedTestDir();
|
||||||
|
dir1.mkdirs();
|
||||||
|
dir2.mkdirs();
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(BUFFER_DIR_KEY, dir1 + ", " + dir2);
|
||||||
|
fs = AliyunOSSTestUtils.createTestFileSystem(conf);
|
||||||
|
File tmp1 = AliyunOSSUtils.createTmpFileForWrite("out-", 1024, conf);
|
||||||
|
tmp1.delete();
|
||||||
|
File tmp2 = AliyunOSSUtils.createTmpFileForWrite("out-", 1024, conf);
|
||||||
|
tmp2.delete();
|
||||||
|
assertNotEquals("round robin not working",
|
||||||
|
tmp1.getParent(), tmp2.getParent());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue