HDFS-10383. Safely close resources in DFSTestUtil. Contributed by Mingliang Liu.
parent 2bcf1eb36a
commit ccb1cade5b
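Every hunk below applies the same mechanical transformation: streams, sockets, and client objects that were closed by hand in finally blocks (or not closed at all on early exits) move into Java 7 try-with-resources headers, so the compiler generates the close calls on every exit path. A minimal before/after sketch of that pattern (illustrative code only, not part of the patch; the class name is invented):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class TryWithResourcesSketch {
      public static void main(String[] args) throws IOException {
        // Old shape: explicit close in a finally block.
        InputStream a = new ByteArrayInputStream(new byte[]{1, 2, 3});
        try {
          System.out.println(a.read());
        } finally {
          a.close();
        }

        // New shape: InputStream implements AutoCloseable, so the
        // compiler emits the equivalent finally block for us.
        try (InputStream b = new ByteArrayInputStream(new byte[]{1, 2, 3})) {
          System.out.println(b.read());
        }
      }
    }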
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
@@ -333,17 +334,10 @@ public class DFSTestUtil {
 
   public static byte[] readFileBuffer(FileSystem fs, Path fileName)
       throws IOException {
-    ByteArrayOutputStream os = new ByteArrayOutputStream();
-    try {
-      FSDataInputStream in = fs.open(fileName);
-      try {
-        IOUtils.copyBytes(in, os, 1024, true);
-        return os.toByteArray();
-      } finally {
-        in.close();
-      }
-    } finally {
-      os.close();
+    try (ByteArrayOutputStream os = new ByteArrayOutputStream();
+        FSDataInputStream in = fs.open(fileName)) {
+      IOUtils.copyBytes(in, os, 1024, true);
+      return os.toByteArray();
     }
   }
 
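readFileBuffer now declares both streams in a single try header. Resources declared this way are closed in reverse declaration order, so the input opened last is closed first, which mirrors the nesting of the old try/finally pair. A hypothetical demonstration (the Tracked class and names are mine, not from the patch):

    public class CloseOrderDemo {
      // A trivial AutoCloseable that logs its close() call.
      static class Tracked implements AutoCloseable {
        private final String name;
        Tracked(String name) { this.name = name; }
        @Override public void close() { System.out.println("closed " + name); }
      }

      public static void main(String[] args) {
        try (Tracked os = new Tracked("os");
             Tracked in = new Tracked("in")) {
          System.out.println("body");
        }
        // Prints: body, closed in, closed os -- reverse declaration order.
      }
    }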
@@ -353,9 +347,7 @@ public class DFSTestUtil {
       throw new IOException("Mkdirs failed to create " +
                             fileName.getParent().toString());
     }
-    FSDataOutputStream out = null;
-    try {
-      out = fs.create(fileName, replFactor);
+    try (FSDataOutputStream out = fs.create(fileName, replFactor)) {
       byte[] toWrite = new byte[1024];
       Random rb = new Random(seed);
       long bytesToWrite = fileLen;
@@ -366,10 +358,6 @@ public class DFSTestUtil {
         out.write(toWrite, 0, bytesToWriteNext);
         bytesToWrite -= bytesToWriteNext;
       }
-      out.close();
-      out = null;
-    } finally {
-      IOUtils.closeStream(out);
     }
   }
 
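The deleted `out = null` assignment existed only so that the `IOUtils.closeStream(out)` in the finally block became a no-op after a successful close; try-with-resources retires that null-sentinel idiom entirely. A simplified sketch of the retired pattern (illustrative, not the patch itself):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class SentinelSketch {
      public static void main(String[] args) throws IOException {
        // Retired idiom: null out the reference after the happy-path close
        // so the cleanup in finally does not close it a second time.
        OutputStream out = null;
        try {
          out = new ByteArrayOutputStream();
          out.write(42);
          out.close();
          out = null;
        } finally {
          if (out != null) {
            out.close(); // only reached if the body failed before close()
          }
        }
      }
    }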
@@ -391,51 +379,40 @@ public class DFSTestUtil {
       boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
       short replFactor, long seed, boolean flush,
       InetSocketAddress[] favoredNodes) throws IOException {
     assert bufferLen > 0;
     if (!fs.mkdirs(fileName.getParent())) {
       throw new IOException("Mkdirs failed to create " +
                             fileName.getParent().toString());
-    }
-    FSDataOutputStream out = null;
-    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
-    createFlags.add(OVERWRITE);
-    if (isLazyPersist) {
-      createFlags.add(LAZY_PERSIST);
-    }
-    try {
-      if (favoredNodes == null) {
-        out = fs.create(
-            fileName,
-            FsPermission.getFileDefault(),
-            createFlags,
-            fs.getConf().getInt(
-                CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
-            replFactor, blockSize, null);
-      } else {
-        out = ((DistributedFileSystem) fs).create(fileName,
-            FsPermission.getDefault(), true, bufferLen, replFactor, blockSize,
-            null, favoredNodes);
-      }
     }
+    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+    createFlags.add(OVERWRITE);
+    if (isLazyPersist) {
+      createFlags.add(LAZY_PERSIST);
+    }
+    try (FSDataOutputStream out = (favoredNodes == null) ?
+        fs.create(fileName, FsPermission.getFileDefault(), createFlags,
+            fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), replFactor,
+            blockSize, null)
+        :
+        ((DistributedFileSystem) fs).create(fileName, FsPermission.getDefault(),
+            true, bufferLen, replFactor, blockSize, null, favoredNodes)
+    ) {
       if (fileLen > 0) {
         byte[] toWrite = new byte[bufferLen];
         Random rb = new Random(seed);
         long bytesToWrite = fileLen;
-        while (bytesToWrite>0) {
+        while (bytesToWrite > 0) {
           rb.nextBytes(toWrite);
           int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen
               : (int) bytesToWrite;
+
           out.write(toWrite, 0, bytesToWriteNext);
           bytesToWrite -= bytesToWriteNext;
         }
         if (flush) {
           out.hsync();
         }
       }
-    } finally {
-      if (out != null) {
-        out.close();
-      }
     }
   }
 
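The favored-nodes variant shows that the initializer in a try-with-resources header can be any expression, including a conditional one: whichever branch produced the stream, it is the single resource that gets closed. A simplified sketch (the class name, file name, and flag are hypothetical):

    import java.io.BufferedOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class TernaryResourceSketch {
      public static void main(String[] args) throws IOException {
        boolean buffered = args.length > 0; // hypothetical switch
        try (OutputStream out = buffered
            ? new BufferedOutputStream(new FileOutputStream("demo.bin"))
            : new FileOutputStream("demo.bin")) {
          out.write(new byte[]{1, 2, 3});
        } // exactly one stream was created, and it is closed here
      }
    }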
@@ -452,20 +429,18 @@ public class DFSTestUtil {
 
     for (int idx = 0; idx < nFiles; idx++) {
       Path fPath = new Path(root, files[idx].getName());
-      FSDataInputStream in = fs.open(fPath);
-      byte[] toRead = new byte[files[idx].getSize()];
-      byte[] toCompare = new byte[files[idx].getSize()];
-      Random rb = new Random(files[idx].getSeed());
-      rb.nextBytes(toCompare);
-      in.readFully(0, toRead);
-      in.close();
-      for (int i = 0; i < toRead.length; i++) {
-        if (toRead[i] != toCompare[i]) {
-          return false;
-        }
-      }
-      toRead = null;
-      toCompare = null;
+      try (FSDataInputStream in = fs.open(fPath)) {
+        byte[] toRead = new byte[files[idx].getSize()];
+        byte[] toCompare = new byte[files[idx].getSize()];
+        Random rb = new Random(files[idx].getSeed());
+        rb.nextBytes(toCompare);
+        in.readFully(0, toRead);
+        for (int i = 0; i < toRead.length; i++) {
+          if (toRead[i] != toCompare[i]) {
+            return false;
+          }
+        }
+      }
     }
 
     return true;
@@ -499,16 +474,13 @@ public class DFSTestUtil {
    */
   public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster,
       Path file, int blockNo) throws IOException {
-    DFSClient client = new DFSClient(new InetSocketAddress("localhost",
-        cluster.getNameNodePort()), cluster.getConfiguration(0));
-    LocatedBlocks blocks;
-    try {
-      blocks = client.getNamenode().getBlockLocations(
-          file.toString(), 0, Long.MAX_VALUE);
-    } finally {
-      client.close();
+    try (DFSClient client = new DFSClient(new InetSocketAddress("localhost",
+        cluster.getNameNodePort()), cluster.getConfiguration(0))) {
+      LocatedBlocks blocks;
+      blocks = client.getNamenode().getBlockLocations(
+          file.toString(), 0, Long.MAX_VALUE);
+      return blocks.get(blockNo).isCorrupt();
     }
-    return blocks.get(blockNo).isCorrupt();
   }
 
   /*
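allBlockReplicasCorrupt now returns from inside the try block; the generated finally still runs, so the DFSClient is closed before the value reaches the caller. A self-contained illustration (the Resource class is invented):

    public class ReturnStillClosesDemo {
      static class Resource implements AutoCloseable {
        int value() { return 7; }
        @Override public void close() { System.out.println("closed"); }
      }

      static int readValue() {
        try (Resource r = new Resource()) {
          return r.value(); // close() runs after this expression is evaluated
        }
      }

      public static void main(String[] args) {
        System.out.println(readValue()); // prints "closed", then "7"
      }
    }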
@@ -788,12 +760,9 @@ public class DFSTestUtil {
   }
 
   public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException {
-    HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
-    try {
+    try (HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path)) {
       in.readByte();
       return in.getCurrentBlock();
-    } finally {
-      in.close();
     }
   }
 
@@ -804,8 +773,9 @@ public class DFSTestUtil {
 
   public static List<LocatedBlock> getAllBlocks(FileSystem fs, Path path)
       throws IOException {
-    HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
-    return in.getAllBlocks();
+    try (HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path)) {
+      return in.getAllBlocks();
+    }
   }
 
   public static Token<BlockTokenIdentifier> getBlockToken(
@@ -814,11 +784,14 @@ public class DFSTestUtil {
   }
 
   public static String readFile(File f) throws IOException {
-    StringBuilder b = new StringBuilder();
-    BufferedReader in = new BufferedReader(new FileReader(f));
-    for(int c; (c = in.read()) != -1; b.append((char)c));
-    in.close();
-    return b.toString();
+    try (BufferedReader in = new BufferedReader(new FileReader(f))) {
+      StringBuilder b = new StringBuilder();
+      int c;
+      while ((c = in.read()) != -1) {
+        b.append((char) c);
+      }
+      return b.toString();
+    }
   }
 
   /* Write the given string to the given file */
@@ -827,18 +800,20 @@ public class DFSTestUtil {
     if (fs.exists(p)) {
       fs.delete(p, true);
     }
-    InputStream is = new ByteArrayInputStream(s.getBytes());
-    FSDataOutputStream os = fs.create(p);
-    IOUtils.copyBytes(is, os, s.length(), true);
+    try (InputStream is = new ByteArrayInputStream(s.getBytes());
+        FSDataOutputStream os = fs.create(p)) {
+      IOUtils.copyBytes(is, os, s.length());
+    }
   }
 
   /* Append the given string to the given file */
   public static void appendFile(FileSystem fs, Path p, String s)
       throws IOException {
     assert fs.exists(p);
-    InputStream is = new ByteArrayInputStream(s.getBytes());
-    FSDataOutputStream os = fs.append(p);
-    IOUtils.copyBytes(is, os, s.length(), true);
+    try (InputStream is = new ByteArrayInputStream(s.getBytes());
+        FSDataOutputStream os = fs.append(p)) {
+      IOUtils.copyBytes(is, os, s.length());
+    }
   }
 
   /**
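Note that the copyBytes calls in writeFile and appendFile drop the trailing close flag: the streams are now owned by the try-with-resources header, so asking the helper to close them as well would be redundant. A sketch of that ownership split using a hypothetical helper (not Hadoop's IOUtils):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class CopyOwnershipSketch {
      // Hypothetical helper: copies bytes but deliberately does NOT close
      // either stream; the caller's try-with-resources block owns them.
      static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[1024];
        int n;
        while ((n = in.read(buf)) != -1) {
          out.write(buf, 0, n);
        }
      }

      public static void main(String[] args) throws IOException {
        try (InputStream in = new ByteArrayInputStream("hello".getBytes());
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
          copy(in, out);
          System.out.println(out.toString());
        }
      }
    }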
@@ -855,9 +830,9 @@ public class DFSTestUtil {
     byte[] toAppend = new byte[length];
     Random random = new Random();
     random.nextBytes(toAppend);
-    FSDataOutputStream out = fs.append(p);
-    out.write(toAppend);
-    out.close();
+    try (FSDataOutputStream out = fs.append(p)) {
+      out.write(toAppend);
+    }
   }
 
   /**
@@ -975,35 +950,32 @@ public class DFSTestUtil {
    */
   public static byte[] loadFile(String filename) throws IOException {
     File file = new File(filename);
-    DataInputStream in = new DataInputStream(new FileInputStream(file));
-    byte[] content = new byte[(int)file.length()];
-    try {
-      in.readFully(content);
-    } finally {
-      IOUtils.cleanup(LOG, in);
+    try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
+      byte[] content = new byte[(int) file.length()];
+      in.readFully(content);
+      return content;
     }
-    return content;
   }
 
   /** For {@link TestTransferRbw} */
   public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
       final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
     assertEquals(2, datanodes.length);
-    final Socket s = DataStreamer.createSocketForPipeline(datanodes[0],
-        datanodes.length, dfsClient);
     final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
-    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-        NetUtils.getOutputStream(s, writeTimeout),
-        DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
-    final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
-
-    // send the request
-    new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
-        dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
-        new StorageType[]{StorageType.DEFAULT});
-    out.flush();
-
-    return BlockOpResponseProto.parseDelimitedFrom(in);
+    try (Socket s = DataStreamer.createSocketForPipeline(datanodes[0],
+        datanodes.length, dfsClient);
+        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+            NetUtils.getOutputStream(s, writeTimeout),
+            DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
+        DataInputStream in = new DataInputStream(NetUtils.getInputStream(s))) {
+      // send the request
+      new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
+          dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
+          new StorageType[]{StorageType.DEFAULT});
+      out.flush();
+
+      return BlockOpResponseProto.parseDelimitedFrom(in);
+    }
   }
 
   public static void setFederatedConfiguration(MiniDFSCluster cluster,
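transferRbw chains three resources in one header, where the two streams wrap the socket declared before them. If a later initializer throws, the resources already constructed are still closed, which the old hand-written sequence could not guarantee. A hedged sketch using a loopback socket (the port and payload are arbitrary, not from the patch):

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class ChainedResourcesDemo {
      public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(0); // any free port
             Socket s = new Socket("localhost", server.getLocalPort());
             DataOutputStream out = new DataOutputStream(
                 new BufferedOutputStream(s.getOutputStream()))) {
          // If getOutputStream() had thrown, 's' and 'server' would still
          // have been closed by the surrounding try-with-resources.
          out.writeInt(42);
          out.flush();
        }
      }
    }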
@@ -1551,13 +1523,12 @@ public class DFSTestUtil {
    */
   public static void verifyFilesEqual(FileSystem fs, Path p1, Path p2, int len)
       throws IOException {
-    final FSDataInputStream in1 = fs.open(p1);
-    final FSDataInputStream in2 = fs.open(p2);
-    for (int i = 0; i < len; i++) {
-      assertEquals("Mismatch at byte " + i, in1.read(), in2.read());
+    try (FSDataInputStream in1 = fs.open(p1);
+        FSDataInputStream in2 = fs.open(p2)) {
+      for (int i = 0; i < len; i++) {
+        assertEquals("Mismatch at byte " + i, in1.read(), in2.read());
+      }
     }
-    in1.close();
-    in2.close();
   }
 
   /**
@@ -1570,20 +1541,15 @@ public class DFSTestUtil {
    * @throws IOException
    */
   public static void verifyFilesNotEqual(FileSystem fs, Path p1, Path p2,
-      int len)
-      throws IOException {
-    final FSDataInputStream in1 = fs.open(p1);
-    final FSDataInputStream in2 = fs.open(p2);
-    try {
+      int len) throws IOException {
+    try (FSDataInputStream in1 = fs.open(p1);
+        FSDataInputStream in2 = fs.open(p2)) {
       for (int i = 0; i < len; i++) {
         if (in1.read() != in2.read()) {
           return;
         }
       }
       fail("files are equal, but should not be");
-    } finally {
-      in1.close();
-      in2.close();
     }
   }
 
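In verifyFilesNotEqual the body can throw (fail() raises AssertionError). With the old finally-based close, an IOException from close() would have replaced that primary error; try-with-resources keeps the body's exception as primary and attaches close() failures as suppressed exceptions. A small demonstration (the Flaky class is invented):

    public class SuppressedDemo {
      static class Flaky implements AutoCloseable {
        @Override public void close() {
          throw new IllegalStateException("close failed");
        }
      }

      public static void main(String[] args) {
        try {
          try (Flaky f = new Flaky()) {
            throw new RuntimeException("body failed"); // primary exception
          }
        } catch (RuntimeException e) {
          System.out.println(e.getMessage());                    // body failed
          System.out.println(e.getSuppressed()[0].getMessage()); // close failed
        }
      }
    }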
@@ -1694,13 +1660,13 @@ public class DFSTestUtil {
     int ret = 0;
     try {
       ByteArrayOutputStream bs = new ByteArrayOutputStream(1024);
-      PrintStream out = new PrintStream(bs);
-      System.setOut(out);
-      System.setErr(out);
-      ret = tool.run(cmds);
-      System.out.flush();
-      System.err.flush();
-      out.close();
+      try (PrintStream out = new PrintStream(bs)) {
+        System.setOut(out);
+        System.setErr(out);
+        ret = tool.run(cmds);
+        System.out.flush();
+        System.err.flush();
+      }
       output = bs.toString();
     } finally {
       System.setOut(origOut);
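Here the PrintStream now closes (and therefore flushes) before bs.toString() is read, so no buffered output can be missed. The essential shape, as a standalone sketch (class name is mine):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    public class CaptureSketch {
      public static void main(String[] args) {
        ByteArrayOutputStream bs = new ByteArrayOutputStream();
        try (PrintStream out = new PrintStream(bs)) {
          out.println("captured line");
        } // close() flushes any buffered bytes into bs
        System.out.println(bs.toString().trim());
      }
    }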
@@ -1802,9 +1768,9 @@ public class DFSTestUtil {
       ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
     File blockFile = cluster.getBlockFile(dnIndex, blk);
     if (blockFile != null && blockFile.exists()) {
-      RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-      raFile.setLength(raFile.length()+lenDelta);
-      raFile.close();
+      try (RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw")) {
+        raFile.setLength(raFile.length() + lenDelta);
+      }
       return true;
     }
     LOG.info("failed to change length of block " + blk);