diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java index 6883a7abc7b..efe39056405 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java @@ -57,8 +57,11 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import com.google.common.base.Supplier; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; @@ -74,7 +77,6 @@ import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -103,41 +105,45 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; import org.apache.log4j.RollingFileAppender; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import com.google.common.collect.Sets; /** - * A JUnit test for doing fsck + * A JUnit test for doing fsck. */ public class TestFsck { + private static final Log LOG = + LogFactory.getLog(TestFsck.class.getName()); static final String auditLogFile = System.getProperty("test.build.dir", "build/test") + "/TestFsck-audit.log"; - + // Pattern for: // allowed=true ugi=name ip=/address cmd=FSCK src=/ dst=null perm=null - static final Pattern fsckPattern = Pattern.compile( + static final Pattern FSCK_PATTERN = Pattern.compile( "allowed=.*?\\s" + "ugi=.*?\\s" + "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" + "cmd=fsck\\ssrc=\\/\\sdst=null\\s" + "perm=null\\s" + "proto=.*"); - static final Pattern getfileinfoPattern = Pattern.compile( + static final Pattern GET_FILE_INFO_PATTERN = Pattern.compile( "allowed=.*?\\s" + "ugi=.*?\\s" + "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" + "cmd=getfileinfo\\ssrc=\\/\\sdst=null\\s" + "perm=null\\s" + "proto=.*"); - static final Pattern numCorruptBlocksPattern = Pattern.compile( + static final Pattern NUM_CORRUPT_BLOCKS_PATTERN = Pattern.compile( ".*Corrupt blocks:\t\t([0123456789]*).*"); private static final String LINE_SEPARATOR = - System.getProperty("line.separator"); + System.getProperty("line.separator"); static String runFsck(Configuration conf, int expectedErrCode, - boolean checkErrorCode,String... path) + boolean checkErrorCode, String... 
path) throws Exception { ByteArrayOutputStream bStream = new ByteArrayOutputStream(); PrintStream out = new PrintStream(bStream, true); @@ -147,60 +153,72 @@ public class TestFsck { assertEquals(expectedErrCode, errCode); } GenericTestUtils.setLogLevel(FSPermissionChecker.LOG, Level.INFO); - FSImage.LOG.info("OUTPUT = " + bStream.toString()); + LOG.info("OUTPUT = " + bStream.toString()); return bStream.toString(); } - /** do fsck */ + private MiniDFSCluster cluster = null; + private Configuration conf = null; + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + } + + @After + public void tearDown() throws Exception { + shutdownCluster(); + } + + private void shutdownCluster() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** do fsck. */ @Test public void testFsck() throws Exception { DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck"). setNumFiles(20).build(); - MiniDFSCluster cluster = null; FileSystem fs = null; - try { - Configuration conf = new HdfsConfiguration(); - final long precision = 1L; - conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); - fs = cluster.getFileSystem(); - final String fileName = "/srcdat"; - util.createFiles(fs, fileName); - util.waitReplication(fs, fileName, (short)3); - final Path file = new Path(fileName); - long aTime = fs.getFileStatus(file).getAccessTime(); - Thread.sleep(precision); - setupAuditLogs(); - String outStr = runFsck(conf, 0, true, "/"); - verifyAuditLogs(); - assertEquals(aTime, fs.getFileStatus(file).getAccessTime()); - System.out.println(outStr); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - if (fs != null) {try{fs.close();} catch(Exception e){}} - cluster.shutdown(); - - // restart the cluster; bring up namenode but not the data nodes - cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(0).format(false).build(); - outStr = runFsck(conf, 1, true, "/"); - // expect the result is corrupt - assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); - System.out.println(outStr); - - // bring up data nodes & cleanup cluster - cluster.startDataNodes(conf, 4, true, null, null); - cluster.waitActive(); - cluster.waitClusterUp(); - fs = cluster.getFileSystem(); - util.cleanup(fs, "/srcdat"); - } finally { - if (fs != null) {try{fs.close();} catch(Exception e){}} - if (cluster != null) { cluster.shutdown(); } - } + final long precision = 1L; + conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, + precision); + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); + fs = cluster.getFileSystem(); + final String fileName = "/srcdat"; + util.createFiles(fs, fileName); + util.waitReplication(fs, fileName, (short)3); + final Path file = new Path(fileName); + long aTime = fs.getFileStatus(file).getAccessTime(); + Thread.sleep(precision); + setupAuditLogs(); + String outStr = runFsck(conf, 0, true, "/"); + verifyAuditLogs(); + assertEquals(aTime, fs.getFileStatus(file).getAccessTime()); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + shutdownCluster(); + + // restart the cluster; bring up namenode but not the data nodes + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0).format(false).build(); + outStr = runFsck(conf, 1, true, "/"); + // 
expect the result to be corrupt + assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); + System.out.println(outStr); + + // bring up data nodes & cleanup cluster + cluster.startDataNodes(conf, 4, true, null, null); + cluster.waitActive(); + cluster.waitClusterUp(); + fs = cluster.getFileSystem(); + util.cleanup(fs, "/srcdat"); } - /** Sets up log4j logger for auditlogs */ + /** Sets up log4j logger for auditlogs. */ private void setupAuditLogs() throws IOException { File file = new File(auditLogFile); if (file.exists()) { @@ -230,11 +248,11 @@ public class TestFsck { line = reader.readLine(); assertNotNull(line); assertTrue("Expected getfileinfo event not found in audit log", - getfileinfoPattern.matcher(line).matches()); + GET_FILE_INFO_PATTERN.matcher(line).matches()); } line = reader.readLine(); assertNotNull(line); - assertTrue("Expected fsck event not found in audit log", fsckPattern + assertTrue("Expected fsck event not found in audit log", FSCK_PATTERN .matcher(line).matches()); assertNull("Unexpected event in audit log", reader.readLine()); } finally { @@ -253,167 +271,147 @@ public class TestFsck { public void testFsckNonExistent() throws Exception { DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck"). setNumFiles(20).build(); - MiniDFSCluster cluster = null; FileSystem fs = null; - try { - Configuration conf = new HdfsConfiguration(); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); - fs = cluster.getFileSystem(); - util.createFiles(fs, "/srcdat"); - util.waitReplication(fs, "/srcdat", (short)3); - String outStr = runFsck(conf, 0, true, "/non-existent"); - assertEquals(-1, outStr.indexOf(NamenodeFsck.HEALTHY_STATUS)); - System.out.println(outStr); - util.cleanup(fs, "/srcdat"); - } finally { - if (fs != null) {try{fs.close();} catch(Exception e){}} - if (cluster != null) { cluster.shutdown(); } - } + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); + fs = cluster.getFileSystem(); + util.createFiles(fs, "/srcdat"); + util.waitReplication(fs, "/srcdat", (short)3); + String outStr = runFsck(conf, 0, true, "/non-existent"); + assertEquals(-1, outStr.indexOf(NamenodeFsck.HEALTHY_STATUS)); + System.out.println(outStr); + util.cleanup(fs, "/srcdat"); } - /** Test fsck with permission set on inodes */ + /** Test fsck with permission set on inodes. */ @Test public void testFsckPermission() throws Exception { final DFSTestUtil util = new DFSTestUtil.Builder(). 
setName(getClass().getSimpleName()).setNumFiles(20).build(); - final Configuration conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); - MiniDFSCluster cluster = null; - try { - // Create a cluster with the current user, write some files - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); - final MiniDFSCluster c2 = cluster; - final String dir = "/dfsck"; - final Path dirpath = new Path(dir); - final FileSystem fs = c2.getFileSystem(); + // Create a cluster with the current user, write some files + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); + final MiniDFSCluster c2 = cluster; + final String dir = "/dfsck"; + final Path dirpath = new Path(dir); + final FileSystem fs = c2.getFileSystem(); - util.createFiles(fs, dir); - util.waitReplication(fs, dir, (short) 3); - fs.setPermission(dirpath, new FsPermission((short) 0700)); + util.createFiles(fs, dir); + util.waitReplication(fs, dir, (short) 3); + fs.setPermission(dirpath, new FsPermission((short) 0700)); - // run DFSck as another user, should fail with permission issue - UserGroupInformation fakeUGI = UserGroupInformation.createUserForTesting( - "ProbablyNotARealUserName", new String[] { "ShangriLa" }); - fakeUGI.doAs(new PrivilegedExceptionAction<Object>() { - @Override - public Object run() throws Exception { - System.out.println(runFsck(conf, -1, true, dir)); - return null; - } - }); - - // set permission and try DFSck again as the fake user, should succeed - fs.setPermission(dirpath, new FsPermission((short) 0777)); - fakeUGI.doAs(new PrivilegedExceptionAction<Object>() { - @Override - public Object run() throws Exception { - final String outStr = runFsck(conf, 0, true, dir); - System.out.println(outStr); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - return null; - } - }); + // run DFSck as another user, should fail with permission issue + UserGroupInformation fakeUGI = UserGroupInformation.createUserForTesting( + "ProbablyNotARealUserName", new String[] {"ShangriLa"}); + fakeUGI.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + System.out.println(runFsck(conf, -1, true, dir)); + return null; + } + }); - util.cleanup(fs, dir); - } finally { - if (cluster != null) { cluster.shutdown(); } - } + // set permission and try DFSck again as the fake user, should succeed + fs.setPermission(dirpath, new FsPermission((short) 0777)); + fakeUGI.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + final String outStr = runFsck(conf, 0, true, dir); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + return null; + } + }); + + util.cleanup(fs, dir); } @Test public void testFsckMove() throws Exception { - Configuration conf = new HdfsConfiguration(); - final int DFS_BLOCK_SIZE = 1024; - final int NUM_DATANODES = 4; - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE); + final int dfsBlockSize = 1024; + final int numDatanodes = 4; + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, dfsBlockSize); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); DFSTestUtil util = new DFSTestUtil("TestFsck", 5, 3, - (5 * DFS_BLOCK_SIZE) + (DFS_BLOCK_SIZE - 1), 5 * DFS_BLOCK_SIZE); + (5 * dfsBlockSize) + (dfsBlockSize - 1), 5 * dfsBlockSize); - MiniDFSCluster cluster = null; FileSystem fs = null; - try { - cluster = new MiniDFSCluster.Builder(conf). 
- numDataNodes(NUM_DATANODES).build(); - String topDir = "/srcdat"; - fs = cluster.getFileSystem(); - cluster.waitActive(); - util.createFiles(fs, topDir); - util.waitReplication(fs, topDir, (short)3); - String outStr = runFsck(conf, 0, true, "/"); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", - cluster.getNameNodePort()), conf); - String fileNames[] = util.getFileNames(topDir); - CorruptedTestFile ctFiles[] = new CorruptedTestFile[] { + cluster = new MiniDFSCluster.Builder(conf). + numDataNodes(numDatanodes).build(); + String topDir = "/srcdat"; + fs = cluster.getFileSystem(); + cluster.waitActive(); + util.createFiles(fs, topDir); + util.waitReplication(fs, topDir, (short)3); + String outStr = runFsck(conf, 0, true, "/"); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", + cluster.getNameNodePort()), conf); + String[] fileNames = util.getFileNames(topDir); + CorruptedTestFile[] ctFiles = new CorruptedTestFile[]{ new CorruptedTestFile(fileNames[0], Sets.newHashSet(0), - dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE), + dfsClient, numDatanodes, dfsBlockSize), new CorruptedTestFile(fileNames[1], Sets.newHashSet(2, 3), - dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE), + dfsClient, numDatanodes, dfsBlockSize), new CorruptedTestFile(fileNames[2], Sets.newHashSet(4), - dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE), + dfsClient, numDatanodes, dfsBlockSize), new CorruptedTestFile(fileNames[3], Sets.newHashSet(0, 1, 2, 3), - dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE), + dfsClient, numDatanodes, dfsBlockSize), new CorruptedTestFile(fileNames[4], Sets.newHashSet(1, 2, 3, 4), - dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE) - }; - int totalMissingBlocks = 0; - for (CorruptedTestFile ctFile : ctFiles) { - totalMissingBlocks += ctFile.getTotalMissingBlocks(); - } - for (CorruptedTestFile ctFile : ctFiles) { - ctFile.removeBlocks(cluster); - } - // Wait for fsck to discover all the missing blocks - while (true) { - outStr = runFsck(conf, 1, false, "/"); - String numCorrupt = null; - for (String line : outStr.split(LINE_SEPARATOR)) { - Matcher m = numCorruptBlocksPattern.matcher(line); - if (m.matches()) { - numCorrupt = m.group(1); - break; - } - } - if (numCorrupt == null) { - throw new IOException("failed to find number of corrupt " + - "blocks in fsck output."); - } - if (numCorrupt.equals(Integer.toString(totalMissingBlocks))) { - assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); + dfsClient, numDatanodes, dfsBlockSize) + }; + int totalMissingBlocks = 0; + for (CorruptedTestFile ctFile : ctFiles) { + totalMissingBlocks += ctFile.getTotalMissingBlocks(); + } + for (CorruptedTestFile ctFile : ctFiles) { + ctFile.removeBlocks(cluster); + } + // Wait for fsck to discover all the missing blocks + while (true) { + outStr = runFsck(conf, 1, false, "/"); + String numCorrupt = null; + for (String line : outStr.split(LINE_SEPARATOR)) { + Matcher m = NUM_CORRUPT_BLOCKS_PATTERN.matcher(line); + if (m.matches()) { + numCorrupt = m.group(1); break; } - try { - Thread.sleep(100); - } catch (InterruptedException ignore) { - } } - - // Copy the non-corrupt blocks of corruptFileName to lost+found. 
- outStr = runFsck(conf, 1, false, "/", "-move"); - FSImage.LOG.info("WATERMELON: outStr = " + outStr); - assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); - - // Make sure that we properly copied the block files from the DataNodes - // to lost+found - for (CorruptedTestFile ctFile : ctFiles) { - ctFile.checkSalvagedRemains(); + if (numCorrupt == null) { + throw new IOException("failed to find number of corrupt " + + "blocks in fsck output."); + } + if (numCorrupt.equals(Integer.toString(totalMissingBlocks))) { + assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { } - - // Fix the filesystem by removing corruptFileName - outStr = runFsck(conf, 1, true, "/", "-delete"); - assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); - - // Check to make sure we have a healthy filesystem - outStr = runFsck(conf, 0, true, "/"); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - util.cleanup(fs, topDir); - } finally { - if (fs != null) {try{fs.close();} catch(Exception e){}} - if (cluster != null) { cluster.shutdown(); } } + + // Copy the non-corrupt blocks of corruptFileName to lost+found. + outStr = runFsck(conf, 1, false, "/", "-move"); + LOG.info("WATERMELON: outStr = " + outStr); + assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); + + // Make sure that we properly copied the block files from the DataNodes + // to lost+found + for (CorruptedTestFile ctFile : ctFiles) { + ctFile.checkSalvagedRemains(); + } + + // Fix the filesystem by removing corruptFileName + outStr = runFsck(conf, 1, true, "/", "-delete"); + assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); + + // Check to make sure we have a healthy filesystem + outStr = runFsck(conf, 0, true, "/"); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + util.cleanup(fs, topDir); } static private class CorruptedTestFile { @@ -424,7 +422,7 @@ public class TestFsck { final private int blockSize; final private byte[] initialContents; - public CorruptedTestFile(String name, Set<Integer> blocksToCorrupt, + CorruptedTestFile(String name, Set<Integer> blocksToCorrupt, DFSClient dfsClient, int numDataNodes, int blockSize) throws IOException { this.name = name; @@ -480,7 +478,7 @@ public class TestFsck { new FileOutputStream(blockFile, false); blockFileStream.write("corrupt".getBytes()); blockFileStream.close(); - FSImage.LOG.info("Corrupted block file " + blockFile); + LOG.info("Corrupted block file " + blockFile); } } } @@ -511,7 +509,9 @@ public class TestFsck { if (blockIdx == (numBlocks - 1)) { // The last block might not be full-length len = (int)(in.getFileLength() % blockSize); - if (len == 0) len = blockBuffer.length; + if (len == 0) { + len = blockBuffer.length; + } } IOUtils.readFully(in, blockBuffer, 0, len); int startIdx = blockIdx * blockSize; @@ -530,135 +530,116 @@ public class TestFsck { @Test public void testFsckMoveAndDelete() throws Exception { - final int MAX_MOVE_TRIES = 5; + final int maxMoveTries = 5; DFSTestUtil util = new DFSTestUtil.Builder(). 
setName("TestFsckMoveAndDelete").setNumFiles(5).build(); - MiniDFSCluster cluster = null; FileSystem fs = null; - try { - Configuration conf = new HdfsConfiguration(); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); - conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); - String topDir = "/srcdat"; - fs = cluster.getFileSystem(); - cluster.waitActive(); - util.createFiles(fs, topDir); - util.waitReplication(fs, topDir, (short)3); - String outStr = runFsck(conf, 0, true, "/"); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - - // Corrupt a block by deleting it - String[] fileNames = util.getFileNames(topDir); - DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", - cluster.getNameNodePort()), conf); - String corruptFileName = fileNames[0]; - ExtendedBlock block = dfsClient.getNamenode().getBlockLocations( - corruptFileName, 0, Long.MAX_VALUE).get(0).getBlock(); - for (int i=0; i<4; i++) { - File blockFile = cluster.getBlockFile(i, block); - if(blockFile != null && blockFile.exists()) { - assertTrue(blockFile.delete()); - } - } + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); + conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); + String topDir = "/srcdat"; + fs = cluster.getFileSystem(); + cluster.waitActive(); + util.createFiles(fs, topDir); + util.waitReplication(fs, topDir, (short)3); + String outStr = runFsck(conf, 0, true, "/"); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - // We excpect the filesystem to be corrupted - outStr = runFsck(conf, 1, false, "/"); - while (!outStr.contains(NamenodeFsck.CORRUPT_STATUS)) { - try { - Thread.sleep(100); - } catch (InterruptedException ignore) { - } - outStr = runFsck(conf, 1, false, "/"); - } - - // After a fsck -move, the corrupted file should still exist. 
- for (int i = 0; i < MAX_MOVE_TRIES; i++) { - outStr = runFsck(conf, 1, true, "/", "-move" ); - assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); - String[] newFileNames = util.getFileNames(topDir); - boolean found = false; - for (String f : newFileNames) { - if (f.equals(corruptFileName)) { - found = true; - break; - } - } - assertTrue(found); + // Corrupt a block by deleting it + String[] fileNames = util.getFileNames(topDir); + DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", + cluster.getNameNodePort()), conf); + String corruptFileName = fileNames[0]; + ExtendedBlock block = dfsClient.getNamenode().getBlockLocations( + corruptFileName, 0, Long.MAX_VALUE).get(0).getBlock(); + for (int i=0; i<4; i++) { + File blockFile = cluster.getBlockFile(i, block); + if (blockFile != null && blockFile.exists()) { + assertTrue(blockFile.delete()); } - - // Fix the filesystem by moving corrupted files to lost+found - outStr = runFsck(conf, 1, true, "/", "-move", "-delete"); - assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); - - // Check to make sure we have healthy filesystem - outStr = runFsck(conf, 0, true, "/"); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - util.cleanup(fs, topDir); - if (fs != null) {try{fs.close();} catch(Exception e){}} - cluster.shutdown(); - } finally { - if (fs != null) {try{fs.close();} catch(Exception e){}} - if (cluster != null) { cluster.shutdown(); } } + + // We expect the filesystem to be corrupted + outStr = runFsck(conf, 1, false, "/"); + while (!outStr.contains(NamenodeFsck.CORRUPT_STATUS)) { + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { + } + outStr = runFsck(conf, 1, false, "/"); + } + + // After a fsck -move, the corrupted file should still exist. + for (int i = 0; i < maxMoveTries; i++) { + outStr = runFsck(conf, 1, true, "/", "-move"); + assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); + String[] newFileNames = util.getFileNames(topDir); + boolean found = false; + for (String f : newFileNames) { + if (f.equals(corruptFileName)) { + found = true; + break; + } + } + assertTrue(found); + } + + // Fix the filesystem by moving corrupted files to lost+found + outStr = runFsck(conf, 1, true, "/", "-move", "-delete"); + assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); + + // Check to make sure we have a healthy filesystem + outStr = runFsck(conf, 0, true, "/"); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + util.cleanup(fs, topDir); } @Test public void testFsckOpenFiles() throws Exception { DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck"). 
setNumFiles(4).build(); - MiniDFSCluster cluster = null; FileSystem fs = null; - try { - Configuration conf = new HdfsConfiguration(); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); - String topDir = "/srcdat"; - String randomString = "HADOOP "; - fs = cluster.getFileSystem(); - cluster.waitActive(); - util.createFiles(fs, topDir); - util.waitReplication(fs, topDir, (short)3); - String outStr = runFsck(conf, 0, true, "/"); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - // Open a file for writing and do not close for now - Path openFile = new Path(topDir + "/openFile"); - FSDataOutputStream out = fs.create(openFile); - int writeCount = 0; - while (writeCount != 100) { - out.write(randomString.getBytes()); - writeCount++; - } - ((DFSOutputStream) out.getWrappedStream()).hflush(); - // We expect the filesystem to be HEALTHY and show one open file - outStr = runFsck(conf, 0, true, topDir); - System.out.println(outStr); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - assertFalse(outStr.contains("OPENFORWRITE")); - // Use -openforwrite option to list open files - outStr = runFsck(conf, 0, true, topDir, "-openforwrite"); - System.out.println(outStr); - assertTrue(outStr.contains("OPENFORWRITE")); - assertTrue(outStr.contains("openFile")); - // Close the file - out.close(); - // Now, fsck should show HEALTHY fs and should not show any open files - outStr = runFsck(conf, 0, true, topDir); - System.out.println(outStr); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - assertFalse(outStr.contains("OPENFORWRITE")); - util.cleanup(fs, topDir); - if (fs != null) {try{fs.close();} catch(Exception e){}} - cluster.shutdown(); - } finally { - if (fs != null) {try{fs.close();} catch(Exception e){}} - if (cluster != null) { cluster.shutdown(); } + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); + String topDir = "/srcdat"; + String randomString = "HADOOP "; + fs = cluster.getFileSystem(); + cluster.waitActive(); + util.createFiles(fs, topDir); + util.waitReplication(fs, topDir, (short)3); + String outStr = runFsck(conf, 0, true, "/"); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + // Open a file for writing and do not close for now + Path openFile = new Path(topDir + "/openFile"); + FSDataOutputStream out = fs.create(openFile); + int writeCount = 0; + while (writeCount != 100) { + out.write(randomString.getBytes()); + writeCount++; } + ((DFSOutputStream) out.getWrappedStream()).hflush(); + // We expect the filesystem to be HEALTHY and show one open file + outStr = runFsck(conf, 0, true, topDir); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + assertFalse(outStr.contains("OPENFORWRITE")); + // Use -openforwrite option to list open files + outStr = runFsck(conf, 0, true, topDir, "-openforwrite"); + System.out.println(outStr); + assertTrue(outStr.contains("OPENFORWRITE")); + assertTrue(outStr.contains("openFile")); + // Close the file + out.close(); + // Now, fsck should show HEALTHY fs and should not show any open files + outStr = runFsck(conf, 0, true, topDir); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + assertFalse(outStr.contains("OPENFORWRITE")); + util.cleanup(fs, topDir); } @Test public void testCorruptBlock() throws Exception { - Configuration conf = new 
HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000); // Set short retry timeouts so this test runs faster conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); @@ -670,8 +651,6 @@ public class TestFsck { String outStr = null; short factor = 1; - MiniDFSCluster cluster = null; - try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); fs = cluster.getFileSystem(); @@ -702,7 +681,7 @@ public class TestFsck { IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), conf, true); } catch (IOException ie) { - // Ignore exception + assertTrue(ie instanceof ChecksumException); } dfsClient = new DFSClient(new InetSocketAddress("localhost", @@ -719,27 +698,23 @@ public class TestFsck { getBlockLocations(file1.toString(), 0, Long.MAX_VALUE); replicaCount = blocks.get(0).getLocations().length; } - assertTrue (blocks.get(0).isCorrupt()); + assertTrue(blocks.get(0).isCorrupt()); // Check if fsck reports the same outStr = runFsck(conf, 1, true, "/"); System.out.println(outStr); assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); assertTrue(outStr.contains("testCorruptBlock")); - } finally { - if (cluster != null) {cluster.shutdown();} - } } @Test public void testUnderMinReplicatedBlock() throws Exception { - Configuration conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000); // Set short retry timeouts so this test runs faster conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); // Set minReplication to 2 short minReplication=2; - conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,minReplication); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, minReplication); FileSystem fs = null; DFSClient dfsClient = null; LocatedBlocks blocks = null; @@ -747,252 +722,234 @@ public class TestFsck { Random random = new Random(); String outStr = null; short factor = 1; - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - Path file1 = new Path("/testUnderMinReplicatedBlock"); - DFSTestUtil.createFile(fs, file1, 1024, minReplication, 0); - // Wait until file replication has completed - DFSTestUtil.waitReplication(fs, file1, minReplication); - ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + Path file1 = new Path("/testUnderMinReplicatedBlock"); + DFSTestUtil.createFile(fs, file1, 1024, minReplication, 0); + // Wait until file replication has completed + DFSTestUtil.waitReplication(fs, file1, minReplication); + ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1); - // Make sure filesystem is in healthy state - outStr = runFsck(conf, 0, true, "/"); - System.out.println(outStr); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + // Make sure filesystem is in healthy state + outStr = runFsck(conf, 0, true, "/"); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - // corrupt the first replica - File blockFile = cluster.getBlockFile(0, block); - if (blockFile != null && blockFile.exists()) { - RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw"); - FileChannel channel = raFile.getChannel(); - String badString = "BADBAD"; - int rand = random.nextInt((int) channel.size()/2); - raFile.seek(rand); - raFile.write(badString.getBytes()); - 
raFile.close(); + // corrupt the first replica + File blockFile = cluster.getBlockFile(0, block); + if (blockFile != null && blockFile.exists()) { + RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw"); + FileChannel channel = raFile.getChannel(); + String badString = "BADBAD"; + int rand = random.nextInt((int) channel.size()/2); + raFile.seek(rand); + raFile.write(badString.getBytes()); + raFile.close(); + } + + dfsClient = new DFSClient(new InetSocketAddress("localhost", + cluster.getNameNodePort()), conf); + blocks = dfsClient.getNamenode(). + getBlockLocations(file1.toString(), 0, Long.MAX_VALUE); + replicaCount = blocks.get(0).getLocations().length; + while (replicaCount != factor) { + try { + Thread.sleep(100); + // Read the file to trigger reportBadBlocks + try { + IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), + conf, true); + } catch (IOException ie) { + assertTrue(ie instanceof ChecksumException); + } + System.out.println("sleep in try: replicaCount=" + replicaCount + + " factor=" + factor); + } catch (InterruptedException ignore) { } - - dfsClient = new DFSClient(new InetSocketAddress("localhost", - cluster.getNameNodePort()), conf); blocks = dfsClient.getNamenode(). getBlockLocations(file1.toString(), 0, Long.MAX_VALUE); replicaCount = blocks.get(0).getLocations().length; - while (replicaCount != factor) { - try { - Thread.sleep(100); - // Read the file to trigger reportBadBlocks - try { - IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), conf, - true); - } catch (IOException ie) { - // Ignore exception - } - System.out.println("sleep in try: replicaCount="+replicaCount+" factor="+factor); - } catch (InterruptedException ignore) { - } - blocks = dfsClient.getNamenode(). - getBlockLocations(file1.toString(), 0, Long.MAX_VALUE); - replicaCount = blocks.get(0).getLocations().length; - } - - // Check if fsck reports the same - outStr = runFsck(conf, 0, true, "/"); - System.out.println(outStr); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - assertTrue(outStr.contains("UNDER MIN REPL'D BLOCKS:\t1 (100.0 %)")); - assertTrue(outStr.contains("dfs.namenode.replication.min:\t2")); - } finally { - if (cluster != null) {cluster.shutdown();} } + + // Check if fsck reports the same + outStr = runFsck(conf, 0, true, "/"); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + assertTrue(outStr.contains("UNDER MIN REPL'D BLOCKS:\t1 (100.0 %)")); + assertTrue(outStr.contains("dfs.namenode.replication.min:\t2")); } @Test(timeout = 60000) public void testFsckReplicaDetails() throws Exception { - final short REPL_FACTOR = 1; - short NUM_DN = 1; + final short replFactor = 1; + short numDn = 1; final long blockSize = 512; final long fileSize = 1024; boolean checkDecommissionInProgress = false; - String[] racks = { "/rack1" }; - String[] hosts = { "host1" }; + String[] racks = {"/rack1"}; + String[] hosts = {"host1"}; - Configuration conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); - MiniDFSCluster cluster; DistributedFileSystem dfs; cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts).racks(racks).build(); + new MiniDFSCluster.Builder(conf).numDataNodes(numDn).hosts(hosts) + .racks(racks).build(); cluster.waitClusterUp(); dfs = cluster.getFileSystem(); // create files final String testFile = new String("/testfile"); final Path path = new Path(testFile); - DFSTestUtil.createFile(dfs, path, 
fileSize, REPL_FACTOR, 1000L); - DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR); + DFSTestUtil.createFile(dfs, path, fileSize, replFactor, 1000L); + DFSTestUtil.waitReplication(dfs, path, replFactor); + + // make sure datanode that has replica is fine before decommission + String fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks", + "-replicaDetails"); + assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS)); + assertTrue(fsckOut.contains("(LIVE)")); + + // decommission datanode + ExtendedBlock eb = DFSTestUtil.getFirstBlock(dfs, path); + FSNamesystem fsn = cluster.getNameNode().getNamesystem(); + BlockManager bm = fsn.getBlockManager(); + BlockCollection bc = null; try { - // make sure datanode that has replica is fine before decommission - String fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks", "-replicaDetails"); - assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS)); - assertTrue(fsckOut.contains("(LIVE)")); - - // decommission datanode - ExtendedBlock eb = DFSTestUtil.getFirstBlock(dfs, path); - FSNamesystem fsn = cluster.getNameNode().getNamesystem(); - BlockManager bm = fsn.getBlockManager(); - BlockCollection bc = null; - try { - fsn.writeLock(); - BlockInfo bi = bm.getStoredBlock(eb.getLocalBlock()); - bc = fsn.getBlockCollection(bi); - } finally { - fsn.writeUnlock(); - } - DatanodeDescriptor dn = bc.getBlocks()[0] - .getDatanode(0); - bm.getDatanodeManager().getDecomManager().startDecommission(dn); - String dnName = dn.getXferAddr(); - - // check the replica status while decommissioning - fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks", "-replicaDetails"); - assertTrue(fsckOut.contains("(DECOMMISSIONING)")); - - // Start 2nd Datanode and wait for decommission to start - cluster.startDataNodes(conf, 1, true, null, null, null); - DatanodeInfo datanodeInfo = null; - do { - Thread.sleep(2000); - for (DatanodeInfo info : dfs.getDataNodeStats()) { - if (dnName.equals(info.getXferAddr())) { - datanodeInfo = info; - } - } - if (!checkDecommissionInProgress && datanodeInfo != null - && datanodeInfo.isDecommissionInProgress()) { - checkDecommissionInProgress = true; - } - } while (datanodeInfo != null && !datanodeInfo.isDecommissioned()); - - // check the replica status after decommission is done - fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks", "-replicaDetails"); - assertTrue(fsckOut.contains("(DECOMMISSIONED)")); + fsn.writeLock(); + BlockInfo bi = bm.getStoredBlock(eb.getLocalBlock()); + bc = fsn.getBlockCollection(bi); } finally { - if (cluster != null) { - cluster.shutdown(); - } + fsn.writeUnlock(); } + DatanodeDescriptor dn = bc.getBlocks()[0] + .getDatanode(0); + bm.getDatanodeManager().getDecomManager().startDecommission(dn); + String dnName = dn.getXferAddr(); + + // check the replica status while decommissioning + fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks", + "-replicaDetails"); + assertTrue(fsckOut.contains("(DECOMMISSIONING)")); + + // Start 2nd Datanode and wait for decommission to start + cluster.startDataNodes(conf, 1, true, null, null, null); + DatanodeInfo datanodeInfo = null; + do { + Thread.sleep(2000); + for (DatanodeInfo info : dfs.getDataNodeStats()) { + if (dnName.equals(info.getXferAddr())) { + datanodeInfo = info; + } + } + if (!checkDecommissionInProgress && datanodeInfo != null + && datanodeInfo.isDecommissionInProgress()) { + checkDecommissionInProgress = true; + } + } while (datanodeInfo != null && !datanodeInfo.isDecommissioned()); + + // check the replica status 
after decommission is done + fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks", + "-replicaDetails"); + assertTrue(fsckOut.contains("(DECOMMISSIONED)")); } - /** Test if fsck can return -1 in case of failure + /** Test if fsck can return -1 in case of failure. * * @throws Exception */ @Test public void testFsckError() throws Exception { - MiniDFSCluster cluster = null; - try { - // bring up a one-node cluster - Configuration conf = new HdfsConfiguration(); - cluster = new MiniDFSCluster.Builder(conf).build(); - String fileName = "/test.txt"; - Path filePath = new Path(fileName); - FileSystem fs = cluster.getFileSystem(); - - // create a one-block file - DFSTestUtil.createFile(fs, filePath, 1L, (short)1, 1L); - DFSTestUtil.waitReplication(fs, filePath, (short)1); - - // intentionally corrupt NN data structure - INodeFile node = (INodeFile) cluster.getNamesystem().dir.getINode - (fileName, true); - final BlockInfo[] blocks = node.getBlocks(); - assertEquals(blocks.length, 1); - blocks[0].setNumBytes(-1L); // set the block length to be negative - - // run fsck and expect a failure with -1 as the error code - String outStr = runFsck(conf, -1, true, fileName); - System.out.println(outStr); - assertTrue(outStr.contains(NamenodeFsck.FAILURE_STATUS)); - - // clean up file system - fs.delete(filePath, true); - } finally { - if (cluster != null) {cluster.shutdown();} - } + // bring up a one-node cluster + cluster = new MiniDFSCluster.Builder(conf).build(); + String fileName = "/test.txt"; + Path filePath = new Path(fileName); + FileSystem fs = cluster.getFileSystem(); + + // create a one-block file + DFSTestUtil.createFile(fs, filePath, 1L, (short)1, 1L); + DFSTestUtil.waitReplication(fs, filePath, (short)1); + + // intentionally corrupt NN data structure + INodeFile node = (INodeFile) cluster.getNamesystem().dir.getINode( + fileName, true); + final BlockInfo[] blocks = node.getBlocks(); + assertEquals(blocks.length, 1); + blocks[0].setNumBytes(-1L); // set the block length to be negative + + // run fsck and expect a failure with -1 as the error code + String outStr = runFsck(conf, -1, true, fileName); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.FAILURE_STATUS)); + + // clean up file system + fs.delete(filePath, true); } - /** check if option -list-corruptfiles of fsck command works properly */ + /** check if option -list-corruptfileblocks of fsck command works properly. */ @Test public void testFsckListCorruptFilesBlocks() throws Exception { - Configuration conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000); conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); FileSystem fs = null; - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster.Builder(conf).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - DFSTestUtil util = new DFSTestUtil.Builder(). - setName("testGetCorruptFiles").setNumFiles(3).setMaxLevels(1). - setMaxSize(1024).build(); - util.createFiles(fs, "/corruptData", (short) 1); - util.waitReplication(fs, "/corruptData", (short) 1); + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil util = new DFSTestUtil.Builder(). + setName("testGetCorruptFiles").setNumFiles(3).setMaxLevels(1). 
+ setMaxSize(1024).build(); + util.createFiles(fs, "/corruptData", (short) 1); + util.waitReplication(fs, "/corruptData", (short) 1); - // String outStr = runFsck(conf, 0, true, "/corruptData", "-list-corruptfileblocks"); - String outStr = runFsck(conf, 0, false, "/corruptData", "-list-corruptfileblocks"); - System.out.println("1. good fsck out: " + outStr); - assertTrue(outStr.contains("has 0 CORRUPT files")); - // delete the blocks - final String bpid = cluster.getNamesystem().getBlockPoolId(); - for (int i=0; i<4; i++) { - for (int j=0; j<=1; j++) { - File storageDir = cluster.getInstanceStorageDir(i, j); - File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); - List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles( - data_dir); - if (metadataFiles == null) - continue; - for (File metadataFile : metadataFiles) { - File blockFile = Block.metaToBlockFile(metadataFile); - assertTrue("Cannot remove file.", blockFile.delete()); - assertTrue("Cannot remove file.", metadataFile.delete()); - } + String outStr = runFsck(conf, 0, false, "/corruptData", + "-list-corruptfileblocks"); + System.out.println("1. good fsck out: " + outStr); + assertTrue(outStr.contains("has 0 CORRUPT files")); + // delete the blocks + final String bpid = cluster.getNamesystem().getBlockPoolId(); + for (int i=0; i<4; i++) { + for (int j=0; j<=1; j++) { + File storageDir = cluster.getInstanceStorageDir(i, j); + File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); + List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles( + dataDir); + if (metadataFiles == null) { + continue; + } + for (File metadataFile : metadataFiles) { + File blockFile = Block.metaToBlockFile(metadataFile); + assertTrue("Cannot remove file.", blockFile.delete()); + assertTrue("Cannot remove file.", metadataFile.delete()); } } - - // wait for the namenode to see the corruption - final NamenodeProtocols namenode = cluster.getNameNodeRpc(); - CorruptFileBlocks corruptFileBlocks = namenode - .listCorruptFileBlocks("/corruptData", null); - int numCorrupt = corruptFileBlocks.getFiles().length; - while (numCorrupt == 0) { - Thread.sleep(1000); - corruptFileBlocks = namenode - .listCorruptFileBlocks("/corruptData", null); - numCorrupt = corruptFileBlocks.getFiles().length; - } - outStr = runFsck(conf, -1, true, "/corruptData", "-list-corruptfileblocks"); - System.out.println("2. bad fsck out: " + outStr); - assertTrue(outStr.contains("has 3 CORRUPT files")); - - // Do a listing on a dir which doesn't have any corrupt blocks and validate - util.createFiles(fs, "/goodData"); - outStr = runFsck(conf, 0, true, "/goodData", "-list-corruptfileblocks"); - System.out.println("3. good fsck out: " + outStr); - assertTrue(outStr.contains("has 0 CORRUPT files")); - util.cleanup(fs,"/corruptData"); - util.cleanup(fs, "/goodData"); - } finally { - if (cluster != null) {cluster.shutdown();} } + + // wait for the namenode to see the corruption + final NamenodeProtocols namenode = cluster.getNameNodeRpc(); + CorruptFileBlocks corruptFileBlocks = namenode + .listCorruptFileBlocks("/corruptData", null); + int numCorrupt = corruptFileBlocks.getFiles().length; + while (numCorrupt == 0) { + Thread.sleep(1000); + corruptFileBlocks = namenode + .listCorruptFileBlocks("/corruptData", null); + numCorrupt = corruptFileBlocks.getFiles().length; + } + outStr = runFsck(conf, -1, true, "/corruptData", "-list-corruptfileblocks"); + System.out.println("2. 
bad fsck out: " + outStr); + assertTrue(outStr.contains("has 3 CORRUPT files")); + + // Do a listing on a dir which doesn't have any corrupt blocks and validate + util.createFiles(fs, "/goodData"); + outStr = runFsck(conf, 0, true, "/goodData", "-list-corruptfileblocks"); + System.out.println("3. good fsck out: " + outStr); + assertTrue(outStr.contains("has 0 CORRUPT files")); + util.cleanup(fs, "/corruptData"); + util.cleanup(fs, "/goodData"); } /** @@ -1001,191 +958,161 @@ public class TestFsck { */ @Test public void testToCheckTheFsckCommandOnIllegalArguments() throws Exception { - MiniDFSCluster cluster = null; - try { - // bring up a one-node cluster - Configuration conf = new HdfsConfiguration(); - cluster = new MiniDFSCluster.Builder(conf).build(); - String fileName = "/test.txt"; - Path filePath = new Path(fileName); - FileSystem fs = cluster.getFileSystem(); + // bring up a one-node cluster + cluster = new MiniDFSCluster.Builder(conf).build(); + String fileName = "/test.txt"; + Path filePath = new Path(fileName); + FileSystem fs = cluster.getFileSystem(); - // create a one-block file - DFSTestUtil.createFile(fs, filePath, 1L, (short) 1, 1L); - DFSTestUtil.waitReplication(fs, filePath, (short) 1); + // create a one-block file + DFSTestUtil.createFile(fs, filePath, 1L, (short) 1, 1L); + DFSTestUtil.waitReplication(fs, filePath, (short) 1); - // passing illegal option - String outStr = runFsck(conf, -1, true, fileName, "-thisIsNotAValidFlag"); - System.out.println(outStr); - assertTrue(!outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + // passing illegal option + String outStr = runFsck(conf, -1, true, fileName, "-thisIsNotAValidFlag"); + System.out.println(outStr); + assertTrue(!outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - // passing multiple paths are arguments - outStr = runFsck(conf, -1, true, "/", fileName); - System.out.println(outStr); - assertTrue(!outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - // clean up file system - fs.delete(filePath, true); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } + // passing multiple paths as arguments + outStr = runFsck(conf, -1, true, "/", fileName); + System.out.println(outStr); + assertTrue(!outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + // clean up file system + fs.delete(filePath, true); } /** - * Tests that the # of missing block replicas and expected replicas is correct + * Tests that the # of missing block replicas and expected replicas is + * correct. 
* @throws IOException */ @Test public void testFsckMissingReplicas() throws IOException { // Desired replication factor - // Set this higher than NUM_REPLICAS so it's under-replicated - final short REPL_FACTOR = 2; + // Set this higher than numReplicas so it's under-replicated + final short replFactor = 2; // Number of replicas to actually start - final short NUM_REPLICAS = 1; + final short numReplicas = 1; // Number of blocks to write - final short NUM_BLOCKS = 3; + final short numBlocks = 3; // Set a small-ish blocksize final long blockSize = 512; - Configuration conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); - MiniDFSCluster cluster = null; DistributedFileSystem dfs = null; - try { - // Startup a minicluster - cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(NUM_REPLICAS).build(); - assertNotNull("Failed Cluster Creation", cluster); - cluster.waitClusterUp(); - dfs = cluster.getFileSystem(); - assertNotNull("Failed to get FileSystem", dfs); - - // Create a file that will be intentionally under-replicated - final String pathString = new String("/testfile"); - final Path path = new Path(pathString); - long fileLen = blockSize * NUM_BLOCKS; - DFSTestUtil.createFile(dfs, path, fileLen, REPL_FACTOR, 1); - - // Create an under-replicated file - NameNode namenode = cluster.getNameNode(); - NetworkTopology nettop = cluster.getNamesystem().getBlockManager() - .getDatanodeManager().getNetworkTopology(); - Map<String,String[]> pmap = new HashMap<String, String[]>(); - Writer result = new StringWriter(); - PrintWriter out = new PrintWriter(result, true); - InetAddress remoteAddress = InetAddress.getLocalHost(); - NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out, - NUM_REPLICAS, remoteAddress); - - // Run the fsck and check the Result - final HdfsFileStatus file = - namenode.getRpcServer().getFileInfo(pathString); - assertNotNull(file); - Result res = new Result(conf); - fsck.check(pathString, file, res); - // Also print the output from the fsck, for ex post facto sanity checks - System.out.println(result.toString()); - assertEquals(res.missingReplicas, - (NUM_BLOCKS*REPL_FACTOR) - (NUM_BLOCKS*NUM_REPLICAS)); - assertEquals(res.numExpectedReplicas, NUM_BLOCKS*REPL_FACTOR); - } finally { - if(dfs != null) { - dfs.close(); - } - if(cluster != null) { - cluster.shutdown(); - } - } + // Startup a minicluster + cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(numReplicas).build(); + assertNotNull("Failed Cluster Creation", cluster); + cluster.waitClusterUp(); + dfs = cluster.getFileSystem(); + assertNotNull("Failed to get FileSystem", dfs); + + // Create a file that will be intentionally under-replicated + final String pathString = new String("/testfile"); + final Path path = new Path(pathString); + long fileLen = blockSize * numBlocks; + DFSTestUtil.createFile(dfs, path, fileLen, replFactor, 1); + + // Create an under-replicated file + NameNode namenode = cluster.getNameNode(); + NetworkTopology nettop = cluster.getNamesystem().getBlockManager() + .getDatanodeManager().getNetworkTopology(); + Map<String, String[]> pmap = new HashMap<String, String[]>(); + Writer result = new StringWriter(); + PrintWriter out = new PrintWriter(result, true); + InetAddress remoteAddress = InetAddress.getLocalHost(); + NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out, + numReplicas, remoteAddress); + + // Run the fsck and check the Result + final HdfsFileStatus file = + namenode.getRpcServer().getFileInfo(pathString); + assertNotNull(file); + Result res = new Result(conf); + 
fsck.check(pathString, file, res); + // Also print the output from the fsck, for ex post facto sanity checks + System.out.println(result.toString()); + assertEquals(res.missingReplicas, + (numBlocks*replFactor) - (numBlocks*numReplicas)); + assertEquals(res.numExpectedReplicas, numBlocks*replFactor); } /** - * Tests that the # of misreplaced replicas is correct + * Tests that the # of misreplaced replicas is correct. * @throws IOException */ @Test public void testFsckMisPlacedReplicas() throws IOException { // Desired replication factor - final short REPL_FACTOR = 2; + final short replFactor = 2; // Number of replicas to actually start - short NUM_DN = 2; + short numDn = 2; // Number of blocks to write - final short NUM_BLOCKS = 3; + final short numBlocks = 3; // Set a small-ish blocksize final long blockSize = 512; - String [] racks = {"/rack1", "/rack1"}; - String [] hosts = {"host1", "host2"}; + String[] racks = {"/rack1", "/rack1"}; + String[] hosts = {"host1", "host2"}; - Configuration conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); - MiniDFSCluster cluster = null; DistributedFileSystem dfs = null; - try { - // Startup a minicluster - cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts) - .racks(racks).build(); - assertNotNull("Failed Cluster Creation", cluster); - cluster.waitClusterUp(); - dfs = cluster.getFileSystem(); - assertNotNull("Failed to get FileSystem", dfs); - - // Create a file that will be intentionally under-replicated - final String pathString = new String("/testfile"); - final Path path = new Path(pathString); - long fileLen = blockSize * NUM_BLOCKS; - DFSTestUtil.createFile(dfs, path, fileLen, REPL_FACTOR, 1); - - // Create an under-replicated file - NameNode namenode = cluster.getNameNode(); - NetworkTopology nettop = cluster.getNamesystem().getBlockManager() - .getDatanodeManager().getNetworkTopology(); - // Add a new node on different rack, so previous blocks' replicas - // are considered to be misplaced - nettop.add(DFSTestUtil.getDatanodeDescriptor("/rack2", "/host3")); - NUM_DN++; - - Map<String,String[]> pmap = new HashMap<String, String[]>(); - Writer result = new StringWriter(); - PrintWriter out = new PrintWriter(result, true); - InetAddress remoteAddress = InetAddress.getLocalHost(); - NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out, - NUM_DN, remoteAddress); - - // Run the fsck and check the Result - final HdfsFileStatus file = - namenode.getRpcServer().getFileInfo(pathString); - assertNotNull(file); - Result res = new Result(conf); - fsck.check(pathString, file, res); - // check misReplicatedBlock number. 
- assertEquals(res.numMisReplicatedBlocks, NUM_BLOCKS); - } finally { - if(dfs != null) { - dfs.close(); - } - if(cluster != null) { - cluster.shutdown(); - } - } + // Startup a minicluster + cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(numDn).hosts(hosts) + .racks(racks).build(); + assertNotNull("Failed Cluster Creation", cluster); + cluster.waitClusterUp(); + dfs = cluster.getFileSystem(); + assertNotNull("Failed to get FileSystem", dfs); + + // Create a file that will be intentionally under-replicated + final String pathString = new String("/testfile"); + final Path path = new Path(pathString); + long fileLen = blockSize * numBlocks; + DFSTestUtil.createFile(dfs, path, fileLen, replFactor, 1); + + // Create an under-replicated file + NameNode namenode = cluster.getNameNode(); + NetworkTopology nettop = cluster.getNamesystem().getBlockManager() + .getDatanodeManager().getNetworkTopology(); + // Add a new node on different rack, so previous blocks' replicas + // are considered to be misplaced + nettop.add(DFSTestUtil.getDatanodeDescriptor("/rack2", "/host3")); + numDn++; + + Map<String, String[]> pmap = new HashMap<String, String[]>(); + Writer result = new StringWriter(); + PrintWriter out = new PrintWriter(result, true); + InetAddress remoteAddress = InetAddress.getLocalHost(); + NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out, + numDn, remoteAddress); + + // Run the fsck and check the Result + final HdfsFileStatus file = + namenode.getRpcServer().getFileInfo(pathString); + assertNotNull(file); + Result res = new Result(conf); + fsck.check(pathString, file, res); + // check misReplicatedBlock number. + assertEquals(res.numMisReplicatedBlocks, numBlocks); } - /** Test fsck with FileNotFound */ + /** Test fsck with FileNotFound. */ @Test public void testFsckFileNotFound() throws Exception { // Number of replicas to actually start - final short NUM_REPLICAS = 1; + final short numReplicas = 1; - Configuration conf = new Configuration(); NameNode namenode = mock(NameNode.class); NetworkTopology nettop = mock(NetworkTopology.class); - Map<String,String[]> pmap = new HashMap<>(); + Map<String, String[]> pmap = new HashMap<>(); Writer result = new StringWriter(); PrintWriter out = new PrintWriter(result, true); InetAddress remoteAddress = InetAddress.getLocalHost(); @@ -1199,11 +1126,12 @@ public class TestFsck { when(fsName.getBlockManager()).thenReturn(blockManager); when(fsName.getFSDirectory()).thenReturn(fsd); when(fsd.getFSNamesystem()).thenReturn(fsName); - when(fsd.resolvePath(any(FSPermissionChecker.class), anyString())).thenReturn(iip); + when(fsd.resolvePath(any(FSPermissionChecker.class), anyString())) + .thenReturn(iip); when(blockManager.getDatanodeManager()).thenReturn(dnManager); NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out, - NUM_REPLICAS, remoteAddress); + numReplicas, remoteAddress); String pathString = "/tmp/testFile"; @@ -1216,8 +1144,8 @@ public class TestFsck { FsPermission perms = FsPermission.getDefault(); String owner = "foo"; String group = "bar"; - byte [] symlink = null; - byte [] path = DFSUtil.string2Bytes(pathString); + byte[] symlink = null; + byte[] path = DFSUtil.string2Bytes(pathString); long fileId = 312321L; int numChildren = 1; byte storagePolicy = 0; @@ -1235,95 +1163,82 @@ public class TestFsck { assertTrue(res.toString().contains("HEALTHY")); } - /** Test fsck with symlinks in the filesystem */ + /** Test fsck with symlinks in the filesystem. */ @Test public void testFsckSymlink() throws Exception { final DFSTestUtil util = new DFSTestUtil.Builder(). 
setName(getClass().getSimpleName()).setNumFiles(1).build(); - final Configuration conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); - MiniDFSCluster cluster = null; FileSystem fs = null; - try { - final long precision = 1L; - conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); - fs = cluster.getFileSystem(); - final String fileName = "/srcdat"; - util.createFiles(fs, fileName); - final FileContext fc = FileContext.getFileContext( - cluster.getConfiguration(0)); - final Path file = new Path(fileName); - final Path symlink = new Path("/srcdat-symlink"); - fc.createSymlink(file, symlink, false); - util.waitReplication(fs, fileName, (short)3); - long aTime = fc.getFileStatus(symlink).getAccessTime(); - Thread.sleep(precision); - setupAuditLogs(); - String outStr = runFsck(conf, 0, true, "/"); - verifyAuditLogs(); - assertEquals(aTime, fc.getFileStatus(symlink).getAccessTime()); - System.out.println(outStr); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - assertTrue(outStr.contains("Total symlinks:\t\t1")); - util.cleanup(fs, fileName); - } finally { - if (fs != null) {try{fs.close();} catch(Exception e){}} - if (cluster != null) { cluster.shutdown(); } - } + final long precision = 1L; + conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, + precision); + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); + fs = cluster.getFileSystem(); + final String fileName = "/srcdat"; + util.createFiles(fs, fileName); + final FileContext fc = FileContext.getFileContext( + cluster.getConfiguration(0)); + final Path file = new Path(fileName); + final Path symlink = new Path("/srcdat-symlink"); + fc.createSymlink(file, symlink, false); + util.waitReplication(fs, fileName, (short)3); + long aTime = fc.getFileStatus(symlink).getAccessTime(); + Thread.sleep(precision); + setupAuditLogs(); + String outStr = runFsck(conf, 0, true, "/"); + verifyAuditLogs(); + assertEquals(aTime, fc.getFileStatus(symlink).getAccessTime()); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + assertTrue(outStr.contains("Total symlinks:\t\t1")); + util.cleanup(fs, fileName); } /** - * Test for including the snapshot files in fsck report + * Test for including the snapshot files in fsck report. 
*/ @Test public void testFsckForSnapshotFiles() throws Exception { - final Configuration conf = new HdfsConfiguration(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) .build(); - try { - String runFsck = runFsck(conf, 0, true, "/", "-includeSnapshots", - "-files"); - assertTrue(runFsck.contains("HEALTHY")); - final String fileName = "/srcdat"; - DistributedFileSystem hdfs = cluster.getFileSystem(); - Path file1 = new Path(fileName); - DFSTestUtil.createFile(hdfs, file1, 1024, (short) 1, 1000L); - hdfs.allowSnapshot(new Path("/")); - hdfs.createSnapshot(new Path("/"), "mySnapShot"); - runFsck = runFsck(conf, 0, true, "/", "-includeSnapshots", "-files"); - assertTrue(runFsck.contains("/.snapshot/mySnapShot/srcdat")); - runFsck = runFsck(conf, 0, true, "/", "-files"); - assertFalse(runFsck.contains("mySnapShot")); - } finally { - cluster.shutdown(); - } + String runFsck = runFsck(conf, 0, true, "/", "-includeSnapshots", + "-files"); + assertTrue(runFsck.contains("HEALTHY")); + final String fileName = "/srcdat"; + DistributedFileSystem hdfs = cluster.getFileSystem(); + Path file1 = new Path(fileName); + DFSTestUtil.createFile(hdfs, file1, 1024, (short) 1, 1000L); + hdfs.allowSnapshot(new Path("/")); + hdfs.createSnapshot(new Path("/"), "mySnapShot"); + runFsck = runFsck(conf, 0, true, "/", "-includeSnapshots", "-files"); + assertTrue(runFsck.contains("/.snapshot/mySnapShot/srcdat")); + runFsck = runFsck(conf, 0, true, "/", "-files"); + assertFalse(runFsck.contains("mySnapShot")); } /** - * Test for blockIdCK + * Test for blockIdCK. */ @Test public void testBlockIdCK() throws Exception { - final short REPL_FACTOR = 2; - short NUM_DN = 2; + final short replFactor = 2; + short numDn = 2; final long blockSize = 512; - String [] racks = {"/rack1", "/rack2"}; - String [] hosts = {"host1", "host2"}; + String[] racks = {"/rack1", "/rack2"}; + String[] hosts = {"host1", "host2"}; - Configuration conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2); - MiniDFSCluster cluster = null; DistributedFileSystem dfs = null; cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts) + new MiniDFSCluster.Builder(conf).numDataNodes(numDn).hosts(hosts) .racks(racks).build(); assertNotNull("Failed Cluster Creation", cluster); @@ -1332,12 +1247,12 @@ public class TestFsck { assertNotNull("Failed to get FileSystem", dfs); DFSTestUtil util = new DFSTestUtil.Builder(). 
- setName(getClass().getSimpleName()).setNumFiles(1).build(); + setName(getClass().getSimpleName()).setNumFiles(1).build(); //create files final String pathString = new String("/testfile"); final Path path = new Path(pathString); - util.createFile(dfs, path, 1024, REPL_FACTOR , 1000L); - util.waitReplication(dfs, path, REPL_FACTOR); + util.createFile(dfs, path, 1024, replFactor, 1000L); + util.waitReplication(dfs, path, replFactor); StringBuilder sb = new StringBuilder(); for (LocatedBlock lb: util.getAllBlocks(dfs, path)){ sb.append(lb.getBlock().getLocalBlock().getBlockName()+" "); @@ -1345,46 +1260,40 @@ public class TestFsck { String[] bIds = sb.toString().split(" "); //run fsck - try { - //illegal input test - String runFsckResult = runFsck(conf, 0, true, "/", "-blockId", - "not_a_block_id"); - assertTrue(runFsckResult.contains("Incorrect blockId format:")); + //illegal input test + String runFsckResult = runFsck(conf, 0, true, "/", "-blockId", + "not_a_block_id"); + assertTrue(runFsckResult.contains("Incorrect blockId format:")); - //general test - runFsckResult = runFsck(conf, 0, true, "/", "-blockId", sb.toString()); - assertTrue(runFsckResult.contains(bIds[0])); - assertTrue(runFsckResult.contains(bIds[1])); - assertTrue(runFsckResult.contains( - "Block replica on datanode/rack: host1/rack1 is HEALTHY")); - assertTrue(runFsckResult.contains( - "Block replica on datanode/rack: host2/rack2 is HEALTHY")); - } finally { - cluster.shutdown(); - } + //general test + runFsckResult = runFsck(conf, 0, true, "/", "-blockId", sb.toString()); + assertTrue(runFsckResult.contains(bIds[0])); + assertTrue(runFsckResult.contains(bIds[1])); + assertTrue(runFsckResult.contains( + "Block replica on datanode/rack: host1/rack1 is HEALTHY")); + assertTrue(runFsckResult.contains( + "Block replica on datanode/rack: host2/rack2 is HEALTHY")); } /** - * Test for blockIdCK with datanode decommission + * Test for blockIdCK with datanode decommission. 
*/ @Test public void testBlockIdCKDecommission() throws Exception { - final short REPL_FACTOR = 1; - short NUM_DN = 2; + final short replFactor = 1; + short numDn = 2; final long blockSize = 512; boolean checkDecommissionInProgress = false; - String [] racks = {"/rack1", "/rack2"}; - String [] hosts = {"host1", "host2"}; + String[] racks = {"/rack1", "/rack2"}; + String[] hosts = {"host1", "host2"}; - Configuration conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2); - MiniDFSCluster cluster; - DistributedFileSystem dfs ; + DistributedFileSystem dfs; cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts) + new MiniDFSCluster.Builder(conf).numDataNodes(numDn).hosts(hosts) .racks(racks).build(); assertNotNull("Failed Cluster Creation", cluster); @@ -1397,137 +1306,124 @@ public class TestFsck { //create files final String pathString = new String("/testfile"); final Path path = new Path(pathString); - util.createFile(dfs, path, 1024, REPL_FACTOR, 1000L); - util.waitReplication(dfs, path, REPL_FACTOR); + util.createFile(dfs, path, 1024, replFactor, 1000L); + util.waitReplication(dfs, path, replFactor); StringBuilder sb = new StringBuilder(); for (LocatedBlock lb: util.getAllBlocks(dfs, path)){ sb.append(lb.getBlock().getLocalBlock().getBlockName()+" "); } String[] bIds = sb.toString().split(" "); + + //make sure datanode that has replica is fine before decommission + String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + + //decommission datanode + FSNamesystem fsn = cluster.getNameNode().getNamesystem(); + BlockManager bm = fsn.getBlockManager(); + ExtendedBlock eb = util.getFirstBlock(dfs, path); + BlockCollection bc = null; try { - //make sure datanode that has replica is fine before decommission - String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]); - System.out.println(outStr); - assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); - - //decommission datanode - FSNamesystem fsn = cluster.getNameNode().getNamesystem(); - BlockManager bm = fsn.getBlockManager(); - ExtendedBlock eb = util.getFirstBlock(dfs, path); - BlockCollection bc = null; - try { - fsn.writeLock(); - BlockInfo bi = bm.getStoredBlock(eb.getLocalBlock()); - bc = fsn.getBlockCollection(bi); - } finally { - fsn.writeUnlock(); - } - DatanodeDescriptor dn = bc.getBlocks()[0].getDatanode(0); - bm.getDatanodeManager().getDecomManager().startDecommission(dn); - String dnName = dn.getXferAddr(); - - //wait for decommission start - DatanodeInfo datanodeInfo = null; - int count = 0; - do { - Thread.sleep(2000); - for (DatanodeInfo info : dfs.getDataNodeStats()) { - if (dnName.equals(info.getXferAddr())) { - datanodeInfo = info; - } - } - //check decommissioning only once - if(!checkDecommissionInProgress && datanodeInfo != null - && datanodeInfo.isDecommissionInProgress()) { - String fsckOut = runFsck(conf, 3, true, "/", "-blockId", bIds[0]); - assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONING_STATUS)); - checkDecommissionInProgress = true; - } - } while (datanodeInfo != null && !datanodeInfo.isDecommissioned()); - - //check decommissioned - String fsckOut = runFsck(conf, 2, true, "/", "-blockId", bIds[0]); - assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONED_STATUS)); + fsn.writeLock(); + BlockInfo bi = bm.getStoredBlock(eb.getLocalBlock()); + bc = fsn.getBlockCollection(bi); } finally { - if 
(cluster != null) { - cluster.shutdown(); - } + fsn.writeUnlock(); } + DatanodeDescriptor dn = bc.getBlocks()[0].getDatanode(0); + bm.getDatanodeManager().getDecomManager().startDecommission(dn); + String dnName = dn.getXferAddr(); + + //wait for decommission start + DatanodeInfo datanodeInfo = null; + int count = 0; + do { + Thread.sleep(2000); + for (DatanodeInfo info : dfs.getDataNodeStats()) { + if (dnName.equals(info.getXferAddr())) { + datanodeInfo = info; + } + } + //check decommissioning only once + if(!checkDecommissionInProgress && datanodeInfo != null + && datanodeInfo.isDecommissionInProgress()) { + String fsckOut = runFsck(conf, 3, true, "/", "-blockId", bIds[0]); + assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONING_STATUS)); + checkDecommissionInProgress = true; + } + } while (datanodeInfo != null && !datanodeInfo.isDecommissioned()); + + //check decommissioned + String fsckOut = runFsck(conf, 2, true, "/", "-blockId", bIds[0]); + assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONED_STATUS)); } /** - * Test for blockIdCK with block corruption + * Test for blockIdCK with block corruption. */ @Test public void testBlockIdCKCorruption() throws Exception { - short NUM_DN = 1; + short numDn = 1; final long blockSize = 512; Random random = new Random(); ExtendedBlock block; short repFactor = 1; - String [] racks = {"/rack1"}; - String [] hosts = {"host1"}; + String[] racks = {"/rack1"}; + String[] hosts = {"host1"}; - Configuration conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000); // Set short retry timeouts so this test runs faster conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); - MiniDFSCluster cluster = null; DistributedFileSystem dfs = null; - try { - cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts) - .racks(racks).build(); + cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(numDn).hosts(hosts) + .racks(racks).build(); - assertNotNull("Failed Cluster Creation", cluster); - cluster.waitClusterUp(); - dfs = cluster.getFileSystem(); - assertNotNull("Failed to get FileSystem", dfs); + assertNotNull("Failed Cluster Creation", cluster); + cluster.waitClusterUp(); + dfs = cluster.getFileSystem(); + assertNotNull("Failed to get FileSystem", dfs); - DFSTestUtil util = new DFSTestUtil.Builder(). + DFSTestUtil util = new DFSTestUtil.Builder(). 
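
Aside: this test and testFsckWithDecommissionedReplicas further down share the same decommission setup. A condensed sketch of that flow, using the same internal APIs the tests themselves call (cluster, dfs, util and path stand for the fixtures the tests build; this is illustration, not part of the patch):

FSNamesystem fsn = cluster.getNameNode().getNamesystem();
BlockManager bm = fsn.getBlockManager();
ExtendedBlock eb = util.getFirstBlock(dfs, path);
BlockCollection bc;
fsn.writeLock();  // looking up the stored block requires the FSN lock
try {
  BlockInfo bi = bm.getStoredBlock(eb.getLocalBlock());
  bc = fsn.getBlockCollection(bi);
} finally {
  fsn.writeUnlock();
}
DatanodeDescriptor dn = bc.getBlocks()[0].getDatanode(0);
bm.getDatanodeManager().getDecomManager().startDecommission(dn);
// Per the assertions above, fsck's exit code then tracks the node state:
// 3 while decommission is in progress, 2 once the node is decommissioned.
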
 
   /**
-   * Test for blockIdCK with block corruption
+   * Test for blockIdCK with block corruption.
   */
   @Test
   public void testBlockIdCKCorruption() throws Exception {
-    short NUM_DN = 1;
+    short numDn = 1;
     final long blockSize = 512;
     Random random = new Random();
     ExtendedBlock block;
     short repFactor = 1;
-    String [] racks = {"/rack1"};
-    String [] hosts = {"host1"};
+    String[] racks = {"/rack1"};
+    String[] hosts = {"host1"};
 
-    Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
     // Set short retry timeouts so this test runs faster
     conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
 
-    MiniDFSCluster cluster = null;
     DistributedFileSystem dfs = null;
-    try {
-      cluster =
-          new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
-              .racks(racks).build();
+    cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(numDn).hosts(hosts)
            .racks(racks).build();
 
-      assertNotNull("Failed Cluster Creation", cluster);
-      cluster.waitClusterUp();
-      dfs = cluster.getFileSystem();
-      assertNotNull("Failed to get FileSystem", dfs);
+    assertNotNull("Failed Cluster Creation", cluster);
+    cluster.waitClusterUp();
+    dfs = cluster.getFileSystem();
+    assertNotNull("Failed to get FileSystem", dfs);
 
-      DFSTestUtil util = new DFSTestUtil.Builder().
+    DFSTestUtil util = new DFSTestUtil.Builder().
         setName(getClass().getSimpleName()).setNumFiles(1).build();
-      //create files
-      final String pathString = new String("/testfile");
-      final Path path = new Path(pathString);
-      util.createFile(dfs, path, 1024, repFactor, 1000L);
-      util.waitReplication(dfs, path, repFactor);
-      StringBuilder sb = new StringBuilder();
-      for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
-        sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
-      }
-      String[] bIds = sb.toString().split(" ");
-
-      //make sure block is healthy before we corrupt it
-      String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
-      System.out.println(outStr);
-      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
-
-      // corrupt replicas
-      block = DFSTestUtil.getFirstBlock(dfs, path);
-      File blockFile = cluster.getBlockFile(0, block);
-      if (blockFile != null && blockFile.exists()) {
-        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-        FileChannel channel = raFile.getChannel();
-        String badString = "BADBAD";
-        int rand = random.nextInt((int) channel.size()/2);
-        raFile.seek(rand);
-        raFile.write(badString.getBytes());
-        raFile.close();
-      }
-
-      util.waitCorruptReplicas(dfs, cluster.getNamesystem(), path, block, 1);
-
-      outStr = runFsck(conf, 1, false, "/", "-blockId", block.getBlockName());
-      System.out.println(outStr);
-      assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    //create files
+    final String pathString = new String("/testfile");
+    final Path path = new Path(pathString);
+    util.createFile(dfs, path, 1024, repFactor, 1000L);
+    util.waitReplication(dfs, path, repFactor);
+    StringBuilder sb = new StringBuilder();
+    for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
+      sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
    }
+    String[] bIds = sb.toString().split(" ");
+
+    //make sure block is healthy before we corrupt it
+    String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
+    System.out.println(outStr);
+    assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+
+    // corrupt replicas
+    block = DFSTestUtil.getFirstBlock(dfs, path);
+    File blockFile = cluster.getBlockFile(0, block);
+    if (blockFile != null && blockFile.exists()) {
+      RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+      FileChannel channel = raFile.getChannel();
+      String badString = "BADBAD";
+      int rand = random.nextInt((int) channel.size()/2);
+      raFile.seek(rand);
+      raFile.write(badString.getBytes());
+      raFile.close();
    }
+
+    util.waitCorruptReplicas(dfs, cluster.getNamesystem(), path, block, 1);
+
+    outStr = runFsck(conf, 1, false, "/", "-blockId", block.getBlockName());
+    System.out.println(outStr);
+    assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
   }
 
   private void writeFile(final DistributedFileSystem dfs,
@@ -1539,71 +1435,64 @@ public class TestFsck {
   }
 
   private void writeFile(final DistributedFileSystem dfs,
-      String dirName, String fileName, String StoragePolicy) throws IOException {
+      String dirName, String fileName, String storagePolicy)
+      throws IOException {
     Path dirPath = new Path(dirName);
     dfs.mkdirs(dirPath);
-    dfs.setStoragePolicy(dirPath, StoragePolicy);
+    dfs.setStoragePolicy(dirPath, storagePolicy);
     writeFile(dfs, dirPath, fileName);
   }
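
Aside: the writeFile(dfs, dir, file, policy) helper above is what seeds the storage-policy test that follows. A minimal sketch of setting a policy and reading it back — getStoragePolicy is available on DistributedFileSystem in 2.8+; older branches would instead check the fsck -storagepolicies output, as the test below does:

Path dir = new Path("/testwarm");
dfs.mkdirs(dir);
dfs.setStoragePolicy(dir, "WARM");  // one DISK replica, the rest ARCHIVE
BlockStoragePolicySpi policy = dfs.getStoragePolicy(dir);
assertEquals("WARM", policy.getName());
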
 
   /**
-   * Test storage policy display
+   * Test storage policy display.
   */
   @Test
   public void testStoragePoliciesCK() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+    cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(3)
         .storageTypes(
             new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
         .build();
-    try {
-      cluster.waitActive();
-      final DistributedFileSystem dfs = cluster.getFileSystem();
-      writeFile(dfs, "/testhot", "file", "HOT");
-      writeFile(dfs, "/testwarm", "file", "WARM");
-      writeFile(dfs, "/testcold", "file", "COLD");
-      String outStr = runFsck(conf, 0, true, "/", "-storagepolicies");
-      assertTrue(outStr.contains("DISK:3(HOT)"));
-      assertTrue(outStr.contains("DISK:1,ARCHIVE:2(WARM)"));
-      assertTrue(outStr.contains("ARCHIVE:3(COLD)"));
-      assertTrue(outStr.contains("All blocks satisfy specified storage policy."));
-      dfs.setStoragePolicy(new Path("/testhot"), "COLD");
-      dfs.setStoragePolicy(new Path("/testwarm"), "COLD");
-      outStr = runFsck(conf, 0, true, "/", "-storagepolicies");
-      assertTrue(outStr.contains("DISK:3(HOT)"));
-      assertTrue(outStr.contains("DISK:1,ARCHIVE:2(WARM)"));
-      assertTrue(outStr.contains("ARCHIVE:3(COLD)"));
-      assertFalse(outStr.contains("All blocks satisfy specified storage policy."));
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    cluster.waitActive();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    writeFile(dfs, "/testhot", "file", "HOT");
+    writeFile(dfs, "/testwarm", "file", "WARM");
+    writeFile(dfs, "/testcold", "file", "COLD");
+    String outStr = runFsck(conf, 0, true, "/", "-storagepolicies");
+    assertTrue(outStr.contains("DISK:3(HOT)"));
+    assertTrue(outStr.contains("DISK:1,ARCHIVE:2(WARM)"));
+    assertTrue(outStr.contains("ARCHIVE:3(COLD)"));
+    assertTrue(outStr.contains("All blocks satisfy specified storage policy."));
+    dfs.setStoragePolicy(new Path("/testhot"), "COLD");
+    dfs.setStoragePolicy(new Path("/testwarm"), "COLD");
+    outStr = runFsck(conf, 0, true, "/", "-storagepolicies");
+    assertTrue(outStr.contains("DISK:3(HOT)"));
+    assertTrue(outStr.contains("DISK:1,ARCHIVE:2(WARM)"));
+    assertTrue(outStr.contains("ARCHIVE:3(COLD)"));
+    assertFalse(outStr.contains(
+        "All blocks satisfy specified storage policy."));
   }
 
   /**
-   * Test for blocks on decommissioning hosts are not shown as missing
+   * Test that blocks on decommissioning hosts are not shown as missing.
   */
   @Test
   public void testFsckWithDecommissionedReplicas() throws Exception {
 
-    final short REPL_FACTOR = 1;
-    short NUM_DN = 2;
+    final short replFactor = 1;
+    short numDn = 2;
     final long blockSize = 512;
     final long fileSize = 1024;
     boolean checkDecommissionInProgress = false;
-    String [] racks = {"/rack1", "/rack2"};
-    String [] hosts = {"host1", "host2"};
+    String[] racks = {"/rack1", "/rack2"};
+    String[] hosts = {"host1", "host2"};
 
-    Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
 
-    MiniDFSCluster cluster;
-    DistributedFileSystem dfs ;
+    DistributedFileSystem dfs;
     cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
+        new MiniDFSCluster.Builder(conf).numDataNodes(numDn).hosts(hosts)
            .racks(racks).build();
 
     assertNotNull("Failed Cluster Creation", cluster);
@@ -1617,58 +1506,53 @@ public class TestFsck {
     //create files
     final String testFile = new String("/testfile");
     final Path path = new Path(testFile);
-    util.createFile(dfs, path, fileSize, REPL_FACTOR, 1000L);
-    util.waitReplication(dfs, path, REPL_FACTOR);
+    util.createFile(dfs, path, fileSize, replFactor, 1000L);
+    util.waitReplication(dfs, path, replFactor);
+
+    // make sure datanode that has replica is fine before decommission
+    String outStr = runFsck(conf, 0, true, testFile);
+    System.out.println(outStr);
+    assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+
+    // decommission datanode
+    FSNamesystem fsn = cluster.getNameNode().getNamesystem();
+    BlockManager bm = fsn.getBlockManager();
+    ExtendedBlock eb = util.getFirstBlock(dfs, path);
+    BlockCollection bc = null;
     try {
-      // make sure datanode that has replica is fine before decommission
-      String outStr = runFsck(conf, 0, true, testFile);
-      System.out.println(outStr);
-      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
-
-      // decommission datanode
-      FSNamesystem fsn = cluster.getNameNode().getNamesystem();
-      BlockManager bm = fsn.getBlockManager();
-      ExtendedBlock eb = util.getFirstBlock(dfs, path);
-      BlockCollection bc = null;
-      try {
-        fsn.writeLock();
-        BlockInfo bi = bm.getStoredBlock(eb.getLocalBlock());
-        bc = fsn.getBlockCollection(bi);
-      } finally {
-        fsn.writeUnlock();
-      }
-      DatanodeDescriptor dn = bc.getBlocks()[0]
-          .getDatanode(0);
-      bm.getDatanodeManager().getDecomManager().startDecommission(dn);
-      String dnName = dn.getXferAddr();
-
-      // wait for decommission start
-      DatanodeInfo datanodeInfo = null;
-      int count = 0;
-      do {
-        Thread.sleep(2000);
-        for (DatanodeInfo info : dfs.getDataNodeStats()) {
-          if (dnName.equals(info.getXferAddr())) {
-            datanodeInfo = info;
-          }
-        }
-        // check the replica status should be healthy(0)
-        // instead of corruption (1) during decommissioning
-        if(!checkDecommissionInProgress && datanodeInfo != null
-            && datanodeInfo.isDecommissionInProgress()) {
-          String fsckOut = runFsck(conf, 0, true, testFile);
-          checkDecommissionInProgress = true;
-        }
-      } while (datanodeInfo != null && !datanodeInfo.isDecommissioned());
-
-      // check the replica status should be healthy(0) after decommission
-      // is done
-      String fsckOut = runFsck(conf, 0, true, testFile);
+      fsn.writeLock();
+      BlockInfo bi = bm.getStoredBlock(eb.getLocalBlock());
+      bc = fsn.getBlockCollection(bi);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      fsn.writeUnlock();
     }
+    DatanodeDescriptor dn = bc.getBlocks()[0]
+        .getDatanode(0);
+    bm.getDatanodeManager().getDecomManager().startDecommission(dn);
+    String dnName = dn.getXferAddr();
+
+    // wait for decommission start
+    DatanodeInfo datanodeInfo = null;
+    int count = 0;
+    do {
+      Thread.sleep(2000);
+      for (DatanodeInfo info : dfs.getDataNodeStats()) {
+        if (dnName.equals(info.getXferAddr())) {
+          datanodeInfo = info;
+        }
+      }
+      // check the replica status should be healthy(0)
+      // instead of corruption (1) during decommissioning
+      if (!checkDecommissionInProgress && datanodeInfo != null
+          && datanodeInfo.isDecommissionInProgress()) {
+        runFsck(conf, 0, true, testFile);
+        checkDecommissionInProgress = true;
+      }
    } while (datanodeInfo != null && !datanodeInfo.isDecommissioned());
+
+    // check the replica status should be healthy(0) after decommission
+    // is done
+    runFsck(conf, 0, true, testFile);
   }
 
   /**
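
Aside: the next hunk builds a snapshot fixture before corrupting blocks. A minimal sketch of that setup (assumes the DistributedFileSystem `hdfs` from a running minicluster): a directory must be made snapshottable before a snapshot can be taken, and the snapshot copy then appears under the read-only .snapshot subtree that fsck -includeSnapshots walks.

Path dir = new Path("/corruptData");
hdfs.allowSnapshot(dir);                 // admin op: mark dir snapshottable
hdfs.createSnapshot(dir, "mySnapShot");  // -> /corruptData/.snapshot/mySnapShot
FileStatus snapshotCopy = hdfs.getFileStatus(
    new Path("/corruptData/.snapshot/mySnapShot/file"));
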
@@ -1676,179 +1560,165 @@ public class TestFsck {
   */
   @Test
   public void testFsckListCorruptSnapshotFiles() throws Exception {
-    Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
     conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
     DistributedFileSystem hdfs = null;
-    final short REPL_FACTOR = 1;
+    final short replFactor = 1;
 
-    MiniDFSCluster cluster = null;
-    try {
-      int numFiles = 3;
-      int numSnapshots = 0;
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      cluster.waitActive();
-      hdfs = cluster.getFileSystem();
-      DFSTestUtil util = new DFSTestUtil.Builder().
-          setName("testGetCorruptFiles").setNumFiles(numFiles).setMaxLevels(1).
-          setMaxSize(1024).build();
+    int numFiles = 3;
+    int numSnapshots = 0;
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    hdfs = cluster.getFileSystem();
+    DFSTestUtil util = new DFSTestUtil.Builder().
+        setName("testGetCorruptFiles").setNumFiles(numFiles).setMaxLevels(1).
+        setMaxSize(1024).build();
 
-      util.createFiles(hdfs, "/corruptData", (short) 1);
-      final Path fp = new Path("/corruptData/file");
-      util.createFile(hdfs, fp, 1024, REPL_FACTOR, 1000L);
-      numFiles++;
-      util.waitReplication(hdfs, "/corruptData", (short) 1);
+    util.createFiles(hdfs, "/corruptData", (short) 1);
+    final Path fp = new Path("/corruptData/file");
+    util.createFile(hdfs, fp, 1024, replFactor, 1000L);
+    numFiles++;
+    util.waitReplication(hdfs, "/corruptData", (short) 1);
 
-      hdfs.allowSnapshot(new Path("/corruptData"));
-      hdfs.createSnapshot(new Path("/corruptData"), "mySnapShot");
-      numSnapshots = numFiles;
+    hdfs.allowSnapshot(new Path("/corruptData"));
+    hdfs.createSnapshot(new Path("/corruptData"), "mySnapShot");
+    numSnapshots = numFiles;
 
-      String outStr =
-          runFsck(conf, 0, false, "/corruptData", "-list-corruptfileblocks");
-      System.out.println("1. good fsck out: " + outStr);
-      assertTrue(outStr.contains("has 0 CORRUPT files"));
-      // delete the blocks
-      final String bpid = cluster.getNamesystem().getBlockPoolId();
-      for (int i=0; i<numFiles; i++) {
-        for (int j=0; j<=1; j++) {
-          File storageDir = cluster.getInstanceStorageDir(i, j);
-          File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
-          List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
-              data_dir);
-          if (metadataFiles == null)
-            continue;
-          for (File metadataFile : metadataFiles) {
-            File blockFile = Block.metaToBlockFile(metadataFile);
-            assertTrue("Cannot remove file.", blockFile.delete());
-            assertTrue("Cannot remove file.", metadataFile.delete());
-          }
-        }
-      }
+    String outStr =
+        runFsck(conf, 0, false, "/corruptData", "-list-corruptfileblocks");
+    System.out.println("1. good fsck out: " + outStr);
+    assertTrue(outStr.contains("has 0 CORRUPT files"));
+    // delete the blocks
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    for (int i=0; i<numFiles; i++) {
+      for (int j=0; j<=1; j++) {
+        File storageDir = cluster.getInstanceStorageDir(i, j);
+        File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+        List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
+            dataDir);
+        if (metadataFiles == null) {
+          continue;
+        }
+        for (File metadataFile : metadataFiles) {
+          File blockFile = Block.metaToBlockFile(metadataFile);
+          assertTrue("Cannot remove file.", blockFile.delete());
+          assertTrue("Cannot remove file.", metadataFile.delete());
+        }
+      }
+    }
 
-      // Delete file when it has a snapshot
-      hdfs.delete(fp, false);
-      numFiles--;
-
-      // wait for the namenode to see the corruption
-      final NamenodeProtocols namenode = cluster.getNameNodeRpc();
-      CorruptFileBlocks corruptFileBlocks = namenode
-          .listCorruptFileBlocks("/corruptData", null);
-      int numCorrupt = corruptFileBlocks.getFiles().length;
-      while (numCorrupt == 0) {
-        Thread.sleep(1000);
-        corruptFileBlocks = namenode
-            .listCorruptFileBlocks("/corruptData", null);
-        numCorrupt = corruptFileBlocks.getFiles().length;
-      }
-
-      // with -includeSnapshots all files are reported
-      outStr = runFsck(conf, -1, true, "/corruptData",
-          "-list-corruptfileblocks", "-includeSnapshots");
-      System.out.println("2. bad fsck include snapshot out: " + outStr);
-      assertTrue(outStr
-          .contains("has " + (numFiles + numSnapshots) + " CORRUPT files"));
-      assertTrue(outStr.contains("/.snapshot/"));
-
-      // without -includeSnapshots only non-snapshots are reported
-      outStr =
-          runFsck(conf, -1, true, "/corruptData", "-list-corruptfileblocks");
-      System.out.println("3. bad fsck exclude snapshot out: " + outStr);
-      assertTrue(outStr.contains("has " + numFiles + " CORRUPT files"));
-      assertFalse(outStr.contains("/.snapshot/"));
-    } finally {
-      if (cluster != null) {cluster.shutdown();}
-    }
+    // Delete file when it has a snapshot
+    hdfs.delete(fp, false);
+    numFiles--;
+
+    // wait for the namenode to see the corruption
+    final NamenodeProtocols namenode = cluster.getNameNodeRpc();
+    CorruptFileBlocks corruptFileBlocks = namenode
+        .listCorruptFileBlocks("/corruptData", null);
+    int numCorrupt = corruptFileBlocks.getFiles().length;
+    while (numCorrupt == 0) {
+      Thread.sleep(1000);
+      corruptFileBlocks = namenode
+          .listCorruptFileBlocks("/corruptData", null);
+      numCorrupt = corruptFileBlocks.getFiles().length;
+    }
+
+    // with -includeSnapshots all files are reported
+    outStr = runFsck(conf, -1, true, "/corruptData",
+        "-list-corruptfileblocks", "-includeSnapshots");
+    System.out.println("2. bad fsck include snapshot out: " + outStr);
+    assertTrue(outStr
+        .contains("has " + (numFiles + numSnapshots) + " CORRUPT files"));
+    assertTrue(outStr.contains("/.snapshot/"));
+
+    // without -includeSnapshots only non-snapshots are reported
+    outStr =
+        runFsck(conf, -1, true, "/corruptData", "-list-corruptfileblocks");
+    System.out.println("3. bad fsck exclude snapshot out: " + outStr);
+    assertTrue(outStr.contains("has " + numFiles + " CORRUPT files"));
+    assertFalse(outStr.contains("/.snapshot/"));
   }
 
   @Test (timeout = 300000)
   public void testFsckMoveAfterCorruption() throws Exception {
-    final int DFS_BLOCK_SIZE = 512 * 1024;
-    final int NUM_DATANODES = 1;
-    final int REPLICATION = 1;
-    MiniDFSCluster cluster = null;
-    try {
-      final Configuration conf = new HdfsConfiguration();
-      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE);
-      conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
-      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
-      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION);
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      cluster.waitActive();
+    final int dfsBlockSize = 512 * 1024;
+    final int numDatanodes = 1;
+    final int replication = 1;
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, dfsBlockSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replication);
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    cluster.waitActive();
 
-      final String srcDir = "/srcdat";
-      final DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck")
-          .setMinSize(DFS_BLOCK_SIZE * 2).setMaxSize(DFS_BLOCK_SIZE * 3)
-          .setNumFiles(1).build();
-      util.createFiles(dfs, srcDir, (short) REPLICATION);
-      final String fileNames[] = util.getFileNames(srcDir);
-      FSImage.LOG.info("Created files: " + Arrays.toString(fileNames));
+    final String srcDir = "/srcdat";
+    final DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck")
+        .setMinSize(dfsBlockSize * 2).setMaxSize(dfsBlockSize * 3)
+        .setNumFiles(1).build();
+    util.createFiles(dfs, srcDir, (short) replication);
+    final String[] fileNames = util.getFileNames(srcDir);
+    LOG.info("Created files: " + Arrays.toString(fileNames));
 
-      // Run fsck here. The output is automatically logged for easier debugging
-      String outStr = runFsck(conf, 0, true, "/", "-files", "-blocks");
-      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+    // Run fsck here. The output is automatically logged for easier debugging
+    String outStr = runFsck(conf, 0, true, "/", "-files", "-blocks");
+    assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
 
-      // Corrupt the first block
-      final DFSClient dfsClient = new DFSClient(
-          new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
-      final String blockFileToCorrupt = fileNames[0];
-      final CorruptedTestFile ctf = new CorruptedTestFile(blockFileToCorrupt,
-          Sets.newHashSet(0), dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE);
-      ctf.corruptBlocks(cluster);
+    // Corrupt the first block
+    final DFSClient dfsClient = new DFSClient(
+        new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+    final String blockFileToCorrupt = fileNames[0];
+    final CorruptedTestFile ctf = new CorruptedTestFile(blockFileToCorrupt,
+        Sets.newHashSet(0), dfsClient, numDatanodes, dfsBlockSize);
+    ctf.corruptBlocks(cluster);
 
-      // Wait for fsck to discover all the missing blocks
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          try {
-            final String str = runFsck(conf, 1, false, "/");
-            String numCorrupt = null;
-            for (String line : str.split(LINE_SEPARATOR)) {
-              Matcher m = numCorruptBlocksPattern.matcher(line);
-              if (m.matches()) {
-                numCorrupt = m.group(1);
-                break;
-              }
-            }
-            if (numCorrupt == null) {
-              Assert.fail("Cannot find corrupt blocks count in fsck output.");
-            }
-            if (Integer.parseInt(numCorrupt) == ctf.getTotalMissingBlocks()) {
-              assertTrue(str.contains(NamenodeFsck.CORRUPT_STATUS));
-              return true;
-            }
-          } catch (Exception e) {
-            FSImage.LOG.error("Exception caught", e);
-            Assert.fail("Caught unexpected exception.");
-          }
-          return false;
-        }
-      }, 1000, 60000);
-
-      runFsck(conf, 1, true, "/", "-files", "-blocks", "-racks");
-      FSImage.LOG.info("Moving blocks to lost+found");
-      // Fsck will return error since we corrupted a block
-      runFsck(conf, 1, false, "/", "-move");
-
-      final List<LocatedFileStatus> retVal = new ArrayList<>();
-      final RemoteIterator<LocatedFileStatus> iter =
-          dfs.listFiles(new Path("/lost+found"), true);
-      while (iter.hasNext()) {
-        retVal.add(iter.next());
-      }
-      FSImage.LOG.info("Items in lost+found: " + retVal);
-
-      // Expect all good blocks moved, only corrupted block skipped.
-      long totalLength = 0;
-      for (LocatedFileStatus lfs: retVal) {
-        totalLength += lfs.getLen();
-      }
-      Assert.assertTrue("Nothing is moved to lost+found!", totalLength > 0);
-      util.cleanup(dfs, srcDir);
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
+    // Wait for fsck to discover all the missing blocks
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          final String str = runFsck(conf, 1, false, "/");
+          String numCorrupt = null;
+          for (String line : str.split(LINE_SEPARATOR)) {
+            Matcher m = NUM_CORRUPT_BLOCKS_PATTERN.matcher(line);
+            if (m.matches()) {
+              numCorrupt = m.group(1);
+              break;
+            }
+          }
+          if (numCorrupt == null) {
+            Assert.fail("Cannot find corrupt blocks count in fsck output.");
+          }
+          if (Integer.parseInt(numCorrupt) == ctf.getTotalMissingBlocks()) {
+            assertTrue(str.contains(NamenodeFsck.CORRUPT_STATUS));
+            return true;
+          }
+        } catch (Exception e) {
+          LOG.error("Exception caught", e);
+          Assert.fail("Caught unexpected exception.");
+        }
+        return false;
+      }
+    }, 1000, 60000);
+
+    runFsck(conf, 1, true, "/", "-files", "-blocks", "-racks");
+    LOG.info("Moving blocks to lost+found");
+    // Fsck will return error since we corrupted a block
+    runFsck(conf, 1, false, "/", "-move");
+
+    final List<LocatedFileStatus> retVal = new ArrayList<>();
+    final RemoteIterator<LocatedFileStatus> iter =
+        dfs.listFiles(new Path("/lost+found"), true);
+    while (iter.hasNext()) {
+      retVal.add(iter.next());
+    }
+    LOG.info("Items in lost+found: " + retVal);
+
+    // Expect all good blocks moved, only corrupted block skipped.
+    long totalLength = 0;
+    for (LocatedFileStatus lfs: retVal) {
+      totalLength += lfs.getLen();
+    }
+    Assert.assertTrue("Nothing is moved to lost+found!", totalLength > 0);
+    util.cleanup(dfs, srcDir);
   }
 }
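
Closing aside: the waitFor block in testFsckMoveAfterCorruption scrapes the corrupt-block count out of the fsck report with NUM_CORRUPT_BLOCKS_PATTERN. A standalone illustration of that scrape; the report line here is a stand-in, not real fsck output:

Pattern p = Pattern.compile(".*Corrupt blocks:\t\t([0123456789]*).*");
String reportLine = "  Corrupt blocks:\t\t3";
Matcher m = p.matcher(reportLine);
if (m.matches()) {
  // group(1) captures just the digits following the "Corrupt blocks:" label
  System.out.println("corrupt = " + Integer.parseInt(m.group(1)));  // -> 3
}
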