HDFS-6294. Use INode IDs to avoid conflicts when a file open for write is renamed (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1593638 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2014-05-09 22:55:17 +00:00
parent 29a124d09d
commit fea7cd3c12
25 changed files with 326 additions and 157 deletions

View File

@ -89,6 +89,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6295. Add "decommissioning" state and node state filtering to
dfsadmin. (wang)
HDFS-6294. Use INode IDs to avoid conflicts when a file open for write is
renamed. (cmccabe)
OPTIMIZATIONS
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

View File

@ -504,8 +504,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
* that are currently being written by this client.
* Note that a file can only be written by a single client.
*/
private final Map<String, DFSOutputStream> filesBeingWritten
= new HashMap<String, DFSOutputStream>();
private final Map<Long, DFSOutputStream> filesBeingWritten
= new HashMap<Long, DFSOutputStream>();
/**
* Same as this(NameNode.getAddress(conf), conf);
@ -734,14 +734,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
}
/** Get a lease and start automatic renewal */
private void beginFileLease(final String src, final DFSOutputStream out)
private void beginFileLease(final long inodeId, final DFSOutputStream out)
throws IOException {
getLeaseRenewer().put(src, out, this);
getLeaseRenewer().put(inodeId, out, this);
}
/** Stop renewal of lease for the file. */
void endFileLease(final String src) throws IOException {
getLeaseRenewer().closeFile(src, this);
void endFileLease(final long inodeId) throws IOException {
getLeaseRenewer().closeFile(inodeId, this);
}
@ -749,9 +749,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
* enforced to consistently update its local dfsclients array and
* client's filesBeingWritten map.
*/
void putFileBeingWritten(final String src, final DFSOutputStream out) {
void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
synchronized(filesBeingWritten) {
filesBeingWritten.put(src, out);
filesBeingWritten.put(inodeId, out);
// update the last lease renewal time only when there was no
// writes. once there is one write stream open, the lease renewer
// thread keeps it updated well with in anyone's expiration time.
@ -762,9 +762,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
}
/** Remove a file. Only called from LeaseRenewer. */
void removeFileBeingWritten(final String src) {
void removeFileBeingWritten(final long inodeId) {
synchronized(filesBeingWritten) {
filesBeingWritten.remove(src);
filesBeingWritten.remove(inodeId);
if (filesBeingWritten.isEmpty()) {
lastLeaseRenewal = 0;
}
@ -849,14 +849,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
/** Close/abort all files being written. */
private void closeAllFilesBeingWritten(final boolean abort) {
for(;;) {
final String src;
final long inodeId;
final DFSOutputStream out;
synchronized(filesBeingWritten) {
if (filesBeingWritten.isEmpty()) {
return;
}
src = filesBeingWritten.keySet().iterator().next();
out = filesBeingWritten.remove(src);
inodeId = filesBeingWritten.keySet().iterator().next();
out = filesBeingWritten.remove(inodeId);
}
if (out != null) {
try {
@ -866,8 +866,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
out.close();
}
} catch(IOException ie) {
LOG.error("Failed to " + (abort? "abort": "close") + " file " + src,
ie);
LOG.error("Failed to " + (abort? "abort": "close") +
" inode " + inodeId, ie);
}
}
}
@ -1481,7 +1481,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
beginFileLease(src, result);
beginFileLease(result.getFileId(), result);
return result;
}
@ -1529,7 +1529,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
flag, createParent, replication, blockSize, progress, buffersize,
checksum);
}
beginFileLease(src, result);
beginFileLease(result.getFileId(), result);
return result;
}
@ -1617,7 +1617,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
+ src + " on client " + clientName);
}
final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
beginFileLease(src, result);
beginFileLease(result.getFileId(), result);
return result;
}
@ -2454,8 +2454,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
}
@VisibleForTesting
ExtendedBlock getPreviousBlock(String file) {
return filesBeingWritten.get(file).getBlock();
ExtendedBlock getPreviousBlock(long fileId) {
return filesBeingWritten.get(fileId).getBlock();
}
/**

View File

@ -1017,7 +1017,7 @@ public class DFSOutputStream extends FSOutputSummer
//get a new datanode
final DatanodeInfo[] original = nodes;
final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
src, block, nodes, storageIDs,
src, fileId, block, nodes, storageIDs,
failed.toArray(new DatanodeInfo[failed.size()]),
1, dfsClient.clientName);
setPipeline(lb);
@ -1273,7 +1273,8 @@ public class DFSOutputStream extends FSOutputSummer
if (!success) {
DFSClient.LOG.info("Abandoning " + block);
dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
dfsClient.namenode.abandonBlock(block, fileId, src,
dfsClient.clientName);
block = null;
DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
@ -1923,7 +1924,8 @@ public class DFSOutputStream extends FSOutputSummer
// namenode.
if (persistBlocks.getAndSet(false) || updateLength) {
try {
dfsClient.namenode.fsync(src, dfsClient.clientName, lastBlockLength);
dfsClient.namenode.fsync(src, fileId,
dfsClient.clientName, lastBlockLength);
} catch (IOException ioe) {
DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
// If we got an error here, it might be because some other thread called
@ -2044,7 +2046,7 @@ public class DFSOutputStream extends FSOutputSummer
streamer.setLastException(new IOException("Lease timeout of "
+ (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
closeThreads(true);
dfsClient.endFileLease(src);
dfsClient.endFileLease(fileId);
}
// shutdown datastreamer and responseprocessor threads.
@ -2098,7 +2100,7 @@ public class DFSOutputStream extends FSOutputSummer
ExtendedBlock lastBlock = streamer.getBlock();
closeThreads(false);
completeFile(lastBlock);
dfsClient.endFileLease(src);
dfsClient.endFileLease(fileId);
} catch (ClosedChannelException e) {
} finally {
closed = true;
@ -2191,7 +2193,7 @@ public class DFSOutputStream extends FSOutputSummer
}
@VisibleForTesting
long getFileId() {
public long getFileId() {
return fileId;
}
}

View File

@ -281,7 +281,7 @@ class LeaseRenewer {
&& Time.now() - emptyTime > gracePeriod;
}
synchronized void put(final String src, final DFSOutputStream out,
synchronized void put(final long inodeId, final DFSOutputStream out,
final DFSClient dfsc) {
if (dfsc.isClientRunning()) {
if (!isRunning() || isRenewerExpired()) {
@ -319,7 +319,7 @@ class LeaseRenewer {
});
daemon.start();
}
dfsc.putFileBeingWritten(src, out);
dfsc.putFileBeingWritten(inodeId, out);
emptyTime = Long.MAX_VALUE;
}
}
@ -330,8 +330,8 @@ class LeaseRenewer {
}
/** Close a file. */
void closeFile(final String src, final DFSClient dfsc) {
dfsc.removeFileBeingWritten(src);
void closeFile(final long inodeId, final DFSClient dfsc) {
dfsc.removeFileBeingWritten(inodeId);
synchronized(this) {
if (dfsc.isFilesBeingWrittenEmpty()) {

View File

@ -290,13 +290,20 @@ public interface ClientProtocol {
* file.
* Any partial writes to the block will be discarded.
*
* @param b Block to abandon
* @param fileId The id of the file where the block resides. Older clients
* will pass GRANDFATHER_INODE_ID here.
* @param src The path of the file where the block resides.
* @param holder Lease holder.
*
* @throws AccessControlException If access is denied
* @throws FileNotFoundException file <code>src</code> is not found
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
@Idempotent
public void abandonBlock(ExtendedBlock b, String src, String holder)
public void abandonBlock(ExtendedBlock b, long fileId,
String src, String holder)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException;
@ -344,6 +351,7 @@ public interface ClientProtocol {
* Get a datanode for an existing pipeline.
*
* @param src the file being written
* @param fileId the ID of the file being written
* @param blk the block being written
* @param existings the existing nodes in the pipeline
* @param excludes the excluded nodes
@ -359,8 +367,10 @@ public interface ClientProtocol {
* @throws IOException If an I/O error occurred
*/
@Idempotent
public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
final DatanodeInfo[] existings, final String[] existingStorageIDs,
public LocatedBlock getAdditionalDatanode(final String src,
final long fileId, final ExtendedBlock blk,
final DatanodeInfo[] existings,
final String[] existingStorageIDs,
final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName
) throws AccessControlException, FileNotFoundException,
@ -896,6 +906,8 @@ public interface ClientProtocol {
* Write all metadata for this file into persistent storage.
* The file must be currently open for writing.
* @param src The string representation of the path
* @param inodeId The inode ID, or GRANDFATHER_INODE_ID if the client is
* too old to support fsync with inode IDs.
* @param client The string representation of the client
* @param lastBlockLength The length of the last block (under construction)
* to be reported to NameNode
@ -905,7 +917,8 @@ public interface ClientProtocol {
* @throws IOException If an I/O error occurred
*/
@Idempotent
public void fsync(String src, String client, long lastBlockLength)
public void fsync(String src, long inodeId, String client,
long lastBlockLength)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException;

View File

@ -422,8 +422,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public AbandonBlockResponseProto abandonBlock(RpcController controller,
AbandonBlockRequestProto req) throws ServiceException {
try {
server.abandonBlock(PBHelper.convert(req.getB()), req.getSrc(),
req.getHolder());
server.abandonBlock(PBHelper.convert(req.getB()), req.getFileId(),
req.getSrc(), req.getHolder());
} catch (IOException e) {
throw new ServiceException(e);
}
@ -461,7 +461,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
List<String> existingStorageIDsList = req.getExistingStorageUuidsList();
List<DatanodeInfoProto> excludesList = req.getExcludesList();
LocatedBlock result = server.getAdditionalDatanode(req.getSrc(),
PBHelper.convert(req.getBlk()),
req.getFileId(), PBHelper.convert(req.getBlk()),
PBHelper.convert(existingList.toArray(
new DatanodeInfoProto[existingList.size()])),
existingStorageIDsList.toArray(
@ -819,7 +819,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public FsyncResponseProto fsync(RpcController controller,
FsyncRequestProto req) throws ServiceException {
try {
server.fsync(req.getSrc(), req.getClient(), req.getLastBlockLength());
server.fsync(req.getSrc(), req.getFileId(),
req.getClient(), req.getLastBlockLength());
return VOID_FSYNC_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);

View File

@ -329,11 +329,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public void abandonBlock(ExtendedBlock b, String src, String holder)
throws AccessControlException, FileNotFoundException,
public void abandonBlock(ExtendedBlock b, long fileId, String src,
String holder) throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder()
.setB(PBHelper.convert(b)).setSrc(src).setHolder(holder).build();
.setB(PBHelper.convert(b)).setSrc(src).setHolder(holder)
.setFileId(fileId).build();
try {
rpcProxy.abandonBlock(null, req);
} catch (ServiceException e) {
@ -365,8 +366,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public LocatedBlock getAdditionalDatanode(String src, ExtendedBlock blk,
DatanodeInfo[] existings, String[] existingStorageIDs,
public LocatedBlock getAdditionalDatanode(String src, long fileId,
ExtendedBlock blk, DatanodeInfo[] existings, String[] existingStorageIDs,
DatanodeInfo[] excludes,
int numAdditionalNodes, String clientName) throws AccessControlException,
FileNotFoundException, SafeModeException, UnresolvedLinkException,
@ -374,6 +375,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
GetAdditionalDatanodeRequestProto req = GetAdditionalDatanodeRequestProto
.newBuilder()
.setSrc(src)
.setFileId(fileId)
.setBlk(PBHelper.convert(blk))
.addAllExistings(PBHelper.convert(existings))
.addAllExistingStorageUuids(Arrays.asList(existingStorageIDs))
@ -750,11 +752,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public void fsync(String src, String client, long lastBlockLength)
public void fsync(String src, long fileId, String client,
long lastBlockLength)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
FsyncRequestProto req = FsyncRequestProto.newBuilder().setSrc(src)
.setClient(client).setLastBlockLength(lastBlockLength).build();
.setClient(client).setLastBlockLength(lastBlockLength)
.setFileId(fileId).build();
try {
rpcProxy.fsync(null, req);
} catch (ServiceException e) {

View File

@ -1149,7 +1149,7 @@ public class FSEditLog implements LogsPurgeable {
* Finalize the current log segment.
* Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state.
*/
synchronized void endCurrentLogSegment(boolean writeEndTxn) {
public synchronized void endCurrentLogSegment(boolean writeEndTxn) {
LOG.info("Ending log segment " + curSegmentTxId);
Preconditions.checkState(isSegmentOpen(),
"Bad state: %s", state);

View File

@ -542,7 +542,7 @@ public class FSImage implements Closeable {
}
@VisibleForTesting
void setEditLogForTesting(FSEditLog newLog) {
public void setEditLogForTesting(FSEditLog newLog) {
editLog = newLog;
}

View File

@ -2667,9 +2667,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.READ);
src = FSDirectory.resolvePath(src, pathComponents, dir);
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
final INode[] inodes = analyzeFileState(
src, fileId, clientName, previous, onRetryBlock).getINodes();
final INodeFile pendingFile = inodes[inodes.length - 1].asFile();
final INodeFile pendingFile = analyzeFileState(
src, fileId, clientName, previous, onRetryBlock);
src = pendingFile.getFullPathName();
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
// This is a retry. Just return the last block if having locations.
@ -2703,10 +2703,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// Run the full analysis again, since things could have changed
// while chooseTarget() was executing.
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
INodesInPath inodesInPath =
final INodeFile pendingFile =
analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
final INode[] inodes = inodesInPath.getINodes();
final INodeFile pendingFile = inodes[inodes.length - 1].asFile();
if (onRetryBlock[0] != null) {
if (onRetryBlock[0].getLocations().length > 0) {
@ -2728,6 +2726,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// allocate new block, record block locations in INode.
newBlock = createNewBlock();
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
saveAllocatedBlock(src, inodesInPath, newBlock, targets);
dir.persistNewBlock(src, pendingFile);
@ -2741,7 +2740,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return makeLocatedBlock(newBlock, targets, offset);
}
INodesInPath analyzeFileState(String src,
INodeFile analyzeFileState(String src,
long fileId,
String clientName,
ExtendedBlock previous,
@ -2758,9 +2757,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkFsObjectLimit();
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
INode inode;
if (fileId == INodeId.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
final INodesInPath iip = dir.getINodesInPath4Write(src);
final INodeFile pendingFile
= checkLease(src, fileId, clientName, iip.getLastINode());
inode = iip.getLastINode();
} else {
// Newer clients pass the inode ID, so we can just get the inode
// directly.
inode = dir.getInode(fileId);
if (inode != null) src = inode.getFullPathName();
}
final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
@ -2818,7 +2828,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
((BlockInfoUnderConstruction)lastBlockInFile).getExpectedStorageLocations(),
offset);
return iip;
return pendingFile;
} else {
// Case 3
throw new IOException("Cannot allocate block in " + src + ": " +
@ -2831,7 +2841,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (!checkFileProgress(pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet: " + src);
}
return iip;
return pendingFile;
}
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
@ -2844,8 +2854,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
/** @see ClientProtocol#getAdditionalDatanode */
LocatedBlock getAdditionalDatanode(String src, final ExtendedBlock blk,
final DatanodeInfo[] existings, final String[] storageIDs,
LocatedBlock getAdditionalDatanode(String src, long fileId,
final ExtendedBlock blk, final DatanodeInfo[] existings,
final String[] storageIDs,
final Set<Node> excludes,
final int numAdditionalNodes, final String clientName
) throws IOException {
@ -2865,7 +2876,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
src = FSDirectory.resolvePath(src, pathComponents, dir);
//check lease
final INodeFile file = checkLease(src, clientName);
final INode inode;
if (fileId == INodeId.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
inode = dir.getINode(src);
} else {
inode = dir.getInode(fileId);
if (inode != null) src = inode.getFullPathName();
}
final INodeFile file = checkLease(src, clientName, inode, fileId);
clientnode = file.getFileUnderConstructionFeature().getClientNode();
preferredblocksize = file.getPreferredBlockSize();
@ -2889,7 +2910,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/**
* The client would like to let go of the given block
*/
boolean abandonBlock(ExtendedBlock b, String src, String holder)
boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
throws LeaseExpiredException, FileNotFoundException,
UnresolvedLinkException, IOException {
if(NameNode.stateChangeLog.isDebugEnabled()) {
@ -2901,13 +2922,24 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot abandon block " + b + " for fle" + src);
checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
src = FSDirectory.resolvePath(src, pathComponents, dir);
final INode inode;
if (fileId == INodeId.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
inode = dir.getINode(src);
} else {
inode = dir.getInode(fileId);
if (inode != null) src = inode.getFullPathName();
}
final INodeFile file = checkLease(src, holder, inode, fileId);
//
// Remove the block from the pending creates list
//
INodeFile file = checkLease(src, holder);
boolean removed = dir.removeBlock(src, file,
ExtendedBlock.getLocalBlock(b));
if (!removed) {
@ -2926,21 +2958,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return true;
}
/** make sure that we still have the lease on this file. */
private INodeFile checkLease(String src, String holder)
throws LeaseExpiredException, UnresolvedLinkException,
FileNotFoundException {
return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
dir.getINode(src));
}
private INodeFile checkLease(String src, long fileId, String holder,
INode inode) throws LeaseExpiredException, FileNotFoundException {
private INodeFile checkLease(String src, String holder, INode inode,
long fileId)
throws LeaseExpiredException, FileNotFoundException {
assert hasReadLock();
if (inode == null || !inode.isFile()) {
final String ident = src + " (inode " + fileId + ")";
if (inode == null) {
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException(
"No lease on " + src + ": File does not exist. "
"No lease on " + ident + ": File does not exist. "
+ (lease != null ? lease.toString()
: "Holder " + holder + " does not have any open files."));
}
if (!inode.isFile()) {
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException(
"No lease on " + ident + ": INode is not a regular file. "
+ (lease != null ? lease.toString()
: "Holder " + holder + " does not have any open files."));
}
@ -2948,16 +2981,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (!file.isUnderConstruction()) {
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException(
"No lease on " + src + ": File is not open for writing. "
"No lease on " + ident + ": File is not open for writing. "
+ (lease != null ? lease.toString()
: "Holder " + holder + " does not have any open files."));
}
String clientName = file.getFileUnderConstructionFeature().getClientName();
if (holder != null && !clientName.equals(holder)) {
throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
+ clientName + " but is accessed by " + holder);
throw new LeaseExpiredException("Lease mismatch on " + ident +
" owned by " + clientName + " but is accessed by " + holder);
}
INodeId.checkId(fileId, file);
return file;
}
@ -3000,10 +3032,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
String holder, Block last, long fileId) throws SafeModeException,
UnresolvedLinkException, IOException {
assert hasWriteLock();
final INodesInPath iip = dir.getLastINodeInPath(src);
final INodeFile pendingFile;
try {
pendingFile = checkLease(src, fileId, holder, iip.getINode(0));
final INode inode;
if (fileId == INodeId.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
final INodesInPath iip = dir.getLastINodeInPath(src);
inode = iip.getINode(0);
} else {
inode = dir.getInode(fileId);
if (inode != null) src = inode.getFullPathName();
}
pendingFile = checkLease(src, holder, inode, fileId);
} catch (LeaseExpiredException lee) {
final INode inode = dir.getINode(src);
if (inode != null
@ -3018,9 +3060,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final Block realLastBlock = inode.asFile().getLastBlock();
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
NameNode.stateChangeLog.info("DIR* completeFile: " +
"request from " + holder + " to complete " + src +
" which is already closed. But, it appears to be an RPC " +
"retry. Returning success");
"request from " + holder + " to complete inode " + fileId +
"(" + src + ") which is already closed. But, it appears to be " +
"an RPC retry. Returning success");
return true;
}
}
@ -3040,7 +3082,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
finalizeINodeFileUnderConstruction(src, pendingFile,
iip.getLatestSnapshotId());
Snapshot.CURRENT_STATE_ID);
return true;
}
@ -3681,12 +3723,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** Persist all metadata about this file.
* @param src The string representation of the path
* @param fileId The inode ID that we're fsyncing. Older clients will pass
* INodeId.GRANDFATHER_INODE_ID here.
* @param clientName The string representation of the client
* @param lastBlockLength The length of the last block
* under construction reported from client.
* @throws IOException if path does not exist
*/
void fsync(String src, String clientName, long lastBlockLength)
void fsync(String src, long fileId, String clientName, long lastBlockLength)
throws IOException, UnresolvedLinkException {
NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
checkOperation(OperationCategory.WRITE);
@ -3696,7 +3740,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot fsync file " + src);
src = FSDirectory.resolvePath(src, pathComponents, dir);
INodeFile pendingFile = checkLease(src, clientName);
final INode inode;
if (fileId == INodeId.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
inode = dir.getINode(src);
} else {
inode = dir.getInode(fileId);
if (inode != null) src = inode.getFullPathName();
}
final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
if (lastBlockLength > 0) {
pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
pendingFile, lastBlockLength);

View File

@ -46,6 +46,28 @@ public class INodesInPath {
: Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, pathComponent);
}
static INodesInPath fromINode(INode inode) {
int depth = 0, index;
INode tmp = inode;
while (tmp != null) {
depth++;
tmp = tmp.getParent();
}
final byte[][] path = new byte[depth][];
final INode[] inodes = new INode[depth];
final INodesInPath iip = new INodesInPath(path, depth);
tmp = inode;
index = depth;
while (tmp != null) {
index--;
path[index] = tmp.getKey();
inodes[index] = tmp;
tmp = tmp.getParent();
}
iip.setINodes(inodes);
return iip;
}
/**
* Given some components, create a path name.
* @param components The path components
@ -342,6 +364,11 @@ public class INodesInPath {
inodes[numNonNull++] = node;
}
private void setINodes(INode inodes[]) {
this.inodes = inodes;
this.numNonNull = this.inodes.length;
}
void setINode(int i, INode inode) {
inodes[i >= 0? i: inodes.length + i] = inode;
}

View File

@ -595,13 +595,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
public LocatedBlock getAdditionalDatanode(final String src,
final long fileId, final ExtendedBlock blk,
final DatanodeInfo[] existings, final String[] existingStorageIDs,
final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName
) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("getAdditionalDatanode: src=" + src
+ ", fileId=" + fileId
+ ", blk=" + blk
+ ", existings=" + Arrays.asList(existings)
+ ", excludes=" + Arrays.asList(excludes)
@ -618,20 +620,20 @@ class NameNodeRpcServer implements NamenodeProtocols {
excludeSet.add(node);
}
}
return namesystem.getAdditionalDatanode(src, blk, existings,
return namesystem.getAdditionalDatanode(src, fileId, blk, existings,
existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
}
/**
* The client needs to give up on the block.
*/
@Override // ClientProtocol
public void abandonBlock(ExtendedBlock b, String src, String holder)
throws IOException {
public void abandonBlock(ExtendedBlock b, long fileId, String src,
String holder) throws IOException {
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
+b+" of file "+src);
}
if (!namesystem.abandonBlock(b, src, holder)) {
if (!namesystem.abandonBlock(b, fileId, src, holder)) {
throw new IOException("Cannot abandon block during write to " + src);
}
}
@ -939,9 +941,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
public void fsync(String src, String clientName, long lastBlockLength)
public void fsync(String src, long fileId, String clientName,
long lastBlockLength)
throws IOException {
namesystem.fsync(src, clientName, lastBlockLength);
namesystem.fsync(src, fileId, clientName, lastBlockLength);
}
@Override // ClientProtocol

View File

@ -117,6 +117,7 @@ message AbandonBlockRequestProto {
required ExtendedBlockProto b = 1;
required string src = 2;
required string holder = 3;
optional uint64 fileId = 4 [default = 0]; // default to GRANDFATHER_INODE_ID
}
message AbandonBlockResponseProto { // void response
@ -143,6 +144,7 @@ message GetAdditionalDatanodeRequestProto {
required uint32 numAdditionalNodes = 5;
required string clientName = 6;
repeated string existingStorageUuids = 7;
optional uint64 fileId = 8 [default = 0]; // default to GRANDFATHER_INODE_ID
}
message GetAdditionalDatanodeResponseProto {
@ -532,6 +534,7 @@ message FsyncRequestProto {
required string src = 1;
required string client = 2;
optional sint64 lastBlockLength = 3 [default = -1];
optional uint64 fileId = 4 [default = 0]; // default to GRANDFATHER_INODE_ID
}
message FsyncResponseProto { // void response

View File

@ -54,8 +54,8 @@ public class DFSClientAdapter {
return dfs.dfs;
}
public static ExtendedBlock getPreviousBlock(DFSClient client, String file) {
return client.getPreviousBlock(file);
public static ExtendedBlock getPreviousBlock(DFSClient client, long fileId) {
return client.getPreviousBlock(fileId);
}
public static long getFileId(DFSOutputStream out) {

View File

@ -71,6 +71,7 @@ public class TestAbandonBlock {
fout.write(123);
}
fout.hflush();
long fileId = ((DFSOutputStream)fout.getWrappedStream()).getFileId();
// Now abandon the last block
DFSClient dfsclient = DFSClientAdapter.getDFSClient(fs);
@ -78,11 +79,11 @@ public class TestAbandonBlock {
dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
int orginalNumBlocks = blocks.locatedBlockCount();
LocatedBlock b = blocks.getLastLocatedBlock();
dfsclient.getNamenode().abandonBlock(b.getBlock(), src,
dfsclient.getNamenode().abandonBlock(b.getBlock(), fileId, src,
dfsclient.clientName);
// call abandonBlock again to make sure the operation is idempotent
dfsclient.getNamenode().abandonBlock(b.getBlock(), src,
dfsclient.getNamenode().abandonBlock(b.getBlock(), fileId, src,
dfsclient.clientName);
// And close the file

View File

@ -253,16 +253,7 @@ public class TestFileAppend3 {
assertTrue(fs.rename(p, pnew));
//d. Close file handle that was opened in (b).
try {
out.close();
fail("close() should throw an exception");
} catch(Exception e) {
AppendTestUtil.LOG.info("GOOD!", e);
}
//wait for the lease recovery
cluster.setLeasePeriod(1000, 1000);
AppendTestUtil.sleep(5000);
//check block sizes
final long len = fs.getFileStatus(pnew).getLen();

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.EnumSetWritable;
@ -384,7 +385,6 @@ public class TestFileCreation {
Path p = new Path("/testfile");
FSDataOutputStream stm1 = fs.create(p);
stm1.write(1);
stm1.hflush();
// Create file again without overwrite
try {
@ -403,7 +403,8 @@ public class TestFileCreation {
stm1.close();
fail("Should have exception closing stm1 since it was deleted");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("File is not open for writing", ioe);
GenericTestUtils.assertExceptionContains("No lease on /testfile", ioe);
GenericTestUtils.assertExceptionContains("File does not exist.", ioe);
}
} finally {
@ -1189,8 +1190,8 @@ public class TestFileCreation {
cluster.getNameNodeRpc()
.complete(f.toString(), client.clientName, null, someOtherFileId);
fail();
} catch(FileNotFoundException fnf) {
FileSystem.LOG.info("Caught Expected FileNotFoundException: ", fnf);
} catch(LeaseExpiredException e) {
FileSystem.LOG.info("Caught Expected LeaseExpiredException: ", e);
}
} finally {
IOUtils.closeStream(dfs);

View File

@ -242,6 +242,51 @@ public class TestLease {
Assert.assertTrue(pRenamedAgain+" not found", fs2.exists(pRenamedAgain));
Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
Assert.assertEquals(1, leaseCount(cluster));
out.close();
} finally {
cluster.shutdown();
}
}
/**
* Test that we can open up a file for write, move it to another location,
* and then create a new file in the previous location, without causing any
* lease conflicts. This is possible because we now use unique inode IDs
* to identify files to the NameNode.
*/
@Test
public void testLeaseAfterRenameAndRecreate() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
try {
final Path path1 = new Path("/test-file");
final String contents1 = "contents1";
final Path path2 = new Path("/test-file-new-location");
final String contents2 = "contents2";
// open a file to get a lease
FileSystem fs = cluster.getFileSystem();
FSDataOutputStream out1 = fs.create(path1);
out1.writeBytes(contents1);
Assert.assertTrue(hasLease(cluster, path1));
Assert.assertEquals(1, leaseCount(cluster));
DistributedFileSystem fs2 = (DistributedFileSystem)
FileSystem.newInstance(fs.getUri(), fs.getConf());
fs2.rename(path1, path2);
FSDataOutputStream out2 = fs2.create(path1);
out2.writeBytes(contents2);
out2.close();
// The first file should still be open and valid
Assert.assertTrue(hasLease(cluster, path2));
out1.close();
// Contents should be as expected
DistributedFileSystem fs3 = (DistributedFileSystem)
FileSystem.newInstance(fs.getUri(), fs.getConf());
Assert.assertEquals(contents1, DFSTestUtil.readFile(fs3, path2));
Assert.assertEquals(contents2, DFSTestUtil.readFile(fs3, path1));
} finally {
cluster.shutdown();
}

View File

@ -107,8 +107,8 @@ public class TestLeaseRenewer {
// Set up a file so that we start renewing our lease.
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
String filePath = "/foo";
renewer.put(filePath, mockStream, MOCK_DFSCLIENT);
long fileId = 123L;
renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
// Wait for lease to get renewed
long failTime = Time.now() + 5000;
@ -120,7 +120,7 @@ public class TestLeaseRenewer {
Assert.fail("Did not renew lease at all!");
}
renewer.closeFile(filePath, MOCK_DFSCLIENT);
renewer.closeFile(fileId, MOCK_DFSCLIENT);
}
/**
@ -138,8 +138,8 @@ public class TestLeaseRenewer {
// Set up a file so that we start renewing our lease.
DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
String filePath = "/foo";
renewer.put(filePath, mockStream1, mockClient1);
long fileId = 456L;
renewer.put(fileId, mockStream1, mockClient1);
// Second DFSClient does renew lease
final DFSClient mockClient2 = createMockClient();
@ -149,7 +149,7 @@ public class TestLeaseRenewer {
// Set up a file so that we start renewing our lease.
DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
renewer.put(filePath, mockStream2, mockClient2);
renewer.put(fileId, mockStream2, mockClient2);
// Wait for lease to get renewed
@ -170,19 +170,19 @@ public class TestLeaseRenewer {
}
}, 100, 10000);
renewer.closeFile(filePath, mockClient1);
renewer.closeFile(filePath, mockClient2);
renewer.closeFile(fileId, mockClient1);
renewer.closeFile(fileId, mockClient2);
}
@Test
public void testThreadName() throws Exception {
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
String filePath = "/foo";
long fileId = 789L;
Assert.assertFalse("Renewer not initially running",
renewer.isRunning());
// Pretend to open a file
renewer.put(filePath, mockStream, MOCK_DFSCLIENT);
renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
Assert.assertTrue("Renewer should have started running",
renewer.isRunning());
@ -192,7 +192,7 @@ public class TestLeaseRenewer {
Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName);
// Pretend to close the file
renewer.closeFile(filePath, MOCK_DFSCLIENT);
renewer.closeFile(fileId, MOCK_DFSCLIENT);
renewer.setEmptyTime(Time.now());
// Should stop the renewer running within a few seconds

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@ -159,12 +160,13 @@ public class TestPersistBlocks {
// Abandon the last block
DFSClient dfsclient = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
HdfsFileStatus fileStatus = dfsclient.getNamenode().getFileInfo(FILE_NAME);
LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(
FILE_NAME, 0, BLOCK_SIZE * NUM_BLOCKS);
assertEquals(NUM_BLOCKS, blocks.getLocatedBlocks().size());
LocatedBlock b = blocks.getLastLocatedBlock();
dfsclient.getNamenode().abandonBlock(b.getBlock(), FILE_NAME,
dfsclient.clientName);
dfsclient.getNamenode().abandonBlock(b.getBlock(), fileStatus.getFileId(),
FILE_NAME, dfsclient.clientName);
// explicitly do NOT close the file.
cluster.restartNameNode();

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import java.io.IOException;
@ -26,11 +28,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.log4j.Level;
import org.junit.Test;
import org.mockito.Mockito;
public class TestRenameWhileOpen {
{
@ -57,6 +61,7 @@ public class TestRenameWhileOpen {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, TestFileCreation.blockSize);
// create cluster
System.out.println("Test 1*****************************");
@ -65,6 +70,15 @@ public class TestRenameWhileOpen {
try {
cluster.waitActive();
fs = cluster.getFileSystem();
// Normally, the in-progress edit log would be finalized by
// FSEditLog#endCurrentLogSegment. For testing purposes, we
// disable that here.
FSEditLog spyLog =
spy(cluster.getNameNode().getFSImage().getEditLog());
doNothing().when(spyLog).endCurrentLogSegment(Mockito.anyBoolean());
cluster.getNameNode().getFSImage().setEditLogForTesting(spyLog);
final int nnport = cluster.getNameNodePort();
// create file1.
@ -92,18 +106,21 @@ public class TestRenameWhileOpen {
// create file3
Path file3 = new Path(dir3, "file3");
FSDataOutputStream stm3 = TestFileCreation.createFile(fs, file3, 1);
TestFileCreation.writeFile(stm3);
// rename file3 to some bad name
try {
fs.rename(file3, new Path(dir3, "$ "));
} catch(Exception e) {
e.printStackTrace();
}
FSDataOutputStream stm3 = fs.create(file3);
fs.rename(file3, new Path(dir3, "bozo"));
// Get a new block for the file.
TestFileCreation.writeFile(stm3, TestFileCreation.blockSize + 1);
stm3.hflush();
// restart cluster with the same namenode port as before.
// This ensures that leases are persisted in fsimage.
// Stop the NameNode before closing the files.
// This will ensure that the write leases are still active and present
// in the edit log. Simiarly, there should be a pending ADD_BLOCK_OP
// for file3, since we just added a block to that file.
cluster.getNameNode().stop();
// Restart cluster with the same namenode port as before.
cluster.shutdown();
try {Thread.sleep(2*MAX_IDLE_TIME);} catch (InterruptedException e) {}
cluster = new MiniDFSCluster.Builder(conf).nameNodePort(nnport)
.format(false)
@ -111,7 +128,7 @@ public class TestRenameWhileOpen {
cluster.waitActive();
// restart cluster yet again. This triggers the code to read in
// persistent leases from fsimage.
// persistent leases from the edit log.
cluster.shutdown();
try {Thread.sleep(5000);} catch (InterruptedException e) {}
cluster = new MiniDFSCluster.Builder(conf).nameNodePort(nnport)

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import org.junit.Test;
public class TestSetrepDecreasing {
@Test
@Test(timeout=120000)
public void testSetrepDecreasing() throws IOException {
TestSetrepIncreasing.setrep(5, 3, false);
}

View File

@ -75,11 +75,11 @@ public class TestSetrepIncreasing {
}
}
@Test
@Test(timeout=120000)
public void testSetrepIncreasing() throws IOException {
setrep(3, 7, false);
}
@Test
@Test(timeout=120000)
public void testSetrepIncreasingSimulatedStorage() throws IOException {
setrep(3, 7, true);
}

View File

@ -468,8 +468,8 @@ public class TestINodeFile {
}
}
@Test
public void testWriteToRenamedFile() throws IOException {
@Test(timeout=120000)
public void testWriteToDeletedFile() throws IOException {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
@ -486,18 +486,16 @@ public class TestINodeFile {
Path filePath = new Path("/test1/file");
FSDataOutputStream fos = fs.create(filePath);
// Rename /test1 to test2, and recreate /test1/file
Path renamedPath = new Path("/test2");
fs.rename(path, renamedPath);
fs.create(filePath, (short) 1);
// Delete the file
fs.delete(filePath, false);
// Add new block should fail since /test1/file has a different fileId
// Add new block should fail since /test1/file has been deleted.
try {
fos.write(data, 0, data.length);
// make sure addBlock() request gets to NN immediately
fos.hflush();
fail("Write should fail after rename");
fail("Write should fail after delete");
} catch (Exception e) {
/* Ignore */
} finally {

View File

@ -36,6 +36,7 @@ import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@ -819,10 +820,13 @@ public class TestHASafeMode {
null);
create.write(testData.getBytes());
create.hflush();
long fileId = ((DFSOutputStream)create.
getWrappedStream()).getFileId();
FileStatus fileStatus = dfs.getFileStatus(filePath);
DFSClient client = DFSClientAdapter.getClient(dfs);
// add one dummy block at NN, but not write to DataNode
ExtendedBlock previousBlock = DFSClientAdapter.getPreviousBlock(client,
pathString);
ExtendedBlock previousBlock =
DFSClientAdapter.getPreviousBlock(client, fileId);
DFSClientAdapter.getNamenode(client).addBlock(
pathString,
client.getClientName(),