HDFS-6663. Admin command to track file and locations from block id.

Contributed by Chen He.
Kihwal Lee 2014-10-28 14:26:04 -05:00
parent 0d3e7e2bd6
commit 371a3b87ed
6 changed files with 394 additions and 15 deletions
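In short: fsck gains a -blockId option that, given one or more block IDs, reports which file each block belongs to, its expected/live/excess/stale/decommissioned/corrupt replica counts, and the status of each replica's datanode. A representative invocation (the block name here is hypothetical) is hdfs fsck / -blockId blk_1073741825.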

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -262,6 +262,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7278. Add a command that allows sysadmins to manually trigger full
block reports from a DN (cmccabe)
HDFS-6663. Admin command to track file and locations from block id.
(Chen He via kihwal)
IMPROVEMENTS
HDFS-7055. Add tracing to DFSInputStream (cmccabe)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -3505,6 +3505,13 @@ public class BlockManager {
return corruptReplicas.getNodes(block);
}
/**
 * Get the reason recorded for a corrupt replica of the given block
 * on the given datanode.
 */
public String getCorruptReason(Block block, DatanodeDescriptor node) {
return corruptReplicas.getCorruptReason(block, node);
}
/** @return the size of UnderReplicatedBlocks */
public int numOfUnderReplicatedBlocks() {
return neededReplications.size();

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java

@@ -223,5 +223,26 @@ public class CorruptReplicasMap{
}
return ret;
}
}
/**
 * Return the reason a replica of the given block on the given
 * datanode was marked corrupt.
 * @param block block that has a corrupted replica
 * @param node datanode that holds the corrupted replica
 * @return the recorded reason, or null if none was recorded
 */
String getCorruptReason(Block block, DatanodeDescriptor node) {
Reason reason = null;
if(corruptReplicasMap.containsKey(block)) {
if (corruptReplicasMap.get(block).containsKey(node)) {
reason = corruptReplicasMap.get(block).get(node);
}
}
if (reason != null) {
return reason.toString();
} else {
return null;
}
}
}
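As a style note, the lookup above calls containsKey() and then get() on each map in turn; a single-lookup sketch with the same behavior (assuming only the Map&lt;DatanodeDescriptor, Reason&gt; value type already visible in the code above) would be:

String getCorruptReason(Block block, DatanodeDescriptor node) {
  // One get() per map instead of containsKey() followed by get().
  Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(block);
  Reason reason = (nodes == null) ? null : nodes.get(node);
  return (reason == null) ? null : reason.toString();
}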

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -59,8 +60,12 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -103,6 +108,2 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
// return string marking fsck status
public static final String CORRUPT_STATUS = "is CORRUPT";
public static final String HEALTHY_STATUS = "is HEALTHY";
public static final String DECOMMISSIONING_STATUS = "is DECOMMISSIONING";
public static final String DECOMMISSIONED_STATUS = "is DECOMMISSIONED";
public static final String NONEXISTENT_STATUS = "does not exist";
public static final String FAILURE_STATUS = "FAILED";
@@ -143,7 +150,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
*/
private boolean doDelete = false;
private String path = "/";
String path = "/";
private String blockIds = null;
// We return back N files that are corrupt; the list of files returned is
// ordered by block id; to allow continuation support, pass in the last block
@@ -195,21 +204,112 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
else if (key.equals("listcorruptfileblocks")) {
this.showCorruptFileBlocks = true;
}
else if (key.equals("startblockafter")) {
} else if (key.equals("startblockafter")) {
this.currentCookie[0] = pmap.get("startblockafter")[0];
} else if (key.equals("includeSnapshots")) {
this.snapshottableDirs = new ArrayList<String>();
} else if (key.equals("blockId")) {
this.blockIds = pmap.get("blockId")[0];
}
}
}
/**
 * Check block information given a block ID.
 */
public void blockIdCK(String blockId) {
if(blockId == null) {
out.println("Please provide valid blockId!");
return;
}
BlockManager bm = namenode.getNamesystem().getBlockManager();
try {
//get blockInfo
Block block = new Block(Block.getBlockId(blockId));
//find which file this block belongs to
BlockInfo blockInfo = bm.getStoredBlock(block);
if(blockInfo == null) {
out.println("Block "+ blockId +" " + NONEXISTENT_STATUS);
LOG.warn("Block "+ blockId + " " + NONEXISTENT_STATUS);
return;
}
BlockCollection bc = bm.getBlockCollection(blockInfo);
INode iNode = (INode) bc;
NumberReplicas numberReplicas = bm.countNodes(block);
out.println("Block Id: " + blockId);
out.println("Block belongs to: "+iNode.getFullPathName());
out.println("No. of Expected Replica: " + bc.getBlockReplication());
out.println("No. of live Replica: " + numberReplicas.liveReplicas());
out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
out.println("No. of stale Replica: " + numberReplicas.replicasOnStaleNodes());
out.println("No. of decommission Replica: "
+ numberReplicas.decommissionedReplicas());
out.println("No. of corrupted Replica: " + numberReplicas.corruptReplicas());
//record datanodes that have corrupted block replica
Collection<DatanodeDescriptor> corruptionRecord = null;
if (bm.getCorruptReplicas(block) != null) {
corruptionRecord = bm.getCorruptReplicas(block);
}
//report block replicas status on datanodes
for(int idx = (blockInfo.numNodes()-1); idx >= 0; idx--) {
DatanodeDescriptor dn = blockInfo.getDatanode(idx);
out.print("Block replica on datanode/rack: " + dn.getHostName() +
dn.getNetworkLocation() + " ");
if (corruptionRecord != null && corruptionRecord.contains(dn)) {
out.print(CORRUPT_STATUS+"\t ReasonCode: "+
bm.getCorruptReason(block,dn));
} else if (dn.isDecommissioned()) {
out.print(DECOMMISSIONED_STATUS);
} else if (dn.isDecommissionInProgress()) {
out.print(DECOMMISSIONING_STATUS);
} else {
out.print(HEALTHY_STATUS);
}
out.print("\n");
}
} catch (Exception e) {
String errMsg = "Fsck on blockId '" + blockId + "' failed.";
LOG.warn(errMsg, e);
out.println(e.getMessage());
out.print("\n\n" + errMsg);
}
}
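For reference, a healthy two-replica block produces output shaped like this (the values below are illustrative, assembled from the print statements above and the host/rack names used in the tests later in this patch):

Block Id: blk_1073741825
Block belongs to: /testfile
No. of Expected Replica: 2
No. of live Replica: 2
No. of excess Replica: 0
No. of stale Replica: 0
No. of decommission Replica: 0
No. of corrupted Replica: 0
Block replica on datanode/rack: host1/rack1 is HEALTHY
Block replica on datanode/rack: host2/rack2 is HEALTHY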
/**
* Check files on DFS, starting from the indicated path.
*/
public void fsck() {
final long startTime = Time.now();
try {
if(blockIds != null) {
String[] blocks = blockIds.split(" ");
StringBuilder sb = new StringBuilder();
sb.append("FSCK started by " +
UserGroupInformation.getCurrentUser() + " from " +
remoteAddress + " at " + new Date());
out.println(sb.toString());
sb.append(" for blockIds: \n");
for (String blk: blocks) {
if(blk == null || !blk.contains("blk_")) {
out.println("Incorrect blockId format: " + blk);
continue;
}
out.print("\n");
blockIdCK(blk);
sb.append(blk + "\n");
}
LOG.info(sb.toString());
namenode.getNamesystem().logFsckEvent("/", remoteAddress);
out.flush();
return;
}
String msg = "FSCK started by " + UserGroupInformation.getCurrentUser()
+ " from " + remoteAddress + " for path " + path + " at " + new Date();
LOG.info(msg);
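Note the input contract implied by the blockIds branch above: blockIds arrives as one space-separated string, each token must contain the "blk_" prefix, and a malformed token is reported as "Incorrect blockId format" and skipped, so the remaining IDs are still checked.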

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java

@@ -92,7 +92,10 @@ public class DFSck extends Configured implements Tool {
+ "\t-blocks\tprint out block report\n"
+ "\t-locations\tprint out locations for every block\n"
+ "\t-racks\tprint out network topology for data-node locations\n"
+ "\t-showprogress\tshow progress in output. Default is OFF (no progress)\n\n"
+ "\t-showprogress\tshow progress in output. Default is OFF (no progress)\n"
+ "\t-blockId\tprint out which file this blockId belongs to, locations"
+ " (nodes, racks) of this block, and other diagnostics info"
+ " (under replicated, corrupted or not, etc)\n\n"
+ "Please Note:\n"
+ "\t1. By default fsck ignores files opened for write, "
+ "use -openforwrite to report such files. They are usually "
@@ -278,6 +281,15 @@ public class DFSck extends Configured implements Tool {
doListCorruptFileBlocks = true;
} else if (args[idx].equals("-includeSnapshots")) {
url.append("&includeSnapshots=1");
} else if (args[idx].equals("-blockId")) {
StringBuilder sb = new StringBuilder();
idx++;
while(idx < args.length && !args[idx].startsWith("-")){
sb.append(args[idx]);
sb.append(" ");
idx++;
}
url.append("&blockId=").append(URLEncoder.encode(sb.toString(), "UTF-8"));
} else if (!args[idx].startsWith("-")) {
if (null == dir) {
dir = args[idx];
@@ -287,6 +299,7 @@ public class DFSck extends Configured implements Tool {
printUsage(System.err);
return -1;
}
} else {
System.err.println("fsck: Illegal option '" + args[idx] + "'");
printUsage(System.err);
@@ -327,6 +340,12 @@ public class DFSck extends Configured implements Tool {
errCode = 1;
} else if (lastLine.endsWith(NamenodeFsck.NONEXISTENT_STATUS)) {
errCode = 0;
} else if (lastLine.contains("Incorrect blockId format:")) {
errCode = 0;
} else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONED_STATUS)) {
errCode = 2;
} else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONING_STATUS)) {
errCode = 3;
}
return errCode;
}
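On the client side, -blockId greedily consumes every following argument up to the next "-" option and joins them into one space-separated list, so something like hdfs fsck / -blockId blk_1073741825 blk_1073741826 (block names hypothetical) checks both blocks in one request. The process exit code is then derived from the last line of fsck output: 0 for healthy or malformed-ID-only runs, 2 if the replica's datanode is decommissioned, 3 if it is still decommissioning.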

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -18,13 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -48,6 +41,7 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.collect.Sets;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -66,11 +60,14 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -86,8 +83,17 @@ import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender;
import org.junit.Test;
import com.google.common.collect.Sets;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* A JUnit test for doing fsck
@@ -118,7 +124,7 @@ public class TestFsck {
System.getProperty("line.separator");
static String runFsck(Configuration conf, int expectedErrCode,
boolean checkErrorCode,String... path)
boolean checkErrorCode,String... path)
throws Exception {
ByteArrayOutputStream bStream = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bStream, true);
@@ -1096,4 +1102,227 @@ public class TestFsck {
cluster.shutdown();
}
}
/**
* Test for blockIdCK
*/
@Test
public void testBlockIdCK() throws Exception {
final short REPL_FACTOR = 2;
short NUM_DN = 2;
final long blockSize = 512;
String [] racks = {"/rack1", "/rack2"};
String [] hosts = {"host1", "host2"};
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
MiniDFSCluster cluster = null;
DistributedFileSystem dfs = null;
cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
.racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();
//create files
final String pathString = "/testfile";
final Path path = new Path(pathString);
util.createFile(dfs, path, 1024, REPL_FACTOR , 1000L);
util.waitReplication(dfs, path, REPL_FACTOR);
StringBuilder sb = new StringBuilder();
for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
}
String[] bIds = sb.toString().split(" ");
//run fsck
try {
//illegal input test
String runFsckResult = runFsck(conf, 0, true, "/", "-blockId",
"not_a_block_id");
assertTrue(runFsckResult.contains("Incorrect blockId format:"));
//general test
runFsckResult = runFsck(conf, 0, true, "/", "-blockId", sb.toString());
assertTrue(runFsckResult.contains(bIds[0]));
assertTrue(runFsckResult.contains(bIds[1]));
assertTrue(runFsckResult.contains(
"Block replica on datanode/rack: host1/rack1 is HEALTHY"));
assertTrue(runFsckResult.contains(
"Block replica on datanode/rack: host2/rack2 is HEALTHY"));
} finally {
cluster.shutdown();
}
}
/**
* Test for blockIdCK with datanode decommission
*/
@Test
public void testBlockIdCKDecommission() throws Exception {
final short REPL_FACTOR = 1;
short NUM_DN = 2;
final long blockSize = 512;
boolean checkDecommissionInProgress = false;
String [] racks = {"/rack1", "/rack2"};
String [] hosts = {"host1", "host2"};
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
MiniDFSCluster cluster;
DistributedFileSystem dfs;
cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
.racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();
//create files
final String pathString = "/testfile";
final Path path = new Path(pathString);
util.createFile(dfs, path, 1024, REPL_FACTOR, 1000L);
util.waitReplication(dfs, path, REPL_FACTOR);
StringBuilder sb = new StringBuilder();
for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
}
String[] bIds = sb.toString().split(" ");
try {
//make sure datanode that has replica is fine before decommission
String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
//decommission datanode
ExtendedBlock eb = util.getFirstBlock(dfs, path);
DatanodeDescriptor dn = cluster.getNameNode().getNamesystem()
.getBlockManager().getBlockCollection(eb.getLocalBlock())
.getBlocks()[0].getDatanode(0);
cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().startDecommission(dn);
String dnName = dn.getXferAddr();
//wait for decommission start
DatanodeInfo datanodeInfo = null;
int count = 0;
do {
Thread.sleep(2000);
for (DatanodeInfo info : dfs.getDataNodeStats()) {
if (dnName.equals(info.getXferAddr())) {
datanodeInfo = info;
}
}
//check decommissioning only once
if(!checkDecommissionInProgress && datanodeInfo != null
&& datanodeInfo.isDecommissionInProgress()) {
String fsckOut = runFsck(conf, 3, true, "/", "-blockId", bIds[0]);
assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONING_STATUS));
checkDecommissionInProgress = true;
}
} while (datanodeInfo != null && !datanodeInfo.isDecommissioned());
//check decommissioned
String fsckOut = runFsck(conf, 2, true, "/", "-blockId", bIds[0]);
assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONED_STATUS));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test for blockIdCK with block corruption
*/
@Test
public void testBlockIdCKCorruption() throws Exception {
short NUM_DN = 1;
final long blockSize = 512;
Random random = new Random();
DFSClient dfsClient;
LocatedBlocks blocks;
ExtendedBlock block;
short repFactor = 1;
String [] racks = {"/rack1"};
String [] hosts = {"host1"};
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
// Set short retry timeouts so this test runs faster
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
MiniDFSCluster cluster = null;
DistributedFileSystem dfs = null;
try {
cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
.racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();
//create files
final String pathString = "/testfile";
final Path path = new Path(pathString);
util.createFile(dfs, path, 1024, repFactor, 1000L);
util.waitReplication(dfs, path, repFactor);
StringBuilder sb = new StringBuilder();
for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
}
String[] bIds = sb.toString().split(" ");
//make sure block is healthy before we corrupt it
String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// corrupt replicas
block = DFSTestUtil.getFirstBlock(dfs, path);
File blockFile = MiniDFSCluster.getBlockFile(0, block);
if (blockFile != null && blockFile.exists()) {
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
FileChannel channel = raFile.getChannel();
String badString = "BADBAD";
int rand = random.nextInt((int) channel.size()/2);
raFile.seek(rand);
raFile.write(badString.getBytes());
raFile.close();
}
util.waitCorruptReplicas(dfs, cluster.getNamesystem(), path, block, 1);
outStr = runFsck(conf, 1, false, "/", "-blockId", block.getBlockName());
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}