HDFS-6520. hdfs fsck passes invalid length value when creating BlockReader (Xiao Chen via cmccabe)

(cherry picked from commit 188f65287d)
(cherry picked from commit 387d3f25c1)
Colin Patrick Mccabe 2016-04-06 11:28:34 -07:00
parent e426d1bb3f
commit 8514ab8cb5
5 changed files with 126 additions and 6 deletions

@@ -150,7 +150,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
   private ClientContext clientContext;
 
   /**
-   * Number of bytes to read. -1 indicates no limit.
+   * Number of bytes to read. Must be set to a non-negative value.
    */
   private long length = -1;

@@ -341,6 +341,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
    */
   public BlockReader build() throws IOException {
     Preconditions.checkNotNull(configuration);
+    Preconditions
+        .checkState(length >= 0, "Length must be set to a non-negative value");
     BlockReader reader = tryToCreateExternalBlockReader();
     if (reader != null) {
       return reader;
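
The guard added to build() is Guava's Preconditions.checkState, which throws IllegalStateException when its condition is false. A minimal standalone sketch of what a caller now hits if it never sets a length (the class name and scaffolding are illustrative, not part of the patch):

import com.google.common.base.Preconditions;

public class LengthCheckDemo {
  public static void main(String[] args) {
    long length = -1; // simulates a builder whose length was never set
    try {
      // Same check that BlockReaderFactory#build() performs after this patch.
      Preconditions.checkState(length >= 0,
          "Length must be set to a non-negative value");
    } catch (IllegalStateException e) {
      System.out.println("build() would fail fast: " + e.getMessage());
    }
  }
}

Failing fast in build() turns a stale -1 into an immediate, clearly named error instead of a confusing read failure later.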

@@ -848,7 +848,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
         setBlock(block).
         setBlockToken(lblock.getBlockToken()).
         setStartOffset(0).
-        setLength(-1).
+        setLength(block.getNumBytes()).
         setVerifyChecksum(true).
         setClientName("fsck").
         setDatanodeInfo(chosenNode).
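
Since -1 no longer means "read the whole block", fsck now asks the located block for its actual size via getNumBytes(). A small standalone illustration of that value (the pool id, block id, and sizes below are made up):

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

public class BlockLengthDemo {
  public static void main(String[] args) {
    // Hypothetical block: pool id, block id, length in bytes, generation stamp.
    ExtendedBlock block = new ExtendedBlock("test-blockpoolid", 1L, 64 * 1024L, 1001L);
    // This is the value the fsck block reader is now built with, instead of -1.
    System.out.println("setLength(" + block.getNumBytes() + ")");
  }
}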

@@ -155,7 +155,7 @@ public class TestBlockTokenWithDFS {
         setBlockToken(lblock.getBlockToken()).
         setInetSocketAddress(targetAddr).
         setStartOffset(0).
-        setLength(-1).
+        setLength(0).
         setVerifyChecksum(true).
         setClientName("TestBlockTokenWithDFS").
         setDatanodeInfo(nodes[0]).

@@ -521,7 +521,7 @@ public class TestDataNodeVolumeFailure {
             "test-blockpoolid", block.getBlockId())).
         setBlockToken(lblock.getBlockToken()).
         setStartOffset(0).
-        setLength(-1).
+        setLength(0).
         setVerifyChecksum(true).
         setClientName("TestDataNodeVolumeFailure").
         setDatanodeInfo(datanode).

@@ -35,6 +35,7 @@ import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -46,6 +47,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.channels.FileChannel;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,12 +57,15 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -98,6 +104,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.apache.log4j.RollingFileAppender;
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Sets;
@@ -141,7 +148,7 @@ public class TestFsck {
       assertEquals(expectedErrCode, errCode);
     }
     GenericTestUtils.setLogLevel(FSPermissionChecker.LOG, Level.INFO);
-    FSImage.LOG.error("OUTPUT = " + bStream.toString());
+    FSImage.LOG.info("OUTPUT = " + bStream.toString());
     return bStream.toString();
   }
@@ -462,6 +469,24 @@ public class TestFsck {
       }
     }
 
+    public void corruptBlocks(MiniDFSCluster cluster) throws IOException {
+      for (int corruptIdx : blocksToCorrupt) {
+        // Corrupt a block by overwriting its contents on every datanode
+        ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(name,
+            blockSize * corruptIdx, Long.MAX_VALUE).get(0).getBlock();
+        for (int i = 0; i < numDataNodes; i++) {
+          File blockFile = cluster.getBlockFile(i, block);
+          if (blockFile != null && blockFile.exists()) {
+            FileOutputStream blockFileStream =
+                new FileOutputStream(blockFile, false);
+            blockFileStream.write("corrupt".getBytes());
+            blockFileStream.close();
+            FSImage.LOG.info("Corrupted block file " + blockFile);
+          }
+        }
+      }
+    }
+
     public void checkSalvagedRemains() throws IOException {
       int chainIdx = 0;
       HdfsFileStatus status = dfsClient.getFileInfo(name);
@@ -1734,4 +1759,97 @@ public class TestFsck {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  @Test (timeout = 300000)
+  public void testFsckMoveAfterCorruption() throws Exception {
+    final int DFS_BLOCK_SIZE = 512 * 1024;
+    final int NUM_DATANODES = 1;
+    final int REPLICATION = 1;
+    MiniDFSCluster cluster = null;
+    try {
+      final Configuration conf = new HdfsConfiguration();
+      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE);
+      conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
+      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION);
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      cluster.waitActive();
+
+      final String srcDir = "/srcdat";
+      final DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck")
+          .setMinSize(DFS_BLOCK_SIZE * 2).setMaxSize(DFS_BLOCK_SIZE * 3)
+          .setNumFiles(1).build();
+      util.createFiles(dfs, srcDir, (short) REPLICATION);
+      final String[] fileNames = util.getFileNames(srcDir);
+      FSImage.LOG.info("Created files: " + Arrays.toString(fileNames));
+
+      // Run fsck here. The output is automatically logged for easier debugging
+      String outStr = runFsck(conf, 0, true, "/", "-files", "-blocks");
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+
+      // Corrupt the first block of the first file
+      final DFSClient dfsClient = new DFSClient(
+          new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+      final String blockFileToCorrupt = fileNames[0];
+      final CorruptedTestFile ctf = new CorruptedTestFile(blockFileToCorrupt,
+          Sets.newHashSet(0), dfsClient, NUM_DATANODES, DFS_BLOCK_SIZE);
+      ctf.corruptBlocks(cluster);
+
+      // Wait for fsck to discover all the missing blocks
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            final String str = runFsck(conf, 1, false, "/");
+            String numCorrupt = null;
+            for (String line : str.split(LINE_SEPARATOR)) {
+              Matcher m = numCorruptBlocksPattern.matcher(line);
+              if (m.matches()) {
+                numCorrupt = m.group(1);
+                break;
+              }
+            }
+            if (numCorrupt == null) {
+              Assert.fail("Cannot find corrupt blocks count in fsck output.");
+            }
+            if (Integer.parseInt(numCorrupt) == ctf.getTotalMissingBlocks()) {
+              assertTrue(str.contains(NamenodeFsck.CORRUPT_STATUS));
+              return true;
+            }
+          } catch (Exception e) {
+            FSImage.LOG.error("Exception caught", e);
+            Assert.fail("Caught unexpected exception.");
+          }
+          return false;
+        }
+      }, 1000, 60000);
+
+      runFsck(conf, 1, true, "/", "-files", "-blocks", "-racks");
+      FSImage.LOG.info("Moving blocks to lost+found");
+      // Fsck will return error since we corrupted a block
+      runFsck(conf, 1, false, "/", "-move");
+
+      final List<LocatedFileStatus> retVal = new ArrayList<>();
+      final RemoteIterator<LocatedFileStatus> iter =
+          dfs.listFiles(new Path("/lost+found"), true);
+      while (iter.hasNext()) {
+        retVal.add(iter.next());
+      }
+      FSImage.LOG.info("Items in lost+found: " + retVal);
+
+      // Expect all good blocks moved, only the corrupted block skipped.
+      long totalLength = 0;
+      for (LocatedFileStatus lfs : retVal) {
+        totalLength += lfs.getLen();
+      }
+      Assert.assertTrue("Nothing is moved to lost+found!", totalLength > 0);
+      util.cleanup(dfs, srcDir);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
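
For reference, the test's runFsck helper drives fsck through the same Tool entry point as the hdfs fsck command. A minimal standalone sketch of that pattern against a running cluster (the class name is illustrative, cluster setup is omitted, and the NameNode address is assumed to come from the Configuration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.util.ToolRunner;

public class FsckMoveDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Report files and blocks under /; exit code 0 means the namespace is healthy.
    int rc = ToolRunner.run(new DFSck(conf), new String[] {"/", "-files", "-blocks"});
    System.out.println("fsck exit code: " + rc);
    // Salvage the readable blocks of corrupt files into /lost+found,
    // which is what the new test asserts on.
    rc = ToolRunner.run(new DFSck(conf), new String[] {"/", "-move"});
    System.out.println("fsck -move exit code: " + rc);
  }
}

A non-zero exit code indicates fsck found problems; with -move the salvageable data ends up under /lost+found, as checked by testFsckMoveAfterCorruption above.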