HDFS-3548. NamenodeFsck.copyBlock fails to create a Block Reader. Contributed by Colin Patrick McCabe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1358822 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-07-08 19:41:55 +00:00
parent 07295260b1
commit 21fdf16b0d
4 changed files with 298 additions and 43 deletions

View File

@ -431,6 +431,9 @@ Branch-2 ( Unreleased changes )
HDFS-711. hdfsUtime does not handle atime = 0 or mtime = 0 correctly.
(Colin Patrick McCabe via eli)
HDFS-3548. NamenodeFsck.copyBlock fails to create a Block Reader.
(Colin Patrick McCabe via eli)
BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintWriter; import java.io.PrintWriter;
@ -35,6 +36,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
@ -51,6 +53,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -103,6 +106,12 @@ public class NamenodeFsck {
private boolean showRacks = false; private boolean showRacks = false;
private boolean showCorruptFileBlocks = false; private boolean showCorruptFileBlocks = false;
/**
* True if we encountered an internal error during FSCK, such as not being
* able to delete a corrupt file.
*/
private boolean internalError = false;
/** /**
* True if the user specified the -move option. * True if the user specified the -move option.
* *
@ -200,6 +209,13 @@ public class NamenodeFsck {
out.println("FSCK ended at " + new Date() + " in " out.println("FSCK ended at " + new Date() + " in "
+ (System.currentTimeMillis() - startTime + " milliseconds")); + (System.currentTimeMillis() - startTime + " milliseconds"));
// If there were internal errors during the fsck operation, we want to
// return FAILURE_STATUS, even if those errors were not immediately
// fatal. Otherwise many unit tests will pass even when there are bugs.
if (internalError) {
throw new IOException("fsck encountered internal errors!");
}
// DFSck client scans for the string HEALTHY/CORRUPT to check the status // DFSck client scans for the string HEALTHY/CORRUPT to check the status
// of file system and return appropriate code. Changing the output // of file system and return appropriate code. Changing the output
// string might break testcases. Also note this must be the last line // string might break testcases. Also note this must be the last line
@ -388,20 +404,11 @@ public class NamenodeFsck {
+ " blocks of total size " + missize + " B."); + " blocks of total size " + missize + " B.");
} }
res.corruptFiles++; res.corruptFiles++;
try { if (isOpen) {
if (doMove) { LOG.info("Fsck: ignoring open file " + path);
if (!isOpen) { } else {
copyBlocksToLostFound(parent, file, blocks); if (doMove) copyBlocksToLostFound(parent, file, blocks);
} if (doDelete) deleteCorruptedFile(path);
}
if (doDelete) {
if (!isOpen) {
LOG.warn("\n - deleting corrupted file " + path);
namenode.getRpcServer().delete(path, true);
}
}
} catch (IOException e) {
LOG.error("error processing " + path + ": " + e.toString());
} }
} }
if (showFiles) { if (showFiles) {
@ -415,29 +422,52 @@ public class NamenodeFsck {
} }
} }
} }
/**
 * Recursively deletes a corrupt file through the NameNode RPC server.
 *
 * Failures never propagate to the caller. They are logged and recorded in
 * {@code internalError} instead. A delete that returns {@code false} is
 * treated the same way as one that throws, so that a file which was not
 * actually removed is never reported as deleted.
 *
 * @param path path of the corrupt file to delete
 */
private void deleteCorruptedFile(String path) {
  try {
    // delete() signals failure through its return value as well as by
    // throwing, so both outcomes have to be checked.
    if (namenode.getRpcServer().delete(path, true)) {
      LOG.info("Fsck: deleted corrupt file " + path);
    } else {
      LOG.error("Fsck: failed to delete corrupted file " + path);
      internalError = true;
    }
  } catch (Exception e) {
    LOG.error("Fsck: error deleting corrupted file " + path, e);
    internalError = true;
  }
}
/**
 * Checks whether a path exists by asking the NameNode RPC server for its
 * file status.
 *
 * @param path the path to look up
 * @return {@code true} if the NameNode returned a non-null status for the
 *         path; {@code false} if it returned {@code null} or the lookup
 *         threw {@link FileNotFoundException}
 */
boolean hdfsPathExists(String path)
    throws AccessControlException, UnresolvedLinkException, IOException {
  HdfsFileStatus status;
  try {
    status = namenode.getRpcServer().getFileInfo(path);
  } catch (FileNotFoundException notFound) {
    // A missing path is an ordinary "does not exist" answer, not an error.
    return false;
  }
  return status != null;
}
private void copyBlocksToLostFound(String parent, HdfsFileStatus file, private void copyBlocksToLostFound(String parent, HdfsFileStatus file,
LocatedBlocks blocks) throws IOException { LocatedBlocks blocks) throws IOException {
final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf); final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf);
final String fullName = file.getFullName(parent);
OutputStream fos = null;
try { try {
if (!lfInited) { if (!lfInited) {
lostFoundInit(dfs); lostFoundInit(dfs);
} }
if (!lfInitedOk) { if (!lfInitedOk) {
return; throw new IOException("failed to initialize lost+found");
} }
String fullName = file.getFullName(parent); String target = lostFound + fullName;
String target = lostFound + fullName; if (hdfsPathExists(target)) {
String errmsg = "Failed to move " + fullName + " to /lost+found"; LOG.warn("Fsck: can't copy the remains of " + fullName + " to " +
try { "lost+found, because " + target + " already exists.");
return;
}
if (!namenode.getRpcServer().mkdirs( if (!namenode.getRpcServer().mkdirs(
target, file.getPermission(), true)) { target, file.getPermission(), true)) {
LOG.warn(errmsg); throw new IOException("failed to create directory " + target);
return;
} }
// create chains // create chains
int chain = 0; int chain = 0;
OutputStream fos = null; boolean copyError = false;
for (LocatedBlock lBlk : blocks.getLocatedBlocks()) { for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
LocatedBlock lblock = lBlk; LocatedBlock lblock = lBlk;
DatanodeInfo[] locs = lblock.getLocations(); DatanodeInfo[] locs = lblock.getLocations();
@ -451,32 +481,38 @@ public class NamenodeFsck {
} }
if (fos == null) { if (fos == null) {
fos = dfs.create(target + "/" + chain, true); fos = dfs.create(target + "/" + chain, true);
if (fos != null) if (fos == null) {
chain++; throw new IOException("Failed to copy " + fullName +
else { " to /lost+found: could not store chain " + chain);
throw new IOException(errmsg + ": could not store chain " + chain);
} }
chain++;
} }
// copy the block. It's a pity it's not abstracted from DFSInputStream ... // copy the block. It's a pity it's not abstracted from DFSInputStream ...
try { try {
copyBlock(dfs, lblock, fos); copyBlock(dfs, lblock, fos);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); LOG.error("Fsck: could not copy block " + lblock.getBlock() +
// something went wrong copying this block... " to " + target, e);
LOG.warn(" - could not copy block " + lblock.getBlock() + " to " + target);
fos.flush(); fos.flush();
fos.close(); fos.close();
fos = null; fos = null;
internalError = true;
copyError = true;
} }
} }
if (fos != null) fos.close(); if (copyError) {
LOG.warn("\n - copied corrupted file " + fullName + " to /lost+found"); LOG.warn("Fsck: there were errors copying the remains of the " +
} catch (Exception e) { "corrupted file " + fullName + " to /lost+found");
e.printStackTrace(); } else {
LOG.warn(errmsg + ": " + e.getMessage()); LOG.info("Fsck: copied the remains of the corrupted file " +
} fullName + " to /lost+found");
}
} catch (Exception e) {
LOG.error("copyBlocksToLostFound: error processing " + fullName, e);
internalError = true;
} finally { } finally {
if (fos != null) fos.close();
dfs.close(); dfs.close();
} }
} }
@ -503,7 +539,7 @@ public class NamenodeFsck {
targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr()); targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr());
} catch (IOException ie) { } catch (IOException ie) {
if (failures >= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT) { if (failures >= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT) {
throw new IOException("Could not obtain block " + lblock); throw new IOException("Could not obtain block " + lblock, ie);
} }
LOG.info("Could not obtain block from any node: " + ie); LOG.info("Could not obtain block from any node: " + ie);
try { try {
@ -515,7 +551,7 @@ public class NamenodeFsck {
continue; continue;
} }
try { try {
s = new Socket(); s = NetUtils.getDefaultSocketFactory(conf).createSocket();
s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT); s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
@ -555,7 +591,7 @@ public class NamenodeFsck {
", but datanode returned " +bytesRead+" bytes"); ", but datanode returned " +bytesRead+" bytes");
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); LOG.error("Error reading block", e);
success = false; success = false;
} finally { } finally {
try {s.close(); } catch (Exception e1) {} try {s.close(); } catch (Exception e1) {}
@ -606,6 +642,7 @@ public class NamenodeFsck {
if (lostFound == null) { if (lostFound == null) {
LOG.warn("Cannot initialize /lost+found ."); LOG.warn("Cannot initialize /lost+found .");
lfInitedOk = false; lfInitedOk = false;
internalError = true;
} }
} }

View File

@ -115,6 +115,22 @@ public class DFSTestUtil {
this.maxSize = maxSize; this.maxSize = maxSize;
this.minSize = minSize; this.minSize = minSize;
} }
/**
 * Creates a new instance of DFSTestUtil with explicit file size bounds.
 *
 * @param testName Name of the test from where this utility is used; this
 *                 constructor does not store or otherwise use it
 * @param nFiles Number of files to be created
 * @param maxLevels Maximum number of directory levels
 * @param maxSize Maximum size for file
 * @param minSize Minimum size for file
 */
public DFSTestUtil(String testName, int nFiles, int maxLevels, int maxSize,
    int minSize) {
  // File size bounds.
  this.minSize = minSize;
  this.maxSize = maxSize;
  // Shape of the generated directory tree.
  this.maxLevels = maxLevels;
  this.nFiles = nFiles;
}
/** /**
* when formating a namenode - we must provide clusterid. * when formating a namenode - we must provide clusterid.

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
@ -37,6 +38,8 @@ import java.security.PrivilegedExceptionAction;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
@ -44,14 +47,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -60,6 +66,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSck; import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -68,6 +75,8 @@ import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender; import org.apache.log4j.RollingFileAppender;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.Sets;
/** /**
* A JUnit test for doing fsck * A JUnit test for doing fsck
*/ */
@ -84,6 +93,9 @@ public class TestFsck {
"cmd=fsck\\ssrc=\\/\\sdst=null\\s" + "cmd=fsck\\ssrc=\\/\\sdst=null\\s" +
"perm=null"); "perm=null");
/**
 * Matches a single line of fsck output that contains "Corrupt blocks:"
 * followed by two tab characters. Group 1 captures the (possibly empty) run
 * of decimal digits that comes right after the tabs.
 */
static final Pattern numCorruptBlocksPattern = Pattern.compile(
".*Corrupt blocks:\t\t([0123456789]*).*");
static String runFsck(Configuration conf, int expectedErrCode, static String runFsck(Configuration conf, int expectedErrCode,
boolean checkErrorCode,String... path) boolean checkErrorCode,String... path)
throws Exception { throws Exception {
@ -95,6 +107,7 @@ public class TestFsck {
assertEquals(expectedErrCode, errCode); assertEquals(expectedErrCode, errCode);
} }
((Log4JLogger)FSPermissionChecker.LOG).getLogger().setLevel(Level.INFO); ((Log4JLogger)FSPermissionChecker.LOG).getLogger().setLevel(Level.INFO);
FSImage.LOG.error("OUTPUT = " + bStream.toString());
return bStream.toString(); return bStream.toString();
} }
@ -246,6 +259,192 @@ public class TestFsck {
} }
} }
/**
 * Exercises "fsck -move" followed by "fsck -delete" on files with missing
 * blocks.
 *
 * The test creates five files under /srcdat, deletes the on-disk block files
 * for a chosen set of blocks in each file, and polls fsck until the number of
 * corrupt blocks it reports equals the number of blocks removed. It then runs
 * fsck with -move and checks the salvaged chains under /lost+found against
 * the original contents, runs fsck with -delete, and finally expects a
 * healthy file system.
 */
@Test
public void testFsckMove() throws Exception {
  Configuration conf = new HdfsConfiguration();
  final int DFS_BLOCK_SIZE = 1024;
  final int NUM_DATANODES = 4;
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE);
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
  conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
  DFSTestUtil util = new DFSTestUtil("TestFsck", 5, 3,
      (5 * DFS_BLOCK_SIZE) + (DFS_BLOCK_SIZE - 1), 5 * DFS_BLOCK_SIZE);
  MiniDFSCluster cluster = null;
  FileSystem fs = null;
  // Declared outside the try block so the finally clause can release it.
  DFSClient dfsClient = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).
        numDataNodes(NUM_DATANODES).build();
    String topDir = "/srcdat";
    fs = cluster.getFileSystem();
    cluster.waitActive();
    util.createFiles(fs, topDir);
    util.waitReplication(fs, topDir, (short)3);
    String outStr = runFsck(conf, 0, true, "/");
    assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
    dfsClient = new DFSClient(new InetSocketAddress("localhost",
        cluster.getNameNodePort()), conf);
    String[] fileNames = util.getFileNames(topDir);
    CorruptedTestFile[] ctFiles = new CorruptedTestFile[] {
      new CorruptedTestFile(fileNames[0], Sets.newHashSet(0),
          dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE),
      new CorruptedTestFile(fileNames[1], Sets.newHashSet(2, 3),
          dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE),
      new CorruptedTestFile(fileNames[2], Sets.newHashSet(4),
          dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE),
      new CorruptedTestFile(fileNames[3], Sets.newHashSet(0, 1, 2, 3),
          dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE),
      new CorruptedTestFile(fileNames[4], Sets.newHashSet(1, 2, 3, 4),
          dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE)
    };
    int totalMissingBlocks = 0;
    for (CorruptedTestFile ctFile : ctFiles) {
      totalMissingBlocks += ctFile.getTotalMissingBlocks();
    }
    for (CorruptedTestFile ctFile : ctFiles) {
      ctFile.removeBlocks();
    }

    // Wait for fsck to discover all the missing blocks
    while (true) {
      outStr = runFsck(conf, 1, false, "/");
      String numCorrupt = null;
      for (String line : outStr.split("\n")) {
        Matcher m = numCorruptBlocksPattern.matcher(line);
        if (m.matches()) {
          numCorrupt = m.group(1);
          break;
        }
      }
      if (numCorrupt == null) {
        throw new IOException("failed to find number of corrupt " +
            "blocks in fsck output.");
      }
      if (numCorrupt.equals(Integer.toString(totalMissingBlocks))) {
        assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
        break;
      }
      // Let InterruptedException propagate: swallowing it here would keep an
      // interrupted test polling in this otherwise unbounded loop.
      Thread.sleep(100);
    }

    // Copy the surviving blocks of every corrupted file to lost+found.
    outStr = runFsck(conf, 1, false, "/", "-move");
    assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));

    // Make sure that we properly copied the block files from the DataNodes
    // to lost+found
    for (CorruptedTestFile ctFile : ctFiles) {
      ctFile.checkSalvagedRemains();
    }

    // Fix the filesystem by removing the corrupted files
    outStr = runFsck(conf, 1, true, "/", "-delete");
    assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));

    // Check to make sure we have a healthy filesystem
    outStr = runFsck(conf, 0, true, "/");
    assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
    util.cleanup(fs, topDir);
  } finally {
    // Best-effort cleanup: a failure to close must not hide the test result.
    if (dfsClient != null) {
      try {
        dfsClient.close();
      } catch (Exception ignored) {
        // intentionally ignored during teardown
      }
    }
    if (fs != null) {
      try {
        fs.close();
      } catch (Exception ignored) {
        // intentionally ignored during teardown
      }
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Test helper describing one file whose blocks are removed on purpose.
 *
 * The constructor reads and caches the file's full contents. After that,
 * {@link #removeBlocks()} deletes the block files for the chosen block
 * indices, and {@link #checkSalvagedRemains()} compares the chains found
 * under /lost+found with the cached contents.
 */
private static class CorruptedTestFile {
  private final String name;
  private final Set<Integer> blocksToCorrupt;
  private final DFSClient dfsClient;
  private final int numDataNodes;
  private final int blockSize;
  private final byte[] initialContents;

  /**
   * @param name path of the file
   * @param blocksToCorrupt zero-based indices of the blocks to remove
   * @param dfsClient client used for all reads and NameNode lookups
   * @param numDataNodes number of data node indices to search for block files
   * @param blockSize block size, in bytes, used to turn block indices into
   *                  offsets
   * @throws IOException if the initial contents cannot be read
   */
  public CorruptedTestFile(String name, Set<Integer> blocksToCorrupt,
      DFSClient dfsClient, int numDataNodes, int blockSize)
      throws IOException {
    this.name = name;
    this.blocksToCorrupt = blocksToCorrupt;
    this.dfsClient = dfsClient;
    this.numDataNodes = numDataNodes;
    this.blockSize = blockSize;
    this.initialContents = cacheInitialContents();
  }

  /** @return the number of blocks this file is configured to lose */
  public int getTotalMissingBlocks() {
    return blocksToCorrupt.size();
  }

  /**
   * Reads the entire file into memory.
   *
   * @return the file's bytes
   * @throws IOException if the file cannot be opened or fully read
   */
  private byte[] cacheInitialContents() throws IOException {
    HdfsFileStatus status = dfsClient.getFileInfo(name);
    byte[] content = new byte[(int)status.getLen()];
    // Open before entering the try block: if open() fails there is nothing to
    // close, and the original exception propagates instead of being replaced
    // by a NullPointerException from the finally clause.
    DFSInputStream in = dfsClient.open(name);
    try {
      IOUtils.readFully(in, content, 0, content.length);
    } finally {
      in.close();
    }
    return content;
  }

  /**
   * Removes every configured block by deleting its block file wherever
   * {@code MiniDFSCluster.getBlockFile} finds one among the data node
   * indices.
   */
  public void removeBlocks() throws AccessControlException,
      FileNotFoundException, UnresolvedLinkException, IOException {
    for (int corruptIdx : blocksToCorrupt) {
      // Corrupt a block by deleting it
      ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(
          name, blockSize * corruptIdx, Long.MAX_VALUE).get(0).getBlock();
      for (int i = 0; i < numDataNodes; i++) {
        File blockFile = MiniDFSCluster.getBlockFile(i, block);
        if (blockFile != null && blockFile.exists()) {
          assertTrue(blockFile.delete());
        }
      }
    }
  }

  /**
   * Verifies the chain files salvaged under /lost+found.
   *
   * Each maximal run of surviving blocks is expected in its own chain file,
   * named by a counter that starts at 0. Every surviving block is compared
   * byte for byte with the cached original contents.
   *
   * @throws IOException if a chain cannot be read or its data differs from
   *         the original
   */
  public void checkSalvagedRemains() throws IOException {
    int chainIdx = 0;
    HdfsFileStatus status = dfsClient.getFileInfo(name);
    long length = status.getLen();
    int numBlocks = (int)((length + blockSize - 1) / blockSize);
    DFSInputStream in = null;
    byte[] blockBuffer = new byte[blockSize];
    try {
      for (int blockIdx = 0; blockIdx < numBlocks; blockIdx++) {
        if (blocksToCorrupt.contains(blockIdx)) {
          // A missing block ends the current chain; the next surviving
          // block starts a new chain file.
          if (in != null) {
            in.close();
            in = null;
          }
          continue;
        }
        if (in == null) {
          in = dfsClient.open("/lost+found" + name + "/" + chainIdx);
          chainIdx++;
        }
        int len = blockBuffer.length;
        if (blockIdx == (numBlocks - 1)) {
          // The last block might not be full-length
          len = (int)(in.getFileLength() % blockSize);
          if (len == 0) {
            len = blockBuffer.length;
          }
        }
        IOUtils.readFully(in, blockBuffer, 0, len);
        int startIdx = blockIdx * blockSize;
        for (int i = 0; i < len; i++) {
          if (initialContents[startIdx + i] != blockBuffer[i]) {
            throw new IOException("salvaged file " + name + " differed " +
                "from what we expected on block " + blockIdx);
          }
        }
      }
    } finally {
      IOUtils.cleanup(null, in);
    }
  }
}
@Test @Test
public void testFsckMoveAndDelete() throws Exception { public void testFsckMoveAndDelete() throws Exception {
final int MAX_MOVE_TRIES = 5; final int MAX_MOVE_TRIES = 5;