HDFS-10383. Safely close resources in DFSTestUtil. Contributed by Mingliang Liu.

This commit is contained in:
Xiaoyu Yao 2016-05-17 16:12:00 -07:00
parent 16c07cc68a
commit dd99f5fc79
1 changed files with 123 additions and 149 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
@ -147,6 +148,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.hdfs.tools.JMXGet; import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -314,10 +316,11 @@ public class DFSTestUtil {
} }
public static byte[] readFileAsBytes(FileSystem fs, Path fileName) throws IOException { public static byte[] readFileAsBytes(FileSystem fs, Path fileName) throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream(); try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
IOUtils.copyBytes(fs.open(fileName), os, 1024, true); IOUtils.copyBytes(fs.open(fileName), os, 1024);
return os.toByteArray(); return os.toByteArray();
} }
}
/** create nFiles with random names and directory hierarchies /** create nFiles with random names and directory hierarchies
* with random (but reproducible) data in them. * with random (but reproducible) data in them.
@ -346,17 +349,10 @@ public class DFSTestUtil {
public static byte[] readFileBuffer(FileSystem fs, Path fileName) public static byte[] readFileBuffer(FileSystem fs, Path fileName)
throws IOException { throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream(); try (ByteArrayOutputStream os = new ByteArrayOutputStream();
try { FSDataInputStream in = fs.open(fileName)) {
FSDataInputStream in = fs.open(fileName);
try {
IOUtils.copyBytes(in, os, 1024, true); IOUtils.copyBytes(in, os, 1024, true);
return os.toByteArray(); return os.toByteArray();
} finally {
in.close();
}
} finally {
os.close();
} }
} }
@ -389,26 +385,19 @@ public class DFSTestUtil {
throw new IOException("Mkdirs failed to create " + throw new IOException("Mkdirs failed to create " +
fileName.getParent().toString()); fileName.getParent().toString());
} }
FSDataOutputStream out = null;
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE); EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
createFlags.add(OVERWRITE); createFlags.add(OVERWRITE);
if (isLazyPersist) { if (isLazyPersist) {
createFlags.add(LAZY_PERSIST); createFlags.add(LAZY_PERSIST);
} }
try { try (FSDataOutputStream out = (favoredNodes == null) ?
if (favoredNodes == null) { fs.create(fileName, FsPermission.getFileDefault(), createFlags,
out = fs.create( fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), replFactor,
fileName, blockSize, null)
FsPermission.getFileDefault(), :
createFlags, ((DistributedFileSystem) fs).create(fileName, FsPermission.getDefault(),
fs.getConf().getInt( true, bufferLen, replFactor, blockSize, null, favoredNodes)
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), ) {
replFactor, blockSize, null);
} else {
out = ((DistributedFileSystem) fs).create(fileName,
FsPermission.getDefault(), true, bufferLen, replFactor, blockSize,
null, favoredNodes);
}
if (fileLen > 0) { if (fileLen > 0) {
byte[] toWrite = new byte[bufferLen]; byte[] toWrite = new byte[bufferLen];
Random rb = new Random(seed); Random rb = new Random(seed);
@ -425,10 +414,6 @@ public class DFSTestUtil {
out.hsync(); out.hsync();
} }
} }
} finally {
if (out != null) {
out.close();
}
} }
} }
@ -445,20 +430,18 @@ public class DFSTestUtil {
for (int idx = 0; idx < nFiles; idx++) { for (int idx = 0; idx < nFiles; idx++) {
Path fPath = new Path(root, files[idx].getName()); Path fPath = new Path(root, files[idx].getName());
FSDataInputStream in = fs.open(fPath); try (FSDataInputStream in = fs.open(fPath)) {
byte[] toRead = new byte[files[idx].getSize()]; byte[] toRead = new byte[files[idx].getSize()];
byte[] toCompare = new byte[files[idx].getSize()]; byte[] toCompare = new byte[files[idx].getSize()];
Random rb = new Random(files[idx].getSeed()); Random rb = new Random(files[idx].getSeed());
rb.nextBytes(toCompare); rb.nextBytes(toCompare);
in.readFully(0, toRead); in.readFully(0, toRead);
in.close();
for (int i = 0; i < toRead.length; i++) { for (int i = 0; i < toRead.length; i++) {
if (toRead[i] != toCompare[i]) { if (toRead[i] != toCompare[i]) {
return false; return false;
} }
} }
toRead = null; }
toCompare = null;
} }
return true; return true;
@ -492,17 +475,14 @@ public class DFSTestUtil {
*/ */
public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster, public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster,
Path file, int blockNo) throws IOException { Path file, int blockNo) throws IOException {
DFSClient client = new DFSClient(new InetSocketAddress("localhost", try (DFSClient client = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), cluster.getConfiguration(0)); cluster.getNameNodePort()), cluster.getConfiguration(0))) {
LocatedBlocks blocks; LocatedBlocks blocks;
try {
blocks = client.getNamenode().getBlockLocations( blocks = client.getNamenode().getBlockLocations(
file.toString(), 0, Long.MAX_VALUE); file.toString(), 0, Long.MAX_VALUE);
} finally {
client.close();
}
return blocks.get(blockNo).isCorrupt(); return blocks.get(blockNo).isCorrupt();
} }
}
/* /*
* Wait up to 20s for the given block to be replicated across * Wait up to 20s for the given block to be replicated across
@ -781,12 +761,9 @@ public class DFSTestUtil {
} }
public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException { public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException {
HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path); try (HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path)) {
try {
in.readByte(); in.readByte();
return in.getCurrentBlock(); return in.getCurrentBlock();
} finally {
in.close();
} }
} }
@ -797,9 +774,10 @@ public class DFSTestUtil {
public static List<LocatedBlock> getAllBlocks(FileSystem fs, Path path) public static List<LocatedBlock> getAllBlocks(FileSystem fs, Path path)
throws IOException { throws IOException {
HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path); try (HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path)) {
return in.getAllBlocks(); return in.getAllBlocks();
} }
}
public static Token<BlockTokenIdentifier> getBlockToken( public static Token<BlockTokenIdentifier> getBlockToken(
FSDataOutputStream out) { FSDataOutputStream out) {
@ -807,18 +785,22 @@ public class DFSTestUtil {
} }
public static String readFile(File f) throws IOException { public static String readFile(File f) throws IOException {
try (BufferedReader in = new BufferedReader(new FileReader(f))) {
StringBuilder b = new StringBuilder(); StringBuilder b = new StringBuilder();
BufferedReader in = new BufferedReader(new FileReader(f)); int c;
for(int c; (c = in.read()) != -1; b.append((char)c)); while ((c = in.read()) != -1) {
in.close(); b.append((char) c);
}
return b.toString(); return b.toString();
} }
}
public static byte[] readFileAsBytes(File f) throws IOException { public static byte[] readFileAsBytes(File f) throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream(); try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
IOUtils.copyBytes(new FileInputStream(f), os, 1024, true); IOUtils.copyBytes(new FileInputStream(f), os, 1024);
return os.toByteArray(); return os.toByteArray();
} }
}
/* Write the given bytes to the given file */ /* Write the given bytes to the given file */
public static void writeFile(FileSystem fs, Path p, byte[] bytes) public static void writeFile(FileSystem fs, Path p, byte[] bytes)
@ -826,9 +808,10 @@ public class DFSTestUtil {
if (fs.exists(p)) { if (fs.exists(p)) {
fs.delete(p, true); fs.delete(p, true);
} }
InputStream is = new ByteArrayInputStream(bytes); try (InputStream is = new ByteArrayInputStream(bytes);
FSDataOutputStream os = fs.create(p); FSDataOutputStream os = fs.create(p)) {
IOUtils.copyBytes(is, os, bytes.length, true); IOUtils.copyBytes(is, os, bytes.length);
}
} }
/* Write the given string to the given file */ /* Write the given string to the given file */
@ -841,9 +824,10 @@ public class DFSTestUtil {
public static void appendFile(FileSystem fs, Path p, String s) public static void appendFile(FileSystem fs, Path p, String s)
throws IOException { throws IOException {
assert fs.exists(p); assert fs.exists(p);
InputStream is = new ByteArrayInputStream(s.getBytes()); try (InputStream is = new ByteArrayInputStream(s.getBytes());
FSDataOutputStream os = fs.append(p); FSDataOutputStream os = fs.append(p)) {
IOUtils.copyBytes(is, os, s.length(), true); IOUtils.copyBytes(is, os, s.length());
}
} }
/** /**
@ -860,9 +844,9 @@ public class DFSTestUtil {
byte[] toAppend = new byte[length]; byte[] toAppend = new byte[length];
Random random = new Random(); Random random = new Random();
random.nextBytes(toAppend); random.nextBytes(toAppend);
FSDataOutputStream out = fs.append(p); try (FSDataOutputStream out = fs.append(p)) {
out.write(toAppend); out.write(toAppend);
out.close(); }
} }
/** /**
@ -980,28 +964,24 @@ public class DFSTestUtil {
*/ */
public static byte[] loadFile(String filename) throws IOException { public static byte[] loadFile(String filename) throws IOException {
File file = new File(filename); File file = new File(filename);
DataInputStream in = new DataInputStream(new FileInputStream(file)); try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
byte[] content = new byte[(int) file.length()]; byte[] content = new byte[(int) file.length()];
try {
in.readFully(content); in.readFully(content);
} finally {
IOUtils.cleanup(LOG, in);
}
return content; return content;
} }
}
/** For {@link TestTransferRbw} */ /** For {@link TestTransferRbw} */
public static BlockOpResponseProto transferRbw(final ExtendedBlock b, public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException { final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
assertEquals(2, datanodes.length); assertEquals(2, datanodes.length);
final Socket s = DataStreamer.createSocketForPipeline(datanodes[0],
datanodes.length, dfsClient);
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( try (Socket s = DataStreamer.createSocketForPipeline(datanodes[0],
datanodes.length, dfsClient);
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(s, writeTimeout), NetUtils.getOutputStream(s, writeTimeout),
DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration()))); DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); DataInputStream in = new DataInputStream(NetUtils.getInputStream(s))) {
// send the request // send the request
new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(), new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
@ -1010,6 +990,7 @@ public class DFSTestUtil {
return BlockOpResponseProto.parseDelimitedFrom(in); return BlockOpResponseProto.parseDelimitedFrom(in);
} }
}
public static void setFederatedConfiguration(MiniDFSCluster cluster, public static void setFederatedConfiguration(MiniDFSCluster cluster,
Configuration conf) { Configuration conf) {
@ -1556,13 +1537,12 @@ public class DFSTestUtil {
*/ */
public static void verifyFilesEqual(FileSystem fs, Path p1, Path p2, int len) public static void verifyFilesEqual(FileSystem fs, Path p1, Path p2, int len)
throws IOException { throws IOException {
final FSDataInputStream in1 = fs.open(p1); try (FSDataInputStream in1 = fs.open(p1);
final FSDataInputStream in2 = fs.open(p2); FSDataInputStream in2 = fs.open(p2)) {
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
assertEquals("Mismatch at byte " + i, in1.read(), in2.read()); assertEquals("Mismatch at byte " + i, in1.read(), in2.read());
} }
in1.close(); }
in2.close();
} }
/** /**
@ -1575,20 +1555,15 @@ public class DFSTestUtil {
* @throws IOException * @throws IOException
*/ */
public static void verifyFilesNotEqual(FileSystem fs, Path p1, Path p2, public static void verifyFilesNotEqual(FileSystem fs, Path p1, Path p2,
int len) int len) throws IOException {
throws IOException { try (FSDataInputStream in1 = fs.open(p1);
final FSDataInputStream in1 = fs.open(p1); FSDataInputStream in2 = fs.open(p2)) {
final FSDataInputStream in2 = fs.open(p2);
try {
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
if (in1.read() != in2.read()) { if (in1.read() != in2.read()) {
return; return;
} }
} }
fail("files are equal, but should not be"); fail("files are equal, but should not be");
} finally {
in1.close();
in2.close();
} }
} }
@ -1699,13 +1674,13 @@ public class DFSTestUtil {
int ret = 0; int ret = 0;
try { try {
ByteArrayOutputStream bs = new ByteArrayOutputStream(1024); ByteArrayOutputStream bs = new ByteArrayOutputStream(1024);
PrintStream out = new PrintStream(bs); try (PrintStream out = new PrintStream(bs)) {
System.setOut(out); System.setOut(out);
System.setErr(out); System.setErr(out);
ret = tool.run(cmds); ret = tool.run(cmds);
System.out.flush(); System.out.flush();
System.err.flush(); System.err.flush();
out.close(); }
output = bs.toString(); output = bs.toString();
} finally { } finally {
System.setOut(origOut); System.setOut(origOut);
@ -1798,9 +1773,9 @@ public class DFSTestUtil {
ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException { ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
File blockFile = cluster.getBlockFile(dnIndex, blk); File blockFile = cluster.getBlockFile(dnIndex, blk);
if (blockFile != null && blockFile.exists()) { if (blockFile != null && blockFile.exists()) {
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw"); try (RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw")) {
raFile.setLength(raFile.length() + lenDelta); raFile.setLength(raFile.length() + lenDelta);
raFile.close(); }
return true; return true;
} }
LOG.info("failed to change length of block " + blk); LOG.info("failed to change length of block " + blk);
@ -1925,9 +1900,11 @@ public class DFSTestUtil {
} }
} }
FSDataOutputStream out = null; cluster.getNameNodeRpc()
try { .create(file.toString(), new FsPermission((short)0755),
out = dfs.create(file, (short) 1); // create an empty file dfs.getClient().getClientName(),
new EnumSetWritable<>(EnumSet.of(CreateFlag.CREATE)),
false, (short)1, 128*1024*1024L, null);
FSNamesystem ns = cluster.getNamesystem(); FSNamesystem ns = cluster.getNamesystem();
FSDirectory fsdir = ns.getFSDirectory(); FSDirectory fsdir = ns.getFSDirectory();
@ -1943,9 +1920,6 @@ public class DFSTestUtil {
dfs.getClient().namenode.complete(file.toString(), dfs.getClient().namenode.complete(file.toString(),
dfs.getClient().getClientName(), previous, fileNode.getId()); dfs.getClient().getClientName(), previous, fileNode.getId());
} finally {
IOUtils.cleanup(null, out);
}
} }
/** /**