svn merge -c 1561223 merging from trunk to branch-2 to fix: HDFS-5728. Block recovery will fail if the metafile does not have crc for all chunks of the block. Contributed by Vinay.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1561224 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 2014-01-24 22:57:56 +00:00
parent 4fe4190b1d
commit c32beafe02
3 changed files with 81 additions and 4 deletions
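
For context on why a short metafile matters: every finalized block file has a companion .meta file holding a small header followed by one CRC per checksum chunk of the block. The illustrative sketch below (hypothetical class, assuming the common defaults of 512-byte chunks, 4-byte CRC values, and a 7-byte header; none of these constants come from this patch) shows what "does not have crc for all chunks" means in terms of file lengths.

// Illustrative sketch, not part of the patch: expected .meta length for a block,
// assuming default 512-byte chunks, 4-byte CRCs, and a 7-byte metadata header.
public class MetaLengthSketch {
  static final int HEADER_LEN = 7;           // assumed header size (version + checksum type)
  static final int BYTES_PER_CHECKSUM = 512; // assumed default chunk size
  static final int CHECKSUM_SIZE = 4;        // assumed CRC width

  static long expectedMetaLength(long blockLength) {
    long numChunks = (blockLength + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
    return HEADER_LEN + numChunks * CHECKSUM_SIZE;
  }

  public static void main(String[] args) {
    // A 2 MB block needs 4096 CRCs, i.e. a ~16 KB metafile; anything shorter
    // leaves the tail of the block without a CRC, which is the HDFS-5728 case.
    System.out.println(expectedMetaLength(2L * 1024 * 1024)); // prints 16391
  }
}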

CHANGES.txt

@@ -304,6 +304,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5806. Balancer should set SoTimeout to avoid indefinite hangs.
     (Nathan Roberts via Andrew Wang)

+    HDFS-5728. Block recovery will fail if the metafile does not have crc
+    for all chunks of the block (Vinay via kihwal)
+
   BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS

     HDFS-4985. Add storage type to the protocol and expose it in block report

BlockPoolSlice.java

@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.RandomAccessFile;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DU;
@@ -191,7 +192,7 @@ class BlockPoolSlice {
             blockFile.length(), genStamp, volume, blockFile.getParentFile());
       } else {
         newReplica = new ReplicaWaitingToBeRecovered(blockId,
-            validateIntegrity(blockFile, genStamp),
+            validateIntegrityAndSetLength(blockFile, genStamp),
             genStamp, volume, blockFile.getParentFile());
       }
@@ -214,7 +215,7 @@ class BlockPoolSlice {
    * @param genStamp generation stamp of the block
    * @return the number of valid bytes
    */
-  private long validateIntegrity(File blockFile, long genStamp) {
+  private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
     DataInputStream checksumIn = null;
     InputStream blockIn = null;
     try {
@@ -257,11 +258,25 @@ class BlockPoolSlice {
       IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
       checksum.update(buf, 0, lastChunkSize);
+      long validFileLength;
       if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
-        return lastChunkStartPos + lastChunkSize;
+        validFileLength = lastChunkStartPos + lastChunkSize;
       } else { // last chunk is corrupt
-        return lastChunkStartPos;
+        validFileLength = lastChunkStartPos;
       }
+
+      // truncate if extra bytes are present without CRC
+      if (blockFile.length() > validFileLength) {
+        RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+        try {
+          // truncate blockFile
+          blockRAF.setLength(validFileLength);
+        } finally {
+          blockRAF.close();
+        }
+      }
+      return validFileLength;
     } catch (IOException e) {
       FsDatasetImpl.LOG.warn(e);
       return 0;
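
The hunk above begins after lastChunkStartPos and lastChunkSize have already been derived from the block and metafile lengths. The standalone sketch below approximates that arithmetic (class and method names are illustrative and not taken verbatim from BlockPoolSlice, which reads bytesPerChecksum and checksumSize from the .meta header); it shows why a metafile that is missing CRCs lowers the valid length, which the new code now truncates to instead of letting recovery fail.

// Approximate, hypothetical sketch of the arithmetic preceding the hunk above.
public class LastChunkSketch {
  /** Number of chunks covered by both the block data and the metafile CRCs. */
  static long coveredChunks(long blockFileLen, long metaFileLen,
      int bytesPerChecksum, int checksumSize, int crcHeaderLen) {
    // A short metafile lowers the count, which is exactly the HDFS-5728 case.
    return Math.min((blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
        (metaFileLen - crcHeaderLen) / checksumSize);
  }

  public static void main(String[] args) {
    // Example: a 1 MB block whose metafile covers only 2000 of its 2048 chunks.
    long blockFileLen = 1L * 1024 * 1024;
    long metaFileLen = 7 + 2000 * 4;
    long numChunks = coveredChunks(blockFileLen, metaFileLen, 512, 4, 7);
    long lastChunkStartPos = (numChunks - 1) * 512;
    long lastChunkSize = Math.min(512, blockFileLen - lastChunkStartPos);
    // If the CRC of the last covered chunk matches, the valid length is the end
    // of that chunk; validateIntegrityAndSetLength() truncates the block file to it.
    System.out.println(lastChunkStartPos + lastChunkSize); // prints 1024000
  }
}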

TestLeaseRecovery.java

@@ -19,20 +19,28 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

+import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;

 public class TestLeaseRecovery {
@@ -148,4 +156,55 @@ public class TestLeaseRecovery {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  /**
+   * Block recovery when the meta file does not have CRCs for all chunks of the
+   * block file.
+   */
+  @Test
+  public void testBlockRecoveryWithLessMetafile() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .build();
+    Path file = new Path("/testRecoveryFile");
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    FSDataOutputStream out = dfs.create(file);
+    int count = 0;
+    while (count < 2 * 1024 * 1024) {
+      out.writeBytes("Data");
+      count += 4;
+    }
+    out.hsync();
+    // abort the original stream
+    ((DFSOutputStream) out.getWrappedStream()).abort();
+
+    LocatedBlocks locations = cluster.getNameNodeRpc().getBlockLocations(
+        file.toString(), 0, count);
+    ExtendedBlock block = locations.get(0).getBlock();
+    DataNode dn = cluster.getDataNodes().get(0);
+    BlockLocalPathInfo localPathInfo = dn.getBlockLocalPathInfo(block, null);
+    File metafile = new File(localPathInfo.getMetaPath());
+    assertTrue(metafile.exists());
+
+    // reduce the block meta file size
+    RandomAccessFile raf = new RandomAccessFile(metafile, "rw");
+    raf.setLength(metafile.length() - 20);
+    raf.close();
+
+    // restart DN to make replica to RWR
+    DataNodeProperties dnProp = cluster.stopDataNode(0);
+    cluster.restartDataNode(dnProp, true);
+
+    // try to recover the lease
+    DistributedFileSystem newdfs = (DistributedFileSystem) FileSystem
+        .newInstance(cluster.getConfiguration(0));
+    count = 0;
+    while (++count < 10 && !newdfs.recoverLease(file)) {
+      Thread.sleep(1000);
+    }
+    assertTrue("File should be closed", newdfs.recoverLease(file));
+  }
 }
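
As a sanity check on the numbers the test exercises, the hypothetical sketch below works through the arithmetic, assuming the default 512-byte checksum chunks, 4-byte CRCs, and 7-byte metadata header (the test itself never asserts these values, so treat this purely as an illustration of why 20 bytes are trimmed and what the fix should do afterwards).

// Hypothetical back-of-the-envelope check for the test scenario (defaults assumed).
public class Hdfs5728TestMath {
  public static void main(String[] args) {
    long blockLen = 2L * 1024 * 1024;            // bytes written before abort()
    long fullMetaLen = 7 + (blockLen / 512) * 4; // 16391: header + one CRC per chunk
    long shortMetaLen = fullMetaLen - 20;        // test drops the last 5 CRCs (5 * 4 bytes)
    long coveredChunks = (shortMetaLen - 7) / 4; // 4091 chunks still have a CRC
    long validLen = coveredChunks * 512;         // 2094592 bytes remain verifiable
    // After the DataNode restart, validateIntegrityAndSetLength() should truncate
    // the block file from 2097152 to validLen, so recoverLease() can close the file
    // instead of failing on the tail that has no CRC.
    System.out.println(validLen + " of " + blockLen + " bytes are covered by CRCs");
  }
}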