From ccb1cade5b5f8421b907cafe147a4d610fb5fff4 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Tue, 17 May 2016 18:00:18 -0700 Subject: [PATCH] HDFS-10383. Safely close resources in DFSTestUtil. Contributed by Mingliang Liu. --- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 230 ++++++++---------- 1 file changed, 98 insertions(+), 132 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 781dcf4611a..5a2f524b4eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; @@ -333,17 +334,10 @@ public static String readFile(FileSystem fs, Path fileName) public static byte[] readFileBuffer(FileSystem fs, Path fileName) throws IOException { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - try { - FSDataInputStream in = fs.open(fileName); - try { - IOUtils.copyBytes(in, os, 1024, true); - return os.toByteArray(); - } finally { - in.close(); - } - } finally { - os.close(); + try (ByteArrayOutputStream os = new ByteArrayOutputStream(); + FSDataInputStream in = fs.open(fileName)) { + IOUtils.copyBytes(in, os, 1024, true); + return os.toByteArray(); } } @@ -353,9 +347,7 @@ public static void createFile(FileSystem fs, Path fileName, long fileLen, throw new IOException("Mkdirs failed to create " + fileName.getParent().toString()); } - FSDataOutputStream out = null; - try { - out = fs.create(fileName, replFactor); + try (FSDataOutputStream out = fs.create(fileName, replFactor)) { byte[] toWrite = new byte[1024]; Random rb = new Random(seed); long bytesToWrite = fileLen; @@ -366,10 +358,6 @@ public static void createFile(FileSystem fs, Path fileName, long fileLen, out.write(toWrite, 0, bytesToWriteNext); bytesToWrite -= bytesToWriteNext; } - out.close(); - out = null; - } finally { - IOUtils.closeStream(out); } } @@ -391,51 +379,40 @@ public static void createFile(FileSystem fs, Path fileName, boolean isLazyPersist, int bufferLen, long fileLen, long blockSize, short replFactor, long seed, boolean flush, InetSocketAddress[] favoredNodes) throws IOException { - assert bufferLen > 0; - if (!fs.mkdirs(fileName.getParent())) { + assert bufferLen > 0; + if (!fs.mkdirs(fileName.getParent())) { throw new IOException("Mkdirs failed to create " + - fileName.getParent().toString()); - } - FSDataOutputStream out = null; - EnumSet createFlags = EnumSet.of(CREATE); - createFlags.add(OVERWRITE); - if (isLazyPersist) { - createFlags.add(LAZY_PERSIST); - } - try { - if (favoredNodes == null) { - out = fs.create( - fileName, - FsPermission.getFileDefault(), - createFlags, - fs.getConf().getInt( - CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), - replFactor, blockSize, null); - } else { - out = ((DistributedFileSystem) fs).create(fileName, - FsPermission.getDefault(), true, bufferLen, replFactor, blockSize, - null, favoredNodes); + fileName.getParent().toString()); } + EnumSet createFlags = EnumSet.of(CREATE); + createFlags.add(OVERWRITE); + if (isLazyPersist) { + createFlags.add(LAZY_PERSIST); + } + try (FSDataOutputStream out = (favoredNodes == null) ? + fs.create(fileName, FsPermission.getFileDefault(), createFlags, + fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), replFactor, + blockSize, null) + : + ((DistributedFileSystem) fs).create(fileName, FsPermission.getDefault(), + true, bufferLen, replFactor, blockSize, null, favoredNodes) + ) { if (fileLen > 0) { byte[] toWrite = new byte[bufferLen]; Random rb = new Random(seed); long bytesToWrite = fileLen; - while (bytesToWrite>0) { + while (bytesToWrite > 0) { rb.nextBytes(toWrite); int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen - : (int) bytesToWrite; + : (int) bytesToWrite; - out.write(toWrite, 0, bytesToWriteNext); - bytesToWrite -= bytesToWriteNext; + out.write(toWrite, 0, bytesToWriteNext); + bytesToWrite -= bytesToWriteNext; } if (flush) { out.hsync(); } } - } finally { - if (out != null) { - out.close(); - } } } @@ -452,20 +429,18 @@ public boolean checkFiles(FileSystem fs, String topdir) throws IOException { for (int idx = 0; idx < nFiles; idx++) { Path fPath = new Path(root, files[idx].getName()); - FSDataInputStream in = fs.open(fPath); - byte[] toRead = new byte[files[idx].getSize()]; - byte[] toCompare = new byte[files[idx].getSize()]; - Random rb = new Random(files[idx].getSeed()); - rb.nextBytes(toCompare); - in.readFully(0, toRead); - in.close(); - for (int i = 0; i < toRead.length; i++) { - if (toRead[i] != toCompare[i]) { - return false; + try (FSDataInputStream in = fs.open(fPath)) { + byte[] toRead = new byte[files[idx].getSize()]; + byte[] toCompare = new byte[files[idx].getSize()]; + Random rb = new Random(files[idx].getSeed()); + rb.nextBytes(toCompare); + in.readFully(0, toRead); + for (int i = 0; i < toRead.length; i++) { + if (toRead[i] != toCompare[i]) { + return false; + } } } - toRead = null; - toCompare = null; } return true; @@ -499,16 +474,13 @@ public void waitReplication(FileSystem fs, String topdir, short value) */ public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster, Path file, int blockNo) throws IOException { - DFSClient client = new DFSClient(new InetSocketAddress("localhost", - cluster.getNameNodePort()), cluster.getConfiguration(0)); - LocatedBlocks blocks; - try { - blocks = client.getNamenode().getBlockLocations( + try (DFSClient client = new DFSClient(new InetSocketAddress("localhost", + cluster.getNameNodePort()), cluster.getConfiguration(0))) { + LocatedBlocks blocks; + blocks = client.getNamenode().getBlockLocations( file.toString(), 0, Long.MAX_VALUE); - } finally { - client.close(); + return blocks.get(blockNo).isCorrupt(); } - return blocks.get(blockNo).isCorrupt(); } /* @@ -788,12 +760,9 @@ public void cleanup(FileSystem fs, String topdir) throws IOException { } public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException { - HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path); - try { + try (HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path)) { in.readByte(); return in.getCurrentBlock(); - } finally { - in.close(); } } @@ -804,8 +773,9 @@ public static List getAllBlocks(FSDataInputStream in) public static List getAllBlocks(FileSystem fs, Path path) throws IOException { - HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path); - return in.getAllBlocks(); + try (HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path)) { + return in.getAllBlocks(); + } } public static Token getBlockToken( @@ -814,11 +784,14 @@ public static Token getBlockToken( } public static String readFile(File f) throws IOException { - StringBuilder b = new StringBuilder(); - BufferedReader in = new BufferedReader(new FileReader(f)); - for(int c; (c = in.read()) != -1; b.append((char)c)); - in.close(); - return b.toString(); + try (BufferedReader in = new BufferedReader(new FileReader(f))) { + StringBuilder b = new StringBuilder(); + int c; + while ((c = in.read()) != -1) { + b.append((char) c); + } + return b.toString(); + } } /* Write the given string to the given file */ @@ -827,18 +800,20 @@ public static void writeFile(FileSystem fs, Path p, String s) if (fs.exists(p)) { fs.delete(p, true); } - InputStream is = new ByteArrayInputStream(s.getBytes()); - FSDataOutputStream os = fs.create(p); - IOUtils.copyBytes(is, os, s.length(), true); + try (InputStream is = new ByteArrayInputStream(s.getBytes()); + FSDataOutputStream os = fs.create(p)) { + IOUtils.copyBytes(is, os, s.length()); + } } /* Append the given string to the given file */ public static void appendFile(FileSystem fs, Path p, String s) throws IOException { assert fs.exists(p); - InputStream is = new ByteArrayInputStream(s.getBytes()); - FSDataOutputStream os = fs.append(p); - IOUtils.copyBytes(is, os, s.length(), true); + try (InputStream is = new ByteArrayInputStream(s.getBytes()); + FSDataOutputStream os = fs.append(p)) { + IOUtils.copyBytes(is, os, s.length()); + } } /** @@ -855,9 +830,9 @@ public static void appendFile(FileSystem fs, Path p, int length) byte[] toAppend = new byte[length]; Random random = new Random(); random.nextBytes(toAppend); - FSDataOutputStream out = fs.append(p); - out.write(toAppend); - out.close(); + try (FSDataOutputStream out = fs.append(p)) { + out.write(toAppend); + } } /** @@ -975,35 +950,32 @@ public static Statistics getStatistics(FileSystem fs) { */ public static byte[] loadFile(String filename) throws IOException { File file = new File(filename); - DataInputStream in = new DataInputStream(new FileInputStream(file)); - byte[] content = new byte[(int)file.length()]; - try { + try (DataInputStream in = new DataInputStream(new FileInputStream(file))) { + byte[] content = new byte[(int) file.length()]; in.readFully(content); - } finally { - IOUtils.cleanup(LOG, in); + return content; } - return content; } /** For {@link TestTransferRbw} */ public static BlockOpResponseProto transferRbw(final ExtendedBlock b, final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException { assertEquals(2, datanodes.length); - final Socket s = DataStreamer.createSocketForPipeline(datanodes[0], - datanodes.length, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); - final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(s, writeTimeout), - DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration()))); - final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); + try (Socket s = DataStreamer.createSocketForPipeline(datanodes[0], + datanodes.length, dfsClient); + DataOutputStream out = new DataOutputStream(new BufferedOutputStream( + NetUtils.getOutputStream(s, writeTimeout), + DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration()))); + DataInputStream in = new DataInputStream(NetUtils.getInputStream(s))) { + // send the request + new Sender(out).transferBlock(b, new Token(), + dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, + new StorageType[]{StorageType.DEFAULT}); + out.flush(); - // send the request - new Sender(out).transferBlock(b, new Token(), - dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, - new StorageType[]{StorageType.DEFAULT}); - out.flush(); - - return BlockOpResponseProto.parseDelimitedFrom(in); + return BlockOpResponseProto.parseDelimitedFrom(in); + } } public static void setFederatedConfiguration(MiniDFSCluster cluster, @@ -1551,13 +1523,12 @@ public void close() throws IOException { */ public static void verifyFilesEqual(FileSystem fs, Path p1, Path p2, int len) throws IOException { - final FSDataInputStream in1 = fs.open(p1); - final FSDataInputStream in2 = fs.open(p2); - for (int i = 0; i < len; i++) { - assertEquals("Mismatch at byte " + i, in1.read(), in2.read()); + try (FSDataInputStream in1 = fs.open(p1); + FSDataInputStream in2 = fs.open(p2)) { + for (int i = 0; i < len; i++) { + assertEquals("Mismatch at byte " + i, in1.read(), in2.read()); + } } - in1.close(); - in2.close(); } /** @@ -1570,20 +1541,15 @@ public static void verifyFilesEqual(FileSystem fs, Path p1, Path p2, int len) * @throws IOException */ public static void verifyFilesNotEqual(FileSystem fs, Path p1, Path p2, - int len) - throws IOException { - final FSDataInputStream in1 = fs.open(p1); - final FSDataInputStream in2 = fs.open(p2); - try { + int len) throws IOException { + try (FSDataInputStream in1 = fs.open(p1); + FSDataInputStream in2 = fs.open(p2)) { for (int i = 0; i < len; i++) { if (in1.read() != in2.read()) { return; } } fail("files are equal, but should not be"); - } finally { - in1.close(); - in2.close(); } } @@ -1694,13 +1660,13 @@ public static void toolRun(Tool tool, String cmd, int retcode, String contain) int ret = 0; try { ByteArrayOutputStream bs = new ByteArrayOutputStream(1024); - PrintStream out = new PrintStream(bs); - System.setOut(out); - System.setErr(out); - ret = tool.run(cmds); - System.out.flush(); - System.err.flush(); - out.close(); + try (PrintStream out = new PrintStream(bs)) { + System.setOut(out); + System.setErr(out); + ret = tool.run(cmds); + System.out.flush(); + System.err.flush(); + } output = bs.toString(); } finally { System.setOut(origOut); @@ -1802,9 +1768,9 @@ public static boolean changeReplicaLength(MiniDFSCluster cluster, ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException { File blockFile = cluster.getBlockFile(dnIndex, blk); if (blockFile != null && blockFile.exists()) { - RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw"); - raFile.setLength(raFile.length()+lenDelta); - raFile.close(); + try (RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw")) { + raFile.setLength(raFile.length() + lenDelta); + } return true; } LOG.info("failed to change length of block " + blk);