HDFS-11986. Dfsadmin should report erasure coding related information separately.

Author: Manoj Govindassamy
Date:   2017-08-25 17:21:56 -07:00
Parent: 36bada3032
Commit: b89ffcff36

2 changed files with 160 additions and 58 deletions
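
In short: where "hdfs dfsadmin -report" used to print one flat list of block counters, the NameNode aggregates are now reported in two separate sections, one for replicated blocks and one for erasure coded block groups. Going by the println calls in the DFSAdmin change below, the affected portion of the report should look roughly like this (the counts are illustrative, not from a real run):

    Replicated Blocks:
            Under replicated blocks: 0
            Blocks with corrupt replicas: 1
            Missing blocks: 0
            Missing blocks (with replication factor 1): 0
            Pending deletion blocks: 0
    Erasure Coded Block Groups:
            Low redundancy block groups: 0
            Block groups with corrupt internal blocks: 1
            Missing block groups: 0
            Pending deletion block groups: 0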

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -66,11 +66,13 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
+import org.apache.hadoop.hdfs.protocol.BlocksStats;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeVolumeInfo;
+import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -532,16 +534,30 @@ public class DFSAdmin extends FsShell {
      * minutes. Use "-metaSave" to list of all such blocks and accurate
      * counts.
      */
-    System.out.println("Under replicated blocks: " +
-        dfs.getLowRedundancyBlocksCount());
-    System.out.println("Blocks with corrupt replicas: " +
-        dfs.getCorruptBlocksCount());
-    System.out.println("Missing blocks: " +
-        dfs.getMissingBlocksCount());
-    System.out.println("Missing blocks (with replication factor 1): " +
-        dfs.getMissingReplOneBlocksCount());
-    System.out.println("Pending deletion blocks: " +
-        dfs.getPendingDeletionBlocksCount());
+    BlocksStats blocksStats = dfs.getClient().getNamenode().getBlocksStats();
+    System.out.println("Replicated Blocks:");
+    System.out.println("\tUnder replicated blocks: " +
+        blocksStats.getLowRedundancyBlocksStat());
+    System.out.println("\tBlocks with corrupt replicas: " +
+        blocksStats.getCorruptBlocksStat());
+    System.out.println("\tMissing blocks: " +
+        blocksStats.getMissingReplicaBlocksStat());
+    System.out.println("\tMissing blocks (with replication factor 1): " +
+        blocksStats.getMissingReplicationOneBlocksStat());
+    System.out.println("\tPending deletion blocks: " +
+        blocksStats.getPendingDeletionBlocksStat());
+    ECBlockGroupsStats ecBlockGroupsStats =
+        dfs.getClient().getNamenode().getECBlockGroupsStats();
+    System.out.println("Erasure Coded Block Groups: ");
+    System.out.println("\tLow redundancy block groups: " +
+        ecBlockGroupsStats.getLowRedundancyBlockGroupsStat());
+    System.out.println("\tBlock groups with corrupt internal blocks: " +
+        ecBlockGroupsStats.getCorruptBlockGroupsStat());
+    System.out.println("\tMissing block groups: " +
+        ecBlockGroupsStats.getMissingBlockGroupsStat());
+    System.out.println("\tPending deletion block groups: " +
+        ecBlockGroupsStats.getPendingDeletionBlockGroupsStat());
     System.out.println();
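
The same aggregates are reachable programmatically through the two ClientProtocol calls used above, getBlocksStats() and getECBlockGroupsStats(). A minimal client-side sketch, not part of this patch (the class name is made up, and it assumes fs.defaultFS in the loaded configuration points at the target HDFS cluster):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.protocol.BlocksStats;
    import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;

    public class BlockStatsSample {
      public static void main(String[] args) throws Exception {
        // Assumption: fs.defaultFS resolves to an HDFS cluster.
        Configuration conf = new HdfsConfiguration();
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(conf);
        // Replicated-block counters, printed under "Replicated Blocks:".
        BlocksStats blocksStats =
            dfs.getClient().getNamenode().getBlocksStats();
        // EC counters, printed under "Erasure Coded Block Groups:".
        ECBlockGroupsStats ecStats =
            dfs.getClient().getNamenode().getECBlockGroupsStats();
        System.out.println("Blocks with corrupt replicas: "
            + blocksStats.getCorruptBlocksStat());
        System.out.println("Block groups with corrupt internal blocks: "
            + ecStats.getCorruptBlockGroupsStat());
      }
    }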

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -38,11 +38,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -498,24 +505,47 @@ public class TestDFSAdmin {
     return sb.toString();
   }
 
-  @Test(timeout = 120000)
+  // get block details and check if the block is corrupt
+  private void waitForCorruptBlock(MiniDFSCluster miniCluster,
+      DFSClient client, Path file)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LocatedBlocks blocks = null;
+        try {
+          miniCluster.triggerBlockReports();
+          blocks = client.getNamenode().getBlockLocations(file.toString(), 0,
+              Long.MAX_VALUE);
+        } catch (IOException e) {
+          return false;
+        }
+        return blocks != null && blocks.get(0).isCorrupt();
+      }
+    }, 1000, 60000);
+  }
+
+  @Test(timeout = 180000)
   public void testReportCommand() throws Exception {
+    tearDown();
     redirectStream();
 
-    /* init conf */
+    // init conf
     final Configuration dfsConf = new HdfsConfiguration();
+    ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByID(
+        SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
+    dfsConf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+        ecPolicy.getName());
     dfsConf.setInt(
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-        500); // 0.5s
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
     dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     final Path baseDir = new Path(
         PathUtils.getTestDir(getClass()).getAbsolutePath(),
         GenericTestUtils.getMethodName());
     dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString());
+    final int numDn =
+        ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
 
-    final int numDn = 3;
-
-    /* init cluster */
     try(MiniDFSCluster miniCluster = new MiniDFSCluster
         .Builder(dfsConf)
         .numDataNodes(numDn).build()) {
@@ -523,34 +553,71 @@ public class TestDFSAdmin {
       miniCluster.waitActive();
       assertEquals(numDn, miniCluster.getDataNodes().size());
 
-      /* local vars */
       final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf);
       final DFSClient client = miniCluster.getFileSystem().getClient();
 
-      /* run and verify report command */
+      // Verify report command for all counts to be zero
       resetStream();
       assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
-      verifyNodesAndCorruptBlocks(numDn, numDn, 0, client);
-
-      /* shut down one DN */
-      final List<DataNode> datanodes = miniCluster.getDataNodes();
-      final DataNode last = datanodes.get(datanodes.size() - 1);
-      last.shutdown();
-      miniCluster.setDataNodeDead(last.getDatanodeId());
-
-      /* run and verify report command */
-      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
-      verifyNodesAndCorruptBlocks(numDn, numDn - 1, 0, client);
+      verifyNodesAndCorruptBlocks(numDn, numDn, 0, 0, client);
 
-      /* corrupt one block */
       final short replFactor = 1;
       final long fileLength = 512L;
-      final FileSystem fs = miniCluster.getFileSystem();
+      final DistributedFileSystem fs = miniCluster.getFileSystem();
       final Path file = new Path(baseDir, "/corrupted");
       DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
       DFSTestUtil.waitReplication(fs, file, replFactor);
       final ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file);
+      LocatedBlocks lbs = miniCluster.getFileSystem().getClient().
+          getNamenode().getBlockLocations(
+          file.toString(), 0, fileLength);
+      assertTrue("Unexpected block type: " + lbs.get(0),
+          lbs.get(0) instanceof LocatedBlock);
+      LocatedBlock locatedBlock = lbs.get(0);
+      DatanodeInfo locatedDataNode = locatedBlock.getLocations()[0];
+      LOG.info("Replica block located on: " + locatedDataNode);
+
+      Path ecDir = new Path(baseDir, "ec");
+      fs.mkdirs(ecDir);
+      fs.getClient().setErasureCodingPolicy(ecDir.toString(),
+          ecPolicy.getName());
+      Path ecFile = new Path(ecDir, "ec-file");
+      int stripesPerBlock = 2;
+      int cellSize = ecPolicy.getCellSize();
+      int blockSize = stripesPerBlock * cellSize;
+      int blockGroupSize = ecPolicy.getNumDataUnits() * blockSize;
+      int totalBlockGroups = 1;
+      DFSTestUtil.createStripedFile(miniCluster, ecFile, ecDir,
+          totalBlockGroups, stripesPerBlock, false, ecPolicy);
+
+      // Verify report command for all counts to be zero
+      resetStream();
+      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
+      verifyNodesAndCorruptBlocks(numDn, numDn, 0, 0, client);
+
+      // Choose a DataNode to shutdown
+      final List<DataNode> datanodes = miniCluster.getDataNodes();
+      DataNode dataNodeToShutdown = null;
+      for (DataNode dn : datanodes) {
+        if (!dn.getDatanodeId().getDatanodeUuid().equals(
+            locatedDataNode.getDatanodeUuid())) {
+          dataNodeToShutdown = dn;
+          break;
+        }
+      }
+      assertTrue("Unable to choose a DataNode to shutdown!",
+          dataNodeToShutdown != null);
+
+      // Shut down the DataNode not hosting the replicated block
+      LOG.info("Shutting down: " + dataNodeToShutdown);
+      dataNodeToShutdown.shutdown();
+      miniCluster.setDataNodeDead(dataNodeToShutdown.getDatanodeId());
+
+      // Verify report command to show dead DataNode
+      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
+      verifyNodesAndCorruptBlocks(numDn, numDn - 1, 0, 0, client);
+
+      // Corrupt the replicated block
       final int blockFilesCorrupted = miniCluster
           .corruptBlockOnDataNodes(block);
       assertEquals("Fail to corrupt all replicas for block " + block,
@@ -564,35 +631,44 @@ public class TestDFSAdmin {
         // expected exception reading corrupt blocks
       }
 
-      /*
-       * Increase replication factor, this should invoke transfer request.
-       * Receiving datanode fails on checksum and reports it to namenode
-       */
+      // Increase replication factor, this should invoke transfer request.
+      // Receiving datanode fails on checksum and reports it to namenode
       fs.setReplication(file, (short) (replFactor + 1));
 
-      /* get block details and check if the block is corrupt */
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          LocatedBlocks blocks = null;
-          try {
-            miniCluster.triggerBlockReports();
-            blocks = client.getNamenode().getBlockLocations(file.toString(), 0,
-                Long.MAX_VALUE);
-          } catch (IOException e) {
-            return false;
-          }
-          return blocks != null && blocks.get(0).isCorrupt();
-        }
-      }, 1000, 60000);
-
+      // get block details and check if the block is corrupt
       BlockManagerTestUtil.updateState(
           miniCluster.getNameNode().getNamesystem().getBlockManager());
+      waitForCorruptBlock(miniCluster, client, file);
 
-      /* run and verify report command */
+      // verify report command for corrupt replicated block
       resetStream();
       assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
-      verifyNodesAndCorruptBlocks(numDn, numDn - 1, 1, client);
+      verifyNodesAndCorruptBlocks(numDn, numDn - 1, 1, 0, client);
+
+      lbs = miniCluster.getFileSystem().getClient().
+          getNamenode().getBlockLocations(
+          ecFile.toString(), 0, blockGroupSize);
+      assertTrue("Unexpected block type: " + lbs.get(0),
+          lbs.get(0) instanceof LocatedStripedBlock);
+      LocatedStripedBlock bg =
+          (LocatedStripedBlock)(lbs.get(0));
+
+      miniCluster.getNamesystem().writeLock();
+      try {
+        BlockManager bm = miniCluster.getNamesystem().getBlockManager();
+        bm.findAndMarkBlockAsCorrupt(bg.getBlock(), bg.getLocations()[0],
+            "STORAGE_ID", "TEST");
+        BlockManagerTestUtil.updateState(bm);
+      } finally {
+        miniCluster.getNamesystem().writeUnlock();
+      }
+      waitForCorruptBlock(miniCluster, client, file);
+
+      // verify report command for corrupt replicated block
+      // and EC block group
+      resetStream();
+      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
+      verifyNodesAndCorruptBlocks(numDn, numDn - 1, 1, 1, client);
     }
   }
@@ -669,6 +745,7 @@ public class TestDFSAdmin {
       final int numDn,
       final int numLiveDn,
       final int numCorruptBlocks,
+      final int numCorruptECBlockGroups,
       final DFSClient client) throws IOException {
 
     /* init vars */
@@ -679,11 +756,15 @@ public class TestDFSAdmin {
     final String expectedCorruptedBlocksStr = String.format(
         "Blocks with corrupt replicas: %d",
         numCorruptBlocks);
+    final String expectedCorruptedECBlockGroupsStr = String.format(
+        "Block groups with corrupt internal blocks: %d",
+        numCorruptECBlockGroups);
 
-    /* verify nodes and corrupt blocks */
+    // verify nodes and corrupt blocks
     assertThat(outStr, is(allOf(
         containsString(expectedLiveNodesStr),
-        containsString(expectedCorruptedBlocksStr))));
+        containsString(expectedCorruptedBlocksStr),
+        containsString(expectedCorruptedECBlockGroupsStr))));
 
     assertEquals(
         numDn,
@@ -694,7 +775,12 @@ public class TestDFSAdmin {
     assertEquals(
         numDn - numLiveDn,
         client.getDatanodeStorageReport(DatanodeReportType.DEAD).length);
-    assertEquals(numCorruptBlocks, client.getCorruptBlocksCount());
+    assertEquals(numCorruptBlocks + numCorruptECBlockGroups,
+        client.getCorruptBlocksCount());
+    assertEquals(numCorruptBlocks, client.getNamenode()
+        .getBlocksStats().getCorruptBlocksStat());
+    assertEquals(numCorruptECBlockGroups, client.getNamenode()
+        .getECBlockGroupsStats().getCorruptBlockGroupsStat());
   }
 
   @Test