HDFS-4340. Update addBlock() to include inode id as additional argument. Contributed by Brandon Li.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1443169 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2013-02-06 19:52:34 +00:00
parent 17e72be6d8
commit 4525c4a25b
23 changed files with 307 additions and 147 deletions

File: hadoop-common/CHANGES.txt

@ -623,6 +623,7 @@ Release 2.0.3-alpha - 2013-02-06
HADOOP-9289. FsShell rm -f fails for non-matching globs. (Daryn Sharp via
suresh)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES

File: hadoop-hdfs/CHANGES.txt

@ -296,6 +296,9 @@ Trunk (Unreleased)
HDFS-4382. Fix typo MAX_NOT_CHANGED_INTERATIONS. (Ted Yu via suresh)
HDFS-4340. Update addBlock() to include inode id as additional argument.
(Brandon Li via suresh)
Release 2.0.4-beta - UNRELEASED
INCOMPATIBLE CHANGES

File: DFSOutputStream.java

@ -115,6 +115,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
private volatile boolean closed = false;
private String src;
private final long fileId;
private final long blockSize;
private final DataChecksum checksum;
// both dataQueue and ackQueue are protected by dataQueue lock
@ -1148,7 +1149,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
long localstart = Time.now();
while (true) {
try {
return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes);
return dfsClient.namenode.addBlock(src, dfsClient.clientName,
block, excludedNodes, fileId);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
@ -1261,20 +1263,21 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
return value;
}
private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
DataChecksum checksum, short replication) throws IOException {
private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
int bytesPerChecksum = checksum.getBytesPerChecksum();
this.dfsClient = dfsClient;
this.src = src;
this.blockSize = blockSize;
this.blockReplication = replication;
this.fileId = stat.getFileId();
this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication();
this.progress = progress;
if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug(
"Set non-null progress callback on DFSOutputStream " + src);
}
final int bytesPerChecksum = checksum.getBytesPerChecksum();
if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
") and blockSize(" + blockSize +
@ -1286,19 +1289,27 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
}
/** Construct a new output stream for creating a file. */
private DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum) throws IOException {
this(dfsClient, src, blockSize, progress, checksum, replication);
this(dfsClient, src, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
streamer = new DataStreamer();
}
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, int buffersize,
DataChecksum checksum) throws IOException {
final HdfsFileStatus stat;
try {
dfsClient.namenode.create(
src, masked, dfsClient.clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize);
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<CreateFlag>(flag), createParent, replication,
blockSize);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
DSQuotaExceededException.class,
@ -1309,30 +1320,20 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
SafeModeException.class,
UnresolvedPathException.class);
}
streamer = new DataStreamer();
}
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, int buffersize,
DataChecksum checksum) throws IOException {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, masked,
flag, createParent, replication, blockSize, progress, buffersize,
checksum);
out.streamer.start();
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum);
out.start();
return out;
}
/** Construct a new output stream for append. */
private DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
LocatedBlock lastBlock, HdfsFileStatus stat,
private DFSOutputStream(DFSClient dfsClient, String src,
Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,
DataChecksum checksum) throws IOException {
this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
this(dfsClient, src, progress, stat, checksum);
initialFileSize = stat.getLen(); // length of file when opened
//
// The last partial block of the file has to be filled.
//
if (lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
@ -1347,9 +1348,9 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
int buffersize, Progressable progress, LocatedBlock lastBlock,
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, buffersize,
final DFSOutputStream out = new DFSOutputStream(dfsClient, src,
progress, lastBlock, stat, checksum);
out.streamer.start();
out.start();
return out;
}
@ -1716,6 +1717,10 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
isClosed();
}
private synchronized void start() {
streamer.start();
}
/**
* Aborts this output stream and releases any system
* resources associated with this stream.
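The constructor changes above are the heart of the client-side refactor: instead of taking blockSize and replication as separate arguments, DFSOutputStream now takes the HdfsFileStatus returned by the NameNode and derives those fields from it, which is also how it obtains the file id it later sends with every addBlock() call. A minimal sketch of the idea, using a hypothetical stand-in class rather than the real DFSOutputStream:

import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

/** Hypothetical stand-in showing how the stream derives its state now. */
class OutputStreamStateSketch {
  private final long fileId;            // new: sent with every addBlock() RPC
  private final long blockSize;
  private final short blockReplication;

  OutputStreamStateSketch(HdfsFileStatus stat) {
    // One status object replaces the old (blockSize, replication) parameter
    // pair and additionally carries the inode id of the created file.
    this.fileId = stat.getFileId();
    this.blockSize = stat.getBlockSize();
    this.blockReplication = stat.getReplication();
  }
}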

File: ClientProtocol.java

@ -150,6 +150,8 @@ public interface ClientProtocol {
* @param replication block replication factor.
* @param blockSize maximum block size.
*
* @return the status of the created file; it may be null if the server
* doesn't support returning the file status
* @throws AccessControlException If access is denied
* @throws AlreadyBeingCreatedException if the path does not exist.
* @throws DSQuotaExceededException If file creation violates disk space
@ -168,13 +170,14 @@ public interface ClientProtocol {
* RuntimeExceptions:
* @throws InvalidPathException Path <code>src</code> is invalid
*/
public void create(String src, FsPermission masked, String clientName,
EnumSetWritable<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws AccessControlException,
AlreadyBeingCreatedException, DSQuotaExceededException,
FileAlreadyExistsException, FileNotFoundException,
NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
UnresolvedLinkException, IOException;
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize)
throws AccessControlException, AlreadyBeingCreatedException,
DSQuotaExceededException, FileAlreadyExistsException,
FileNotFoundException, NSQuotaExceededException,
ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
IOException;
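Because an older NameNode returns no status from create(), a caller that must interoperate with one needs a fallback. A hedged sketch of such a caller (hypothetical helper class CreateCompatSketch, made-up replication and block size); the fallback value INodeId.GRANDFATHER_INODE_ID (0) is the id this patch reserves for exactly this purpose:

import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.io.EnumSetWritable;

/** Hypothetical caller that tolerates servers predating this change. */
class CreateCompatSketch {
  static long createAndGetFileId(ClientProtocol nn, String src,
      String clientName) throws IOException {
    HdfsFileStatus stat = nn.create(src, FsPermission.getDefault(), clientName,
        new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
        true, (short) 3, 64 * 1024 * 1024L);
    // A pre-upgrade server returns null here; id 0 tells the NameNode-side
    // check in INodeId.checkId() to skip the inode id comparison.
    return stat == null ? INodeId.GRANDFATHER_INODE_ID : stat.getFileId();
  }
}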
/**
* Append to the end of the file.
@ -296,6 +299,7 @@ public interface ClientProtocol {
* @param previous previous block
* @param excludeNodes a list of nodes that should not be
* allocated for the current block
* @param fileId the id uniquely identifying a file
*
* @return LocatedBlock allocated block information.
*
@ -310,7 +314,7 @@ public interface ClientProtocol {
*/
@Idempotent
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes)
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
throws AccessControlException, FileNotFoundException,
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException;
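The extra fileId argument is what lets the NameNode verify that the path the client is writing to still denotes the inode it opened; without it, a concurrent rename-and-recreate of src could silently attach new blocks to the wrong file. A short sketch of a block-allocation loop against the new signature (hypothetical helper, simplified error handling):

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.INodeId;

/** Hypothetical allocation loop; each call is id-checked on the NameNode. */
class AddBlockSketch {
  static void writeBlocks(ClientProtocol nn, String src, String clientName,
      long fileId, int numBlocks) throws IOException {
    ExtendedBlock previous = null;
    for (int i = 0; i < numBlocks; i++) {
      // If src was renamed or recreated since create(), the inode under src
      // no longer carries fileId and the server fails the call instead of
      // extending the wrong file.
      LocatedBlock lb = nn.addBlock(src, clientName, previous,
          (DatanodeInfo[]) null, fileId);
      previous = lb.getBlock();
      // ... stream the block's data to lb.getLocations() ...
    }
    // Legacy callers with no id can pass INodeId.GRANDFATHER_INODE_ID (0).
  }
}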

File: HdfsFileStatus.java

@ -40,6 +40,7 @@ public class HdfsFileStatus {
private FsPermission permission;
private String owner;
private String group;
private long fileId;
public static final byte[] EMPTY_NAME = new byte[0];
@ -55,11 +56,12 @@ public class HdfsFileStatus {
* @param owner the owner of the path
* @param group the group of the path
* @param path the local name in java UTF8 encoding the same as that in-memory
* @param fileId the file id
*/
public HdfsFileStatus(long length, boolean isdir, int block_replication,
long blocksize, long modification_time, long access_time,
FsPermission permission, String owner, String group,
byte[] symlink, byte[] path) {
byte[] symlink, byte[] path, long fileId) {
this.length = length;
this.isdir = isdir;
this.block_replication = (short)block_replication;
@ -75,6 +77,7 @@ public class HdfsFileStatus {
this.group = (group == null) ? "" : group;
this.symlink = symlink;
this.path = path;
this.fileId = fileId;
}
/**
@ -223,4 +226,8 @@ public class HdfsFileStatus {
final public byte[] getSymlinkInBytes() {
return symlink;
}
final public long getFileId() {
return fileId;
}
}
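For reference, the widened constructor is exercised like this; the values below are made up and the class name is hypothetical:

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

/** Tiny illustration of the new trailing fileId constructor argument. */
class FileStatusSketch {
  static long demo() {
    HdfsFileStatus status = new HdfsFileStatus(
        0L, false, 3, 64 * 1024 * 1024L,  // length, isdir, replication, blocksize
        0L, 0L,                           // modification and access time
        new FsPermission((short) 0644), "owner", "group",
        null, new byte[0],                // symlink, local name in bytes
        1001L);                           // fileId, the new final argument
    return status.getFileId();            // 1001
  }
}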

File: HdfsLocatedFileStatus.java

@ -44,19 +44,19 @@ public class HdfsLocatedFileStatus extends HdfsFileStatus {
* @param group group
* @param symlink symbolic link
* @param path local path name in java UTF8 format
* @param fileId the file id
* @param locations block locations
*/
public HdfsLocatedFileStatus(long length, boolean isdir,
int block_replication,
long blocksize, long modification_time, long access_time,
FsPermission permission, String owner, String group,
byte[] symlink, byte[] path, LocatedBlocks locations) {
super(length, isdir, block_replication, blocksize, modification_time,
access_time, permission, owner, group, symlink, path);
int block_replication, long blocksize, long modification_time,
long access_time, FsPermission permission, String owner, String group,
byte[] symlink, byte[] path, long fileId, LocatedBlocks locations) {
super(length, isdir, block_replication, blocksize, modification_time,
access_time, permission, owner, group, symlink, path, fileId);
this.locations = locations;
}
public LocatedBlocks getBlockLocations() {
return locations;
}
}

File: ClientNamenodeProtocolServerSideTranslatorPB.java

@ -268,14 +268,19 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public CreateResponseProto create(RpcController controller,
CreateRequestProto req) throws ServiceException {
try {
server.create(req.getSrc(), PBHelper.convert(req.getMasked()),
req.getClientName(), PBHelper.convert(req.getCreateFlag()),
req.getCreateParent(), (short) req.getReplication(),
req.getBlockSize());
HdfsFileStatus result = server.create(req.getSrc(),
PBHelper.convert(req.getMasked()), req.getClientName(),
PBHelper.convert(req.getCreateFlag()), req.getCreateParent(),
(short) req.getReplication(), req.getBlockSize());
if (result != null) {
return CreateResponseProto.newBuilder().setFs(PBHelper.convert(result))
.build();
}
return VOID_CREATE_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);
}
return VOID_CREATE_RESPONSE;
}
@Override
@ -348,13 +353,14 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
try {
List<DatanodeInfoProto> excl = req.getExcludeNodesList();
LocatedBlock result = server.addBlock(req.getSrc(), req.getClientName(),
LocatedBlock result = server.addBlock(
req.getSrc(),
req.getClientName(),
req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
(excl == null ||
excl.size() == 0) ? null :
PBHelper.convert(excl.toArray(new DatanodeInfoProto[excl.size()])));
return AddBlockResponseProto.newBuilder().setBlock(
PBHelper.convert(result)).build();
(excl == null || excl.size() == 0) ? null : PBHelper.convert(excl
.toArray(new DatanodeInfoProto[excl.size()])), req.getFileId());
return AddBlockResponseProto.newBuilder()
.setBlock(PBHelper.convert(result)).build();
} catch (IOException e) {
throw new ServiceException(e);
}

File: ClientNamenodeProtocolTranslatorPB.java

@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Append
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
@ -100,6 +101,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
@ -193,13 +195,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public void create(String src, FsPermission masked, String clientName,
EnumSetWritable<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws AccessControlException,
AlreadyBeingCreatedException, DSQuotaExceededException,
FileAlreadyExistsException, FileNotFoundException,
NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
UnresolvedLinkException, IOException {
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize)
throws AccessControlException, AlreadyBeingCreatedException,
DSQuotaExceededException, FileAlreadyExistsException,
FileNotFoundException, NSQuotaExceededException,
ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
IOException {
CreateRequestProto req = CreateRequestProto.newBuilder()
.setSrc(src)
.setMasked(PBHelper.convert(masked))
@ -210,7 +213,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setBlockSize(blockSize)
.build();
try {
rpcProxy.create(null, req);
CreateResponseProto res = rpcProxy.create(null, req);
return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -294,15 +298,15 @@ public class ClientNamenodeProtocolTranslatorPB implements
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes)
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
throws AccessControlException, FileNotFoundException,
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException {
AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder().setSrc(src)
.setClientName(clientName);
AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
.setSrc(src).setClientName(clientName).setFileId(fileId);
if (previous != null)
req.setPrevious(PBHelper.convert(previous));
if (excludeNodes != null)

File: PBHelper.java

@ -106,6 +106,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@ -1045,6 +1046,7 @@ public class PBHelper {
fs.getFileType().equals(FileType.IS_SYMLINK) ?
fs.getSymlink().toByteArray() : null,
fs.getPath().toByteArray(),
fs.hasFileId()? fs.getFileId(): INodeId.GRANDFATHER_INODE_ID,
fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null);
}
@ -1069,6 +1071,7 @@ public class PBHelper {
setPermission(PBHelper.convert(fs.getPermission())).
setOwner(fs.getOwner()).
setGroup(fs.getGroup()).
setFileId(fs.getFileId()).
setPath(ByteString.copyFrom(fs.getLocalNameInBytes()));
if (fs.isSymlink()) {
builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));

File: FSDirectory.java

@ -1985,7 +1985,8 @@ public class FSDirectory implements Closeable {
node.getUserName(),
node.getGroupName(),
node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
path);
path,
node.getId());
}
/**
@ -2022,6 +2023,7 @@ public class FSDirectory implements Closeable {
node.getGroupName(),
node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
path,
node.getId(),
loc);
}

File: FSNamesystem.java

@ -1772,16 +1772,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Create a new file entry in the namespace.
*
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create()}
* {@link ClientProtocol#create()}, except it returns valid file status
* upon success
*/
void startFile(String src, PermissionStatus permissions, String holder,
String clientMachine, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws AccessControlException,
SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize)
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
try {
startFileInt(src, permissions, holder, clientMachine, flag, createParent,
replication, blockSize);
return startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
} catch (AccessControlException e) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
@ -1792,18 +1794,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
private void startFileInt(String src, PermissionStatus permissions, String holder,
String clientMachine, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws AccessControlException,
SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize)
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
boolean skipSync = false;
final HdfsFileStatus stat;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
startFileInternal(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
stat = dir.getFileInfo(src, false);
} catch (StandbyException se) {
skipSync = true;
throw se;
@ -1817,11 +1822,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
if (isAuditEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"create", src, null, stat);
}
return stat;
}
/**
@ -2192,11 +2197,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* are replicated. Will return an empty 2-elt array if we want the
* client to "try again later".
*/
LocatedBlock getAdditionalBlock(String src,
String clientName,
ExtendedBlock previous,
HashMap<Node, Node> excludedNodes
)
LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
ExtendedBlock previous, HashMap<Node, Node> excludedNodes)
throws LeaseExpiredException, NotReplicatedYetException,
QuotaExceededException, SafeModeException, UnresolvedLinkException,
IOException {
@ -2215,7 +2217,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
final INode[] inodes = analyzeFileState(
src, clientName, previous, onRetryBlock).getINodes();
src, fileId, clientName, previous, onRetryBlock).getINodes();
final INodeFileUnderConstruction pendingFile =
(INodeFileUnderConstruction) inodes[inodes.length - 1];
@ -2245,7 +2247,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// while chooseTarget() was executing.
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
INodesInPath inodesInPath =
analyzeFileState(src, clientName, previous, onRetryBlock);
analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
INode[] inodes = inodesInPath.getINodes();
final INodeFileUnderConstruction pendingFile =
(INodeFileUnderConstruction) inodes[inodes.length - 1];
@ -2277,6 +2279,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
INodesInPath analyzeFileState(String src,
long fileId,
String clientName,
ExtendedBlock previous,
LocatedBlock[] onRetryBlock)
@ -2298,7 +2301,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
dir.rootDir.getExistingPathINodes(src, true);
final INode[] inodes = inodesInPath.getINodes();
final INodeFileUnderConstruction pendingFile
= checkLease(src, clientName, inodes[inodes.length - 1]);
= checkLease(src, fileId, clientName, inodes[inodes.length - 1]);
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
@ -2467,14 +2470,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
// make sure that we still have the lease on this file.
private INodeFileUnderConstruction checkLease(String src, String holder)
throws LeaseExpiredException, UnresolvedLinkException {
private INodeFileUnderConstruction checkLease(String src, String holder)
throws LeaseExpiredException, UnresolvedLinkException,
FileNotFoundException {
assert hasReadOrWriteLock();
return checkLease(src, holder, dir.getINode(src));
return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
dir.getINode(src));
}
private INodeFileUnderConstruction checkLease(String src, String holder,
INode file) throws LeaseExpiredException {
private INodeFileUnderConstruction checkLease(String src, long fileId,
String holder, INode file) throws LeaseExpiredException,
FileNotFoundException {
assert hasReadOrWriteLock();
if (file == null || !(file instanceof INodeFile)) {
Lease lease = leaseManager.getLease(holder);
@ -2495,6 +2501,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
+ pendingFile.getClientName() + " but is accessed by " + holder);
}
INodeId.checkId(fileId, pendingFile);
return pendingFile;
}

File: INodeId.java

@ -17,18 +17,21 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.SequentialNumber;
/**
* An id which uniquely identifies an inode
* An id which uniquely identifies an inode. Ids 1 to 1000 are reserved for
* potential future usage. The id won't be recycled and is not expected to wrap
* around for a very long time. Root inode id is always 1001. Id 0 is used for
* backward compatibility support.
*/
@InterfaceAudience.Private
class INodeId extends SequentialNumber {
public class INodeId extends SequentialNumber {
/**
* The last reserved inode id. Reserve id 1 to 1000 for potential future
* usage. The id won't be recycled and is not expected to wrap around in a
* very long time. Root inode id will be 1001.
* The last reserved inode id.
*/
public static final long LAST_RESERVED_ID = 1000L;
@ -38,6 +41,19 @@ class INodeId extends SequentialNumber {
*/
public static final long GRANDFATHER_INODE_ID = 0;
/**
* Check that the request id matches the saved inode id. For backward
* compatibility, fileId is not checked when it is GRANDFATHER_INODE_ID.
*/
public static void checkId(long requestId, INode inode)
throws FileNotFoundException {
if (requestId != GRANDFATHER_INODE_ID && requestId != inode.getId()) {
throw new FileNotFoundException(
"ID mismatch. Request id and saved id: " + requestId + " , "
+ inode.getId());
}
}
INodeId() {
super(LAST_RESERVED_ID);
}
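To make the compatibility rule concrete, here is a hedged restatement of the check as a self-contained method; the real logic lives in INodeId.checkId() above and compares against inode.getId():

import java.io.FileNotFoundException;

/** Standalone restatement of the id check; mirrors INodeId.checkId(). */
class IdCheckSketch {
  static final long GRANDFATHER_INODE_ID = 0; // "no id" from legacy clients

  static void checkId(long requestId, long savedId)
      throws FileNotFoundException {
    // Legacy requests carry id 0 and are exempt; every other request must
    // name exactly the inode it opened at create()/append() time.
    if (requestId != GRANDFATHER_INODE_ID && requestId != savedId) {
      throw new FileNotFoundException(
          "ID mismatch. Request id and saved id: " + requestId + " , "
          + savedId);
    }
  }
}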

File: NameNodeRpcServer.java

@ -422,13 +422,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
public void create(String src,
FsPermission masked,
String clientName,
EnumSetWritable<CreateFlag> flag,
boolean createParent,
short replication,
long blockSize) throws IOException {
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize)
throws IOException {
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file "
@ -438,12 +435,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.startFile(src,
new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
null, masked),
clientName, clientMachine, flag.get(), createParent, replication, blockSize);
HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
UserGroupInformation.getCurrentUser().getShortUserName(), null, masked),
clientName, clientMachine, flag.get(), createParent, replication,
blockSize);
metrics.incrFilesCreated();
metrics.incrCreateFileOps();
return fileStatus;
}
@Override // ClientProtocol
@ -482,26 +480,24 @@ class NameNodeRpcServer implements NamenodeProtocols {
throws IOException {
namesystem.setOwner(src, username, groupname);
}
@Override // ClientProtocol
public LocatedBlock addBlock(String src,
String clientName,
ExtendedBlock previous,
DatanodeInfo[] excludedNodes)
@Override
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId)
throws IOException {
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+src+" for "+clientName);
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
+ " fileId=" + fileId + " for " + clientName);
}
HashMap<Node, Node> excludedNodesSet = null;
if (excludedNodes != null) {
excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
for (Node node:excludedNodes) {
for (Node node : excludedNodes) {
excludedNodesSet.put(node, node);
}
}
LocatedBlock locatedBlock =
namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
clientName, previous, excludedNodesSet);
if (locatedBlock != null)
metrics.incrAddBlockOps();
return locatedBlock;

File: JsonUtil.java

@ -219,6 +219,7 @@ public class JsonUtil {
m.put("modificationTime", status.getModificationTime());
m.put("blockSize", status.getBlockSize());
m.put("replication", status.getReplication());
m.put("fileId", status.getFileId());
return includeType ? toJsonString(FileStatus.class, m): JSON.toString(m);
}
@ -243,9 +244,10 @@ public class JsonUtil {
final long mTime = (Long) m.get("modificationTime");
final long blockSize = (Long) m.get("blockSize");
final short replication = (short) (long) (Long) m.get("replication");
final long fileId = (Long) m.get("fileId");
return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
blockSize, mTime, aTime, permission, owner, group,
symlink, DFSUtil.string2Bytes(localName));
symlink, DFSUtil.string2Bytes(localName), fileId);
}
/** Convert an ExtendedBlock to a Json map. */

File: ClientNamenodeProtocol.proto

@ -67,7 +67,8 @@ message CreateRequestProto {
required uint64 blockSize = 7;
}
message CreateResponseProto { // void response
message CreateResponseProto {
optional HdfsFileStatusProto fs = 1;
}
message AppendRequestProto {
@ -119,6 +120,7 @@ message AddBlockRequestProto {
required string clientName = 2;
optional ExtendedBlockProto previous = 3;
repeated DatanodeInfoProto excludeNodes = 4;
optional uint64 fileId = 5 [default = 0]; // default as a bogus id
}
message AddBlockResponseProto {

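The [default = 0] on fileId is what keeps old clients working: a request built by a pre-upgrade client simply never sets the field, and the server reads back the bogus id 0, which INodeId.checkId() deliberately ignores. A small sketch against the generated protobuf class (demo values only):

import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;

/** Shows the declared default coming back for an unset optional field. */
class ProtoDefaultSketch {
  static long demo() {
    AddBlockRequestProto req = AddBlockRequestProto.newBuilder()
        .setSrc("/foo")
        .setClientName("client")
        .build();               // fileId deliberately left unset
    // hasFileId() is false; getFileId() returns the declared default, 0,
    // which the NameNode treats as INodeId.GRANDFATHER_INODE_ID.
    return req.getFileId();
  }
}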
File: hdfs.proto

@ -170,6 +170,9 @@ message HdfsFileStatusProto {
optional uint32 block_replication = 10 [default = 0]; // only 16bits used
optional uint64 blocksize = 11 [default = 0];
optional LocatedBlocksProto locations = 12; // supplied only if asked by client
// Optional field for fileId
optional uint64 fileId = 13 [default = 0]; // default as an invalid id
}
/**

File: TestDFSClientRetries.java

@ -23,7 +23,10 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@ -49,13 +52,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsUtils;
import org.apache.hadoop.hdfs.protocol.Block;
@ -64,12 +67,14 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@ -208,7 +213,7 @@ public class TestDFSClientRetries {
* Verify that client will correctly give up after the specified number
* of times trying to add a block
*/
@SuppressWarnings("serial")
@SuppressWarnings({ "serial", "unchecked" })
@Test
public void testNotYetReplicatedErrors() throws IOException
{
@ -235,7 +240,22 @@ public class TestDFSClientRetries {
when(mockNN.addBlock(anyString(),
anyString(),
any(ExtendedBlock.class),
any(DatanodeInfo[].class))).thenAnswer(answer);
any(DatanodeInfo[].class),
anyLong())).thenAnswer(answer);
Mockito.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0],
1010)).when(mockNN).getFileInfo(anyString());
Mockito.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0],
1010))
.when(mockNN)
.create(anyString(), (FsPermission) anyObject(), anyString(),
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
anyShort(), anyLong());
final DFSClient client = new DFSClient(null, mockNN, conf, null);
OutputStream os = client.create("testfile", true);
@ -369,7 +389,8 @@ public class TestDFSClientRetries {
return ret2;
}
}).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
Mockito.anyLong());
doAnswer(new Answer<Boolean>() {
@ -410,7 +431,8 @@ public class TestDFSClientRetries {
// Make sure the mock was actually properly injected.
Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
Mockito.anyLong());
Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any());

File: TestFileCreation.java

@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.EnumSetWritable;
@ -517,8 +518,8 @@ public class TestFileCreation {
+ "The file has " + locations.locatedBlockCount() + " blocks.");
// add one block to the file
LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
client.clientName, null, null);
LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID);
System.out.println("testFileCreationError2: "
+ "Added block " + location.getBlock());
@ -568,8 +569,8 @@ public class TestFileCreation {
final Path f = new Path("/foo.txt");
createFile(dfs, f, 3);
try {
cluster.getNameNodeRpc().addBlock(f.toString(),
client.clientName, null, null);
cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName,
null, null, INodeId.GRANDFATHER_INODE_ID);
fail();
} catch(IOException ioe) {
FileSystem.LOG.info("GOOD!", ioe);

File: TestLease.java

@ -18,6 +18,10 @@
package org.apache.hadoop.hdfs;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@ -29,14 +33,18 @@ import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@ -256,6 +264,7 @@ public class TestLease {
}
}
@SuppressWarnings("unchecked")
@Test
public void testFactory() throws Exception {
final String[] groups = new String[]{"supergroup"};
@ -264,6 +273,20 @@ public class TestLease {
ugi[i] = UserGroupInformation.createUserForTesting("user" + i, groups);
}
Mockito.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0],
1010)).when(mcp).getFileInfo(anyString());
Mockito
.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0],
1010))
.when(mcp)
.create(anyString(), (FsPermission) anyObject(), anyString(),
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
anyShort(), anyLong());
final Configuration conf = new Configuration();
final DFSClient c1 = createDFSClientAs(ugi[0], conf);
FSDataOutputStream out1 = createFsOut(c1, "/out1");

File: NNThroughputBenchmark.java

@ -1058,7 +1058,8 @@ public class NNThroughputBenchmark {
throws IOException {
ExtendedBlock prevBlock = null;
for(int jdx = 0; jdx < blocksPerFile; jdx++) {
LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName, prevBlock, null);
LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
prevBlock, null, INodeId.GRANDFATHER_INODE_ID);
prevBlock = loc.getBlock();
for(DatanodeInfo dnInfo : loc.getLocations()) {
int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getXferAddr());

File: TestAddBlockRetry.java

@ -107,7 +107,8 @@ public class TestAddBlockRetry {
count++;
if(count == 1) { // run second addBlock()
LOG.info("Starting second addBlock for " + src);
nn.addBlock(src, "clientName", null, null);
nn.addBlock(src, "clientName", null, null,
INodeId.GRANDFATHER_INODE_ID);
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
lb2 = lbs.get(0);
@ -128,7 +129,7 @@ public class TestAddBlockRetry {
// start first addBlock()
LOG.info("Starting first addBlock for " + src);
nn.addBlock(src, "clientName", null, null);
nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID);
// check locations
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);

File: TestINodeFile.java

@ -24,8 +24,10 @@ import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
@ -39,6 +41,8 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.EnumSetWritable;
import org.junit.Test;
public class TestINodeFile {
@ -376,7 +380,7 @@ public class TestINodeFile {
* @throws IOException
*/
@Test
public void TestInodeId() throws IOException {
public void testInodeId() throws IOException {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
@ -396,9 +400,14 @@ public class TestINodeFile {
assertTrue(fs.mkdirs(path));
assertTrue(fsn.getLastInodeId() == 1002);
Path filePath = new Path("/test1/file");
fs.create(filePath);
// Use namenode rpc to create a file
NamenodeProtocols nnrpc = cluster.getNameNodeRpc();
HdfsFileStatus fileStatus = nnrpc.create("/test1/file", new FsPermission(
(short) 0755), "client",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
(short) 1, 128 * 1024 * 1024L);
assertTrue(fsn.getLastInodeId() == 1003);
assertTrue(fileStatus.getFileId() == 1003);
// Rename doesn't increase inode id
Path renamedPath = new Path("/test2");
@ -412,4 +421,44 @@ public class TestINodeFile {
cluster.waitActive();
assertTrue(fsn.getLastInodeId() == 1003);
}
@Test
public void testWriteToRenamedFile() throws IOException {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
Path path = new Path("/test1");
assertTrue(fs.mkdirs(path));
int size = conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
byte[] data = new byte[size];
// Create one file
Path filePath = new Path("/test1/file");
FSDataOutputStream fos = fs.create(filePath);
// Rename /test1 to test2, and recreate /test1/file
Path renamedPath = new Path("/test2");
fs.rename(path, renamedPath);
fs.create(filePath, (short) 1);
// Add new block should fail since /test1/file has a different fileId
try {
fos.write(data, 0, data.length);
// make sure addBlock() request gets to NN immediately
fos.hflush();
fail("Write should fail after rename");
} catch (Exception e) {
/* Ignore */
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

File: TestJsonUtil.java

@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
@ -42,9 +43,10 @@ public class TestJsonUtil {
public void testHdfsFileStatus() {
final long now = Time.now();
final String parent = "/dir";
final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L<<26,
now, now + 10, new FsPermission((short)0644), "user", "group",
DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"));
final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
now, now + 10, new FsPermission((short) 0644), "user", "group",
DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
INodeId.GRANDFATHER_INODE_ID);
final FileStatus fstatus = toFileStatus(status, parent);
System.out.println("status = " + status);
System.out.println("fstatus = " + fstatus);