HDFS-11083. Add unit test for DFSAdmin -report command. Contributed by Xiaobing Zhou

(cherry picked from commit 62d8c17dfd)
This commit is contained in:
Mingliang Liu 2016-11-08 17:04:25 -08:00
parent 6685a0a064
commit f5c9f9f1b2
1 changed files with 140 additions and 0 deletions

View File

@ -26,13 +26,22 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@ -452,4 +461,135 @@ public class TestDFSAdmin {
assertThat(outs.get(offset + 2),
is(allOf(containsString("To:"), containsString("6"))));
}
private static String scanIntoString(final ByteArrayOutputStream baos) {
final StrBuilder sb = new StrBuilder();
final Scanner scanner = new Scanner(baos.toString());
while (scanner.hasNextLine()) {
sb.appendln(scanner.nextLine());
}
scanner.close();
return sb.toString();
}
@Test(timeout = 30000)
public void testReportCommand() throws Exception {
redirectStream();
/* init conf */
final Configuration dfsConf = new HdfsConfiguration();
dfsConf.setInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
500); // 0.5s
dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
final Path baseDir = new Path(
PathUtils.getTestDir(getClass()).getAbsolutePath(),
GenericTestUtils.getMethodName());
dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString());
final int numDn = 3;
/* init cluster */
try(MiniDFSCluster miniCluster = new MiniDFSCluster
.Builder(dfsConf)
.numDataNodes(numDn).build()) {
miniCluster.waitActive();
assertEquals(numDn, miniCluster.getDataNodes().size());
/* local vars */
final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf);
final DFSClient client = miniCluster.getFileSystem().getClient();
/* run and verify report command */
resetStream();
assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
verifyNodesAndCorruptBlocks(numDn, numDn, 0, client);
/* shut down one DN */
final List<DataNode> datanodes = miniCluster.getDataNodes();
final DataNode last = datanodes.get(datanodes.size() - 1);
last.shutdown();
miniCluster.setDataNodeDead(last.getDatanodeId());
/* run and verify report command */
assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
verifyNodesAndCorruptBlocks(numDn, numDn - 1, 0, client);
/* corrupt one block */
final short replFactor = 1;
final long fileLength = 512L;
final FileSystem fs = miniCluster.getFileSystem();
final Path file = new Path(baseDir, "/corrupted");
DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
DFSTestUtil.waitReplication(fs, file, replFactor);
final ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file);
final int blockFilesCorrupted = miniCluster
.corruptBlockOnDataNodes(block);
assertEquals("Fail to corrupt all replicas for block " + block,
replFactor, blockFilesCorrupted);
/*
* Increase replication factor, this should invoke transfer request.
* Receiving datanode fails on checksum and reports it to namenode
*/
fs.setReplication(file, (short) (replFactor + 1));
/* get block details and check if the block is corrupt */
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LocatedBlocks blocks = null;
try {
blocks = client.getNamenode().getBlockLocations(file.toString(), 0,
Long.MAX_VALUE);
} catch (IOException e) {
return false;
}
return blocks != null && blocks.get(0).isCorrupt();
}
}, 100, 60000);
BlockManagerTestUtil.updateState(
miniCluster.getNameNode().getNamesystem().getBlockManager());
/* run and verify report command */
resetStream();
assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
verifyNodesAndCorruptBlocks(numDn, numDn - 1, 1, client);
}
}
private void verifyNodesAndCorruptBlocks(
final int numDn,
final int numLiveDn,
final int numCorruptBlocks,
final DFSClient client) throws IOException {
/* init vars */
final String outStr = scanIntoString(out);
final String expectedLiveNodesStr = String.format(
"Live datanodes (%d)",
numLiveDn);
final String expectedCorruptedBlocksStr = String.format(
"Blocks with corrupt replicas: %d",
numCorruptBlocks);
/* verify nodes and corrupt blocks */
assertThat(outStr, is(allOf(
containsString(expectedLiveNodesStr),
containsString(expectedCorruptedBlocksStr))));
assertEquals(
numDn,
client.getDatanodeStorageReport(DatanodeReportType.ALL).length);
assertEquals(
numLiveDn,
client.getDatanodeStorageReport(DatanodeReportType.LIVE).length);
assertEquals(
numDn - numLiveDn,
client.getDatanodeStorageReport(DatanodeReportType.DEAD).length);
assertEquals(numCorruptBlocks, client.getCorruptBlocksCount());
}
}