HDFS-4627. Fix FSImageFormat#Loader NPE and synchronization issues. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1460389 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-03-24 14:53:52 +00:00
parent ca7c588185
commit 993a76f2dd
3 changed files with 74 additions and 2 deletions

View File

@ -207,3 +207,6 @@ Branch-2802 Snapshot (Unreleased)
HDFS-4616. Update the FilesDeleted metric while deleting file/dir in the
current tree. (Jing Zhao via szetszwo)
HDFS-4627. Fix FSImageFormat#Loader NPE and synchronization issues.
(Jing Zhao via suresh)

View File

@ -561,7 +561,7 @@ public class FSImageFormat {
// read blocks
BlockInfo[] blocks = null;
if (numBlocks > 0) {
if (numBlocks >= 0) {
blocks = new BlockInfo[numBlocks];
for (int j = 0; j < numBlocks; j++) {
blocks[j] = new BlockInfo(replication);
@ -660,7 +660,7 @@ public class FSImageFormat {
((INodeFileWithSnapshot)oldnode).getDiffs());
}
fsDir.unprotectedReplaceINodeFile(path, oldnode, cons);
fsDir.replaceINodeFile(path, oldnode, cons);
namesystem.leaseManager.addLease(cons.getClientName(), path);
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@ -26,6 +28,8 @@ import java.util.Random;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -33,6 +37,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@ -305,4 +310,68 @@ public class TestFSImageWithSnapshot {
// compare two dumped tree
SnapshotTestHelper.compareDumpedTreeInFile(fsnBefore, fsnAfter);
}
/**
* Test the fsimage loading while there is file under construction.
*/
@Test (timeout=60000)
public void testLoadImageWithAppending() throws Exception {
Path sub1 = new Path(dir, "sub1");
Path sub1file1 = new Path(sub1, "sub1file1");
Path sub1file2 = new Path(sub1, "sub1file2");
DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, REPLICATION, seed);
// 1. create snapshot s0
hdfs.allowSnapshot(dir.toString());
hdfs.createSnapshot(dir, "s0");
// 2. create snapshot s1 before appending sub1file1 finishes
HdfsDataOutputStream out = appendFileWithoutClosing(sub1file1, BLOCKSIZE);
out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
// save namespace and restart cluster
hdfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
hdfs.saveNamespace();
hdfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).format(false)
.numDataNodes(REPLICATION).build();
cluster.waitActive();
fsn = cluster.getNamesystem();
hdfs = cluster.getFileSystem();
}
/**
* Test fsimage loading when 1) there is an empty file loaded from fsimage,
* and 2) there is later an append operation to be applied from edit log.
*/
@Test
public void testLoadImageWithEmptyFile() throws Exception {
// create an empty file
Path file = new Path(dir, "file");
FSDataOutputStream out = hdfs.create(file);
out.close();
// save namespace
hdfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
hdfs.saveNamespace();
hdfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
// append to the empty file
out = hdfs.append(file);
out.write(1);
out.close();
// restart cluster
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).format(false)
.numDataNodes(REPLICATION).build();
cluster.waitActive();
hdfs = cluster.getFileSystem();
FileStatus status = hdfs.getFileStatus(file);
assertEquals(1, status.getLen());
}
}