commit c7bf3e4ad8
parent 71a4677f6a
Author: Suresh Srinivas
Date:   2013-07-12 17:57:46 +00:00

    HDFS-4912. Merge 1502634 from trunk

    git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1502640 13f79535-47bb-0310-9956-ffa450edef68

4 changed files with 94 additions and 78 deletions

View File

@@ -215,6 +215,8 @@ Release 2.1.0-beta - 2013-07-02
     HDFS-4645. Move from randomly generated block ID to sequentially generated
     block ID. (Arpit Agarwal via szetszwo)
 
+    HDFS-4912. Cleanup FSNamesystem#startFileInternal. (suresh)
+
   OPTIMIZATIONS
 
     HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)

View File

@@ -1876,6 +1876,7 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
+    blockManager.verifyReplication(src, replication, clientMachine);
 
     boolean skipSync = false;
     final HdfsFileStatus stat;
@@ -1887,6 +1888,8 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
           + "): " + blockSize + " < " + minBlockSize);
     }
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    boolean create = flag.contains(CreateFlag.CREATE);
+    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1894,8 +1897,8 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
         throw new SafeModeException("Cannot create file" + src, safeMode);
       }
       src = FSDirectory.resolvePath(src, pathComponents, dir);
-      startFileInternal(pc, src, permissions, holder, clientMachine, flag,
-          createParent, replication, blockSize);
+      startFileInternal(pc, src, permissions, holder, clientMachine,
+          create, overwrite, createParent, replication, blockSize);
       stat = dir.getFileInfo(src, false);
     } catch (StandbyException se) {
       skipSync = true;
@@ -1913,25 +1916,18 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
   }
 
   /**
-   * Create new or open an existing file for append.<p>
-   *
-   * In case of opening the file for append, the method returns the last
-   * block of the file if this is a partial block, which can still be used
-   * for writing more data. The client uses the returned block locations
-   * to form the data pipeline for this block.<br>
-   * The method returns null if the last block is full or if this is a
-   * new file. The client then allocates a new block with the next call
-   * using {@link NameNode#addBlock()}.<p>
+   * Create a new file or overwrite an existing file.<br>
    *
+   * Once the file is created the client allocates a new block with the next
+   * call using {@link NameNode#addBlock()}.
+   * <p>
    * For description of parameters and exceptions thrown see
    * {@link ClientProtocol#create()}
-   *
-   * @return the last block locations if the block is partial or null otherwise
    */
-  private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
+  private void startFileInternal(FSPermissionChecker pc, String src,
       PermissionStatus permissions, String holder, String clientMachine,
-      EnumSet<CreateFlag> flag, boolean createParent, short replication,
-      long blockSize) throws SafeModeException, FileAlreadyExistsException,
+      boolean create, boolean overwrite, boolean createParent,
+      short replication, long blockSize) throws FileAlreadyExistsException,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
     assert hasWriteLock();
@@ -1943,11 +1939,8 @@ private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
           + "; already exists as a directory.");
     }
     final INodeFile myFile = INodeFile.valueOf(inode, src, true);
-    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
-    boolean append = flag.contains(CreateFlag.APPEND);
     if (isPermissionEnabled) {
-      if (append || (overwrite && myFile != null)) {
+      if (overwrite && myFile != null) {
         checkPathAccess(pc, src, FsAction.WRITE);
       } else {
         checkAncestorAccess(pc, src, FsAction.WRITE);
@@ -1959,65 +1952,95 @@ private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
     }
 
     try {
-      blockManager.verifyReplication(src, replication, clientMachine);
-      boolean create = flag.contains(CreateFlag.CREATE);
       if (myFile == null) {
         if (!create) {
-          throw new FileNotFoundException("failed to overwrite or append to non-existent file "
+          throw new FileNotFoundException("failed to overwrite non-existent file "
               + src + " on client " + clientMachine);
         }
       } else {
-        // File exists - must be one of append or overwrite
         if (overwrite) {
-          delete(src, true);
+          delete(src, true); // File exists - delete if overwrite
         } else {
-          // Opening an existing file for write - may need to recover lease.
+          // If the lease soft limit has expired, recover the lease
           recoverLeaseInternal(myFile, src, holder, clientMachine, false);
-
-          if (!append) {
-            throw new FileAlreadyExistsException("failed to create file " + src
-                + " on client " + clientMachine
-                + " because the file exists");
-          }
+          throw new FileAlreadyExistsException("failed to create file " + src
+              + " on client " + clientMachine + " because the file exists");
         }
       }
 
+      checkFsObjectLimit();
       final DatanodeDescriptor clientNode =
           blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
-
-      if (append && myFile != null) {
-        final INodeFile f = INodeFile.valueOf(myFile, src);
-        return prepareFileForWrite(src, f, holder, clientMachine, clientNode,
-            true, iip.getLatestSnapshot());
-      } else {
-        // Now we can add the name to the filesystem. This file has no
-        // blocks associated with it.
-        //
-        checkFsObjectLimit();
-
-        // increment global generation stamp
-        INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
-            replication, blockSize, holder, clientMachine, clientNode);
-        if (newNode == null) {
-          throw new IOException("DIR* NameSystem.startFile: " +
-                                "Unable to add file to namespace.");
-        }
-        leaseManager.addLease(newNode.getClientName(), src);
-
-        // record file record in log, record new generation stamp
-        getEditLog().logOpenFile(src, newNode);
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
-              +"add "+src+" to namespace for "+holder);
-        }
-      }
+      INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
+          replication, blockSize, holder, clientMachine, clientNode);
+      if (newNode == null) {
+        throw new IOException("DIR* NameSystem.startFile: " +
+                              "Unable to add file to namespace.");
+      }
+      leaseManager.addLease(newNode.getClientName(), src);
+
+      // record the file in the edit log with the new generation stamp
+      getEditLog().logOpenFile(src, newNode);
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+            +"add "+src+" to namespace for "+holder);
+      }
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
           +ie.getMessage());
       throw ie;
     }
-
-    return null;
   }
 
+  /**
+   * Append to an existing file.
+   * <p>
+   * The method returns the last block of the file if this is a partial block,
+   * which can still be used for writing more data. The client uses the
+   * returned block locations to form the data pipeline for this block.<br>
+   * The method returns null if the last block is full. The client then
+   * allocates a new block with the next call using {@link NameNode#addBlock()}.
+   * <p>
+   * For description of parameters and exceptions thrown see
+   * {@link ClientProtocol#append(String, String)}
+   *
+   * @return the last block locations if the block is partial or null otherwise
+   */
+  private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
+      String holder, String clientMachine) throws AccessControlException,
+      UnresolvedLinkException, FileNotFoundException, IOException {
+    assert hasWriteLock();
+    // Verify that the destination does not exist as a directory already.
+    final INodesInPath iip = dir.getINodesInPath4Write(src);
+    final INode inode = iip.getLastINode();
+    if (inode != null && inode.isDirectory()) {
+      throw new FileAlreadyExistsException("Cannot append to directory " + src
+          + "; already exists as a directory.");
+    }
+    if (isPermissionEnabled) {
+      checkPathAccess(pc, src, FsAction.WRITE);
+    }
+    try {
+      if (inode == null) {
+        throw new FileNotFoundException("failed to append to non-existent file "
+            + src + " on client " + clientMachine);
+      }
+      final INodeFile myFile = INodeFile.valueOf(inode, src, true);
+      // Opening an existing file for write - may need to recover lease.
+      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
+      final DatanodeDescriptor clientNode =
+          blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
+      return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode,
+          true, iip.getLatestSnapshot());
+    } catch (IOException ie) {
+      NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
+      throw ie;
+    }
+  }
 
   /**
@@ -2208,14 +2231,6 @@ private LocatedBlock appendFileInt(String src, String holder, String clientMachi
         "Append is not enabled on this NameNode. Use the " +
         DFS_SUPPORT_APPEND_KEY + " configuration option to enable it.");
     }
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
-          + ", holder=" + holder
-          + ", clientMachine=" + clientMachine);
-    }
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException(src);
-    }
 
     LocatedBlock lb = null;
     FSPermissionChecker pc = getPermissionChecker();
@@ -2228,9 +2243,7 @@ private LocatedBlock appendFileInt(String src, String holder, String clientMachi
         throw new SafeModeException("Cannot append to file" + src, safeMode);
       }
       src = FSDirectory.resolvePath(src, pathComponents, dir);
-      lb = startFileInternal(pc, src, null, holder, clientMachine,
-          EnumSet.of(CreateFlag.APPEND),
-          false, blockManager.maxReplication, 0);
+      lb = appendFileInternal(pc, src, holder, clientMachine);
     } catch (StandbyException se) {
       skipSync = true;
       throw se;
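
Taken together, the FSNamesystem changes split the old dual-purpose startFileInternal into a create-only path and a separate appendFileInternal, reached from clients through FileSystem#create and FileSystem#append respectively. A minimal client-side sketch of the two paths (not part of this patch; the file path, the explicit dfs.support.append setting, and the assumption that fs.defaultFS points at an HDFS cluster are all illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CreateVsAppend {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The append path requires DFS_SUPPORT_APPEND_KEY to be enabled.
        conf.setBoolean("dfs.support.append", true);
        FileSystem fs = FileSystem.get(conf);  // assumes fs.defaultFS is an HDFS URI
        Path p = new Path("/tmp/example.txt"); // illustrative path

        // create() drives the create-only startFileInternal: with overwrite=false,
        // an existing file now fails with FileAlreadyExistsException rather than
        // falling through to any append handling.
        FSDataOutputStream out = fs.create(p, false);
        out.writeBytes("first write\n");
        out.close();

        // append() drives the new appendFileInternal, which may hand back the
        // last partial block so the client can extend it before requesting a
        // new block via addBlock().
        FSDataOutputStream append = fs.append(p);
        append.writeBytes("appended\n");
        append.close();
      }
    }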

View File

@@ -60,6 +60,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -502,7 +503,7 @@ public void testFileCreationError2() throws IOException {
     DistributedFileSystem dfs = null;
     try {
       cluster.waitActive();
-      dfs = (DistributedFileSystem)cluster.getFileSystem();
+      dfs = cluster.getFileSystem();
       DFSClient client = dfs.dfs;
 
       // create a new file.
@@ -562,7 +563,7 @@ public void testFileCreationError3() throws IOException {
     DistributedFileSystem dfs = null;
     try {
       cluster.waitActive();
-      dfs = (DistributedFileSystem)cluster.getFileSystem();
+      dfs = cluster.getFileSystem();
       DFSClient client = dfs.dfs;
 
       // create a new file.
@@ -703,7 +704,7 @@ public void testFileCreationNamenodeRestart() throws IOException {
       stm4.close();
 
       // verify that new block is associated with this file
-      DFSClient client = ((DistributedFileSystem)fs).dfs;
+      DFSClient client = fs.dfs;
       LocatedBlocks locations = client.getNamenode().getBlockLocations(
           file1.toString(), 0, Long.MAX_VALUE);
       System.out.println("locations = " + locations.locatedBlockCount());
@@ -951,7 +952,7 @@ public void testLeaseExpireHardLimit() throws Exception {
     DistributedFileSystem dfs = null;
     try {
       cluster.waitActive();
-      dfs = (DistributedFileSystem)cluster.getFileSystem();
+      dfs = cluster.getFileSystem();
 
       // create a new file.
       final String f = DIR + "foo";
@@ -1012,7 +1013,7 @@ public void testFsClose() throws Exception {
     DistributedFileSystem dfs = null;
     try {
       cluster.waitActive();
-      dfs = (DistributedFileSystem)cluster.getFileSystem();
+      dfs = cluster.getFileSystem();
 
       // create a new file.
       final String f = DIR + "foofs";
@@ -1044,7 +1045,7 @@ public void testFsCloseAfterClusterShutdown() throws IOException {
     DistributedFileSystem dfs = null;
     try {
       cluster.waitActive();
-      dfs = (DistributedFileSystem)cluster.getFileSystem();
+      dfs = cluster.getFileSystem();
 
       // create a new file.
       final String f = DIR + "testFsCloseAfterClusterShutdown";
@@ -1177,7 +1178,7 @@ public void testFileIdMismatch() throws IOException {
     DistributedFileSystem dfs = null;
     try {
       cluster.waitActive();
-      dfs = (DistributedFileSystem)cluster.getFileSystem();
+      dfs = cluster.getFileSystem();
       DFSClient client = dfs.dfs;
 
       final Path f = new Path("/testFileIdMismatch.txt");
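
The recurring test edit above drops a redundant cast: MiniDFSCluster#getFileSystem() already returns DistributedFileSystem, so the tests can bind it directly. A minimal sketch of the pattern (cluster sizing is illustrative; the dfs.dfs access works because these tests live in the org.apache.hadoop.hdfs package alongside DistributedFileSystem):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    // ... inside a test method:
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DistributedFileSystem dfs = cluster.getFileSystem(); // no cast needed
      DFSClient client = dfs.dfs; // package-level access to the wrapped client
    } finally {
      cluster.shutdown();
    }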

View File

@@ -130,7 +130,7 @@ public void testImmediateRecoveryOfLease() throws Exception {
     size = AppendTestUtil.nextInt(FILE_SIZE);
     filepath = createFile("/immediateRecoverLease-longlease", size, false);
 
-    // test recoverLese from a different client
+    // test recoverLease from a different client
     recoverLease(filepath, null);
     verifyFile(dfs, filepath, actual, size);
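
For context on the corrected comment: the test forces lease recovery through a client other than the original writer. The public entry point for that is DistributedFileSystem#recoverLease, roughly as follows (the Configuration and path here are illustrative, and real callers typically retry until recovery succeeds):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    // A second client, distinct from the one that left the file open:
    Configuration conf = new Configuration();
    DistributedFileSystem other =
        (DistributedFileSystem) FileSystem.newInstance(conf);
    // recoverLease returns true once the lease is released and the file is
    // closed; false means recovery has been started but is not yet complete.
    boolean closed =
        other.recoverLease(new Path("/immediateRecoverLease-longlease"));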