commit 13df6b76d2 (parent 909e52274c)

HDFS-4340. Merge r1443169 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1471692 13f79535-47bb-0310-9956-ffa450edef68
CHANGES.txt
@@ -16,6 +16,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4339. Persist inode id in fsimage and editlog. (Brandon Li via
     suresh)
 
+    HDFS-4340. Update addBlock() to inculde inode id as additional argument.
+    (Brandon Li via suresh)
+
   IMPROVEMENTS
 
     HDFS-4222. NN is unresponsive and loses heartbeats from DNs when
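At a glance, the substance of the patch is one widened RPC. The following before/after sketch is assembled from the ClientProtocol hunks below; it is an orientation aid, not part of the committed diff:

    // Before: the NameNode identified a file under construction by path only.
    public LocatedBlock addBlock(String src, String clientName,
        ExtendedBlock previous, DatanodeInfo[] excludeNodes) throws IOException;

    // After: create() returns the new file's status, and every addBlock()
    // echoes the inode id from that status back to the NameNode.
    public HdfsFileStatus create(String src, FsPermission masked,
        String clientName, EnumSetWritable<CreateFlag> flag,
        boolean createParent, short replication, long blockSize)
        throws IOException;
    public LocatedBlock addBlock(String src, String clientName,
        ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
        throws IOException;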
DFSOutputStream.java
@@ -121,6 +121,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   private volatile boolean closed = false;
 
   private String src;
+  private final long fileId;
   private final long blockSize;
   private final DataChecksum checksum;
   // both dataQueue and ackQueue are protected by dataQueue lock
@@ -1179,7 +1180,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
       long localstart = Time.now();
       while (true) {
         try {
-          return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes);
+          return dfsClient.namenode.addBlock(src, dfsClient.clientName,
+              block, excludedNodes, fileId);
         } catch (RemoteException e) {
           IOException ue =
               e.unwrapRemoteException(FileNotFoundException.class,
@@ -1291,20 +1293,21 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     return value;
   }
 
-  private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
-      DataChecksum checksum, short replication) throws IOException {
+  private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
+      HdfsFileStatus stat, DataChecksum checksum) throws IOException {
     super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
-    int bytesPerChecksum = checksum.getBytesPerChecksum();
     this.dfsClient = dfsClient;
     this.src = src;
-    this.blockSize = blockSize;
-    this.blockReplication = replication;
+    this.fileId = stat.getFileId();
+    this.blockSize = stat.getBlockSize();
+    this.blockReplication = stat.getReplication();
     this.progress = progress;
     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug(
           "Set non-null progress callback on DFSOutputStream " + src);
     }
 
+    final int bytesPerChecksum = checksum.getBytesPerChecksum();
     if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
       throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
                             ") and blockSize(" + blockSize +
@@ -1316,19 +1319,27 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   }
 
   /** Construct a new output stream for creating a file. */
-  private DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked,
-      EnumSet<CreateFlag> flag, boolean createParent, short replication,
-      long blockSize, Progressable progress, int buffersize,
+  private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+      EnumSet<CreateFlag> flag, Progressable progress,
       DataChecksum checksum) throws IOException {
-    this(dfsClient, src, blockSize, progress, checksum, replication);
+    this(dfsClient, src, progress, stat, checksum);
     this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
     computePacketChunkSize(dfsClient.getConf().writePacketSize,
         checksum.getBytesPerChecksum());
 
+    streamer = new DataStreamer();
+  }
+
+  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
+      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
+      short replication, long blockSize, Progressable progress, int buffersize,
+      DataChecksum checksum) throws IOException {
+    final HdfsFileStatus stat;
     try {
-      dfsClient.namenode.create(
-          src, masked, dfsClient.clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize);
+      stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
+          new EnumSetWritable<CreateFlag>(flag), createParent, replication,
+          blockSize);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
                                      DSQuotaExceededException.class,
@@ -1339,30 +1350,20 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
                                      SafeModeException.class,
                                      UnresolvedPathException.class);
     }
-    streamer = new DataStreamer();
-  }
-
-  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
-      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
-      short replication, long blockSize, Progressable progress, int buffersize,
-      DataChecksum checksum) throws IOException {
-    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, masked,
-        flag, createParent, replication, blockSize, progress, buffersize,
-        checksum);
-    out.streamer.start();
+    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
+        flag, progress, checksum);
+    out.start();
     return out;
   }
 
   /** Construct a new output stream for append. */
-  private DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
-      LocatedBlock lastBlock, HdfsFileStatus stat,
+  private DFSOutputStream(DFSClient dfsClient, String src,
+      Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,
       DataChecksum checksum) throws IOException {
-    this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
+    this(dfsClient, src, progress, stat, checksum);
     initialFileSize = stat.getLen(); // length of file when opened
 
     //
     // The last partial block of the file has to be filled.
     //
     if (lastBlock != null) {
       // indicate that we are appending to an existing block
       bytesCurBlock = lastBlock.getBlockSize();
@@ -1377,9 +1378,9 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
       int buffersize, Progressable progress, LocatedBlock lastBlock,
       HdfsFileStatus stat, DataChecksum checksum) throws IOException {
-    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, buffersize,
+    final DFSOutputStream out = new DFSOutputStream(dfsClient, src,
         progress, lastBlock, stat, checksum);
-    out.streamer.start();
+    out.start();
     return out;
   }
 
@@ -1751,6 +1752,10 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     isClosed();
   }
 
+  private synchronized void start() {
+    streamer.start();
+  }
+
   /**
    * Aborts this output stream and releases any system
    * resources associated with this stream.
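The client-side flow above condenses as follows; this is a minimal sketch assuming a ClientProtocol proxy named namenode is in scope, not code from the patch:

    // create() now hands back the file status; DFSOutputStream caches
    // stat.getFileId() and locateFollowingBlock() sends it along with
    // every subsequent addBlock() call for that stream.
    HdfsFileStatus stat = namenode.create(src, masked, clientName,
        new EnumSetWritable<CreateFlag>(flag), createParent, replication,
        blockSize);
    long fileId = stat.getFileId();
    LocatedBlock blk = namenode.addBlock(src, clientName,
        null /* previous */, null /* excludedNodes */, fileId);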
ClientProtocol.java
@@ -150,6 +150,8 @@ public interface ClientProtocol {
    * @param replication block replication factor.
    * @param blockSize maximum block size.
    *
+   * @return the status of the created file, it could be null if the server
+   *         doesn't support returning the file status
    * @throws AccessControlException If access is denied
    * @throws AlreadyBeingCreatedException if the path does not exist.
    * @throws DSQuotaExceededException If file creation violates disk space
@@ -168,13 +170,14 @@ public interface ClientProtocol {
    * RuntimeExceptions:
    * @throws InvalidPathException Path <code>src</code> is invalid
    */
-  public void create(String src, FsPermission masked, String clientName,
-      EnumSetWritable<CreateFlag> flag, boolean createParent,
-      short replication, long blockSize) throws AccessControlException,
-      AlreadyBeingCreatedException, DSQuotaExceededException,
-      FileAlreadyExistsException, FileNotFoundException,
-      NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
-      UnresolvedLinkException, IOException;
+  public HdfsFileStatus create(String src, FsPermission masked,
+      String clientName, EnumSetWritable<CreateFlag> flag,
+      boolean createParent, short replication, long blockSize)
+      throws AccessControlException, AlreadyBeingCreatedException,
+      DSQuotaExceededException, FileAlreadyExistsException,
+      FileNotFoundException, NSQuotaExceededException,
+      ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
+      IOException;
 
   /**
    * Append to the end of the file.
@@ -296,6 +299,7 @@ public interface ClientProtocol {
    * @param previous previous block
    * @param excludeNodes a list of nodes that should not be
    * allocated for the current block
+   * @param fileId the id uniquely identifying a file
    *
    * @return LocatedBlock allocated block information.
    *
@@ -310,7 +314,7 @@ public interface ClientProtocol {
    */
   @Idempotent
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludeNodes)
+      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException;
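A caller that does not know the inode id, such as code written against the old signature, passes the reserved id 0 and the NameNode skips the check. The updated tests further down do exactly this (nn is assumed to be a NamenodeProtocols handle, as in those tests):

    nn.addBlock(src, clientName, null, null, INodeId.GRANDFATHER_INODE_ID);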
HdfsFileStatus.java
@@ -40,6 +40,7 @@ public class HdfsFileStatus {
   private FsPermission permission;
   private String owner;
   private String group;
+  private long fileId;
 
   public static final byte[] EMPTY_NAME = new byte[0];
 
@@ -55,11 +56,12 @@ public class HdfsFileStatus {
    * @param owner the owner of the path
    * @param group the group of the path
    * @param path the local name in java UTF8 encoding the same as that in-memory
+   * @param fileId the file id
    */
   public HdfsFileStatus(long length, boolean isdir, int block_replication,
                     long blocksize, long modification_time, long access_time,
                     FsPermission permission, String owner, String group,
-                    byte[] symlink, byte[] path) {
+                    byte[] symlink, byte[] path, long fileId) {
     this.length = length;
     this.isdir = isdir;
     this.block_replication = (short)block_replication;
@@ -75,6 +77,7 @@ public class HdfsFileStatus {
     this.group = (group == null) ? "" : group;
     this.symlink = symlink;
     this.path = path;
+    this.fileId = fileId;
   }
 
   /**
@@ -223,4 +226,8 @@ public class HdfsFileStatus {
   final public byte[] getSymlinkInBytes() {
     return symlink;
   }
+
+  final public long getFileId() {
+    return fileId;
+  }
 }
HdfsLocatedFileStatus.java
@@ -44,19 +44,19 @@ public class HdfsLocatedFileStatus extends HdfsFileStatus {
    * @param group group
    * @param symlink symbolic link
    * @param path local path name in java UTF8 format
+   * @param fileId the file id
    * @param locations block locations
    */
   public HdfsLocatedFileStatus(long length, boolean isdir,
-      int block_replication,
-      long blocksize, long modification_time, long access_time,
-      FsPermission permission, String owner, String group,
-      byte[] symlink, byte[] path, LocatedBlocks locations) {
-    super(length, isdir, block_replication, blocksize, modification_time,
-        access_time, permission, owner, group, symlink, path);
+      int block_replication, long blocksize, long modification_time,
+      long access_time, FsPermission permission, String owner, String group,
+      byte[] symlink, byte[] path, long fileId, LocatedBlocks locations) {
+    super(length, isdir, block_replication, blocksize, modification_time,
+        access_time, permission, owner, group, symlink, path, fileId);
     this.locations = locations;
-  }
-
-  public LocatedBlocks getBlockLocations() {
-    return locations;
-  }
+  }
+
+  public LocatedBlocks getBlockLocations() {
+    return locations;
+  }
 }
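Both status classes now carry the id end to end. A small sketch of the widened constructor, with arbitrary field values mirroring the mocks in TestDFSClientRetries and TestLease below:

    HdfsFileStatus status = new HdfsFileStatus(0, false, 1, 1024, 0, 0,
        new FsPermission((short) 777), "owner", "group",
        new byte[0], new byte[0], 1010);   // trailing argument: fileId
    long id = status.getFileId();          // 1010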
ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -270,14 +270,19 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public CreateResponseProto create(RpcController controller,
       CreateRequestProto req) throws ServiceException {
     try {
-      server.create(req.getSrc(), PBHelper.convert(req.getMasked()),
-          req.getClientName(), PBHelper.convert(req.getCreateFlag()),
-          req.getCreateParent(), (short) req.getReplication(),
-          req.getBlockSize());
+      HdfsFileStatus result = server.create(req.getSrc(),
+          PBHelper.convert(req.getMasked()), req.getClientName(),
+          PBHelper.convert(req.getCreateFlag()), req.getCreateParent(),
+          (short) req.getReplication(), req.getBlockSize());
+
+      if (result != null) {
+        return CreateResponseProto.newBuilder().setFs(PBHelper.convert(result))
+            .build();
+      }
+      return VOID_CREATE_RESPONSE;
     } catch (IOException e) {
       throw new ServiceException(e);
     }
-    return VOID_CREATE_RESPONSE;
   }
 
   @Override
@@ -350,13 +355,14 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
 
     try {
       List<DatanodeInfoProto> excl = req.getExcludeNodesList();
-      LocatedBlock result = server.addBlock(req.getSrc(), req.getClientName(),
+      LocatedBlock result = server.addBlock(
+          req.getSrc(),
+          req.getClientName(),
           req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
-          (excl == null ||
-           excl.size() == 0) ? null :
-          PBHelper.convert(excl.toArray(new DatanodeInfoProto[excl.size()])));
-      return AddBlockResponseProto.newBuilder().setBlock(
-          PBHelper.convert(result)).build();
+          (excl == null || excl.size() == 0) ? null : PBHelper.convert(excl
+              .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId());
+      return AddBlockResponseProto.newBuilder()
+          .setBlock(PBHelper.convert(result)).build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
ClientNamenodeProtocolTranslatorPB.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Append
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
@@ -101,6 +102,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
@@ -194,13 +196,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
   }
 
   @Override
-  public void create(String src, FsPermission masked, String clientName,
-      EnumSetWritable<CreateFlag> flag, boolean createParent,
-      short replication, long blockSize) throws AccessControlException,
-      AlreadyBeingCreatedException, DSQuotaExceededException,
-      FileAlreadyExistsException, FileNotFoundException,
-      NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
-      UnresolvedLinkException, IOException {
+  public HdfsFileStatus create(String src, FsPermission masked,
+      String clientName, EnumSetWritable<CreateFlag> flag,
+      boolean createParent, short replication, long blockSize)
+      throws AccessControlException, AlreadyBeingCreatedException,
+      DSQuotaExceededException, FileAlreadyExistsException,
+      FileNotFoundException, NSQuotaExceededException,
+      ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
+      IOException {
     CreateRequestProto req = CreateRequestProto.newBuilder()
         .setSrc(src)
         .setMasked(PBHelper.convert(masked))
@@ -211,7 +214,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setBlockSize(blockSize)
         .build();
     try {
-      rpcProxy.create(null, req);
+      CreateResponseProto res = rpcProxy.create(null, req);
+      return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -295,15 +299,15 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
 
   @Override
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludeNodes)
+      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException {
-    AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder().setSrc(src)
-        .setClientName(clientName);
+    AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
+        .setSrc(src).setClientName(clientName).setFileId(fileId);
     if (previous != null)
       req.setPrevious(PBHelper.convert(previous));
     if (excludeNodes != null)
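On the wire the new argument is an optional field with default 0 (see ClientNamenodeProtocol.proto below), which is what keeps old clients working: a request built without setFileId() reads back as the grandfather id on the server. A sketch using the generated builder, assuming src and clientName are in scope:

    AddBlockRequestProto req = AddBlockRequestProto.newBuilder()
        .setSrc(src).setClientName(clientName)   // no setFileId() call
        .build();
    long fileId = req.getFileId();   // 0, i.e. INodeId.GRANDFATHER_INODE_ID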
PBHelper.java
@@ -106,6 +106,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@@ -1045,6 +1046,7 @@ public class PBHelper {
         fs.getFileType().equals(FileType.IS_SYMLINK) ?
             fs.getSymlink().toByteArray() : null,
         fs.getPath().toByteArray(),
+        fs.hasFileId()? fs.getFileId(): INodeId.GRANDFATHER_INODE_ID,
         fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null);
   }
 
@@ -1069,6 +1071,7 @@ public class PBHelper {
         setPermission(PBHelper.convert(fs.getPermission())).
         setOwner(fs.getOwner()).
         setGroup(fs.getGroup()).
+        setFileId(fs.getFileId()).
         setPath(ByteString.copyFrom(fs.getLocalNameInBytes()));
     if (fs.isSymlink()) {
       builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
FSDirectory.java
@@ -1989,7 +1989,8 @@ public class FSDirectory implements Closeable {
         node.getUserName(),
         node.getGroupName(),
         node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
-        path);
+        path,
+        node.getId());
   }
 
   /**
@@ -2026,6 +2027,7 @@ public class FSDirectory implements Closeable {
         node.getGroupName(),
         node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
         path,
+        node.getId(),
         loc);
   }
 
FSNamesystem.java
@@ -1743,26 +1743,29 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Create a new file entry in the namespace.
    *
    * For description of parameters and exceptions thrown see
-   * {@link ClientProtocol#create()}
+   * {@link ClientProtocol#create()}, except it returns valid file status
+   * upon success
    */
-  void startFile(String src, PermissionStatus permissions, String holder,
-      String clientMachine, EnumSet<CreateFlag> flag, boolean createParent,
-      short replication, long blockSize) throws AccessControlException,
-      SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
+  HdfsFileStatus startFile(String src, PermissionStatus permissions,
+      String holder, String clientMachine, EnumSet<CreateFlag> flag,
+      boolean createParent, short replication, long blockSize)
+      throws AccessControlException, SafeModeException,
+      FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
     try {
-      startFileInt(src, permissions, holder, clientMachine, flag, createParent,
-          replication, blockSize);
+      return startFileInt(src, permissions, holder, clientMachine, flag,
+          createParent, replication, blockSize);
     } catch (AccessControlException e) {
       logAuditEvent(false, "create", src);
       throw e;
     }
   }
 
-  private void startFileInt(String src, PermissionStatus permissions, String holder,
-      String clientMachine, EnumSet<CreateFlag> flag, boolean createParent,
-      short replication, long blockSize) throws AccessControlException,
-      SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
+  private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
+      String holder, String clientMachine, EnumSet<CreateFlag> flag,
+      boolean createParent, short replication, long blockSize)
+      throws AccessControlException, SafeModeException,
+      FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
@@ -1777,12 +1780,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     boolean skipSync = false;
+    final HdfsFileStatus stat;
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       startFileInternal(pc, src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize);
+      stat = dir.getFileInfo(src, false);
     } catch (StandbyException se) {
       skipSync = true;
       throw se;
@@ -1794,8 +1799,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         getEditLog().logSync();
       }
     }
-    final HdfsFileStatus stat = getAuditFileInfo(src, false);
     logAuditEvent(true, "create", src, null, stat);
+    return stat;
   }
 
   /**
@@ -2156,11 +2161,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * are replicated. Will return an empty 2-elt array if we want the
    * client to "try again later".
    */
-  LocatedBlock getAdditionalBlock(String src,
-                                  String clientName,
-                                  ExtendedBlock previous,
-                                  HashMap<Node, Node> excludedNodes
-                                  )
+  LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
+      ExtendedBlock previous, HashMap<Node, Node> excludedNodes)
       throws LeaseExpiredException, NotReplicatedYetException,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
       IOException {
@@ -2181,7 +2183,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       checkOperation(OperationCategory.READ);
       LocatedBlock[] onRetryBlock = new LocatedBlock[1];
       final INode[] inodes = analyzeFileState(
-          src, clientName, previous, onRetryBlock).getINodes();
+          src, fileId, clientName, previous, onRetryBlock).getINodes();
       final INodeFileUnderConstruction pendingFile =
           (INodeFileUnderConstruction) inodes[inodes.length - 1];
 
@@ -2213,7 +2215,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       // while chooseTarget() was executing.
       LocatedBlock[] onRetryBlock = new LocatedBlock[1];
       INodesInPath inodesInPath =
-          analyzeFileState(src, clientName, previous, onRetryBlock);
+          analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
       final INode[] inodes = inodesInPath.getINodes();
       final INodeFileUnderConstruction pendingFile =
           (INodeFileUnderConstruction) inodes[inodes.length - 1];
@@ -2245,6 +2247,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   INodesInPath analyzeFileState(String src,
+                                long fileId,
                                 String clientName,
                                 ExtendedBlock previous,
                                 LocatedBlock[] onRetryBlock)
@@ -2265,7 +2268,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     final INodesInPath inodesInPath = dir.rootDir.getExistingPathINodes(src, true);
     final INode[] inodes = inodesInPath.getINodes();
     final INodeFileUnderConstruction pendingFile
-        = checkLease(src, clientName, inodes[inodes.length - 1]);
+        = checkLease(src, fileId, clientName, inodes[inodes.length - 1]);
     BlockInfo lastBlockInFile = pendingFile.getLastBlock();
     if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
       // The block that the client claims is the current last block
@@ -2436,14 +2439,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   // make sure that we still have the lease on this file.
-  private INodeFileUnderConstruction checkLease(String src, String holder)
-      throws LeaseExpiredException, UnresolvedLinkException {
+  private INodeFileUnderConstruction checkLease(String src, String holder)
+      throws LeaseExpiredException, UnresolvedLinkException,
+      FileNotFoundException {
     assert hasReadOrWriteLock();
-    return checkLease(src, holder, dir.getINode(src));
+    return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
+        dir.getINode(src));
   }
 
-  private INodeFileUnderConstruction checkLease(String src, String holder,
-      INode file) throws LeaseExpiredException {
+  private INodeFileUnderConstruction checkLease(String src, long fileId,
+      String holder, INode file) throws LeaseExpiredException,
+      FileNotFoundException {
     assert hasReadOrWriteLock();
     if (file == null || !(file instanceof INodeFile)) {
       Lease lease = leaseManager.getLease(holder);
@@ -2464,6 +2470,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
           + pendingFile.getClientName() + " but is accessed by " + holder);
     }
+    INodeId.checkId(fileId, pendingFile);
     return pendingFile;
   }
 
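The id check added to checkLease() closes a window in which a file is deleted or renamed away and then re-created under the same path while an old output stream is still open: previously the stale stream's addBlock() would attach blocks to the unrelated new file. Condensed from the testInodeId changes in TestINodeFile below (fs is a DistributedFileSystem, data a byte array):

    FSDataOutputStream fos = fs.create(new Path("/test1/file"));
    fs.rename(new Path("/test1"), new Path("/test2"));
    fs.create(new Path("/test1/file"), (short) 1); // same path, new inode id
    fos.write(data, 0, data.length);
    fos.hflush();  // stale id -> addBlock() fails with FileNotFoundException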
INodeId.java
@@ -17,18 +17,21 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.FileNotFoundException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.util.SequentialNumber;
 
 /**
- * An id which uniquely identifies an inode
+ * An id which uniquely identifies an inode. Id 1 to 1000 are reserved for
+ * potential future usage. The id won't be recycled and is not expected to wrap
+ * around in a very long time. Root inode id is always 1001. Id 0 is used for
+ * backward compatibility support.
  */
 @InterfaceAudience.Private
-class INodeId extends SequentialNumber {
+public class INodeId extends SequentialNumber {
   /**
-   * The last reserved inode id. Reserve id 1 to 1000 for potential future
-   * usage. The id won't be recycled and is not expected to wrap around in a
-   * very long time. Root inode id will be 1001.
+   * The last reserved inode id.
    */
   public static final long LAST_RESERVED_ID = 1000L;
 
@@ -38,6 +41,19 @@ class INodeId extends SequentialNumber {
    */
   public static final long GRANDFATHER_INODE_ID = 0;
 
+  /**
+   * To check if the request id is the same as saved id. Don't check fileId
+   * with GRANDFATHER_INODE_ID for backward compatibility.
+   */
+  public static void checkId(long requestId, INode inode)
+      throws FileNotFoundException {
+    if (requestId != GRANDFATHER_INODE_ID && requestId != inode.getId()) {
+      throw new FileNotFoundException(
+          "ID mismatch. Request id and saved id: " + requestId + " , "
+              + inode.getId());
+    }
+  }
+
   INodeId() {
     super(LAST_RESERVED_ID);
   }
NameNodeRpcServer.java
@@ -412,13 +412,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // ClientProtocol
-  public void create(String src,
-                     FsPermission masked,
-                     String clientName,
-                     EnumSetWritable<CreateFlag> flag,
-                     boolean createParent,
-                     short replication,
-                     long blockSize) throws IOException {
+  public HdfsFileStatus create(String src, FsPermission masked,
+      String clientName, EnumSetWritable<CreateFlag> flag,
+      boolean createParent, short replication, long blockSize)
+      throws IOException {
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.create: file "
@@ -428,12 +425,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throw new IOException("create: Pathname too long. Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-    namesystem.startFile(src,
-        new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
-            null, masked),
-        clientName, clientMachine, flag.get(), createParent, replication, blockSize);
+    HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
+        UserGroupInformation.getCurrentUser().getShortUserName(), null, masked),
+        clientName, clientMachine, flag.get(), createParent, replication,
+        blockSize);
     metrics.incrFilesCreated();
     metrics.incrCreateFileOps();
+    return fileStatus;
   }
 
   @Override // ClientProtocol
@@ -472,26 +470,24 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throws IOException {
     namesystem.setOwner(src, username, groupname);
   }
 
-  @Override // ClientProtocol
-  public LocatedBlock addBlock(String src,
-                               String clientName,
-                               ExtendedBlock previous,
-                               DatanodeInfo[] excludedNodes)
+  @Override
+  public LocatedBlock addBlock(String src, String clientName,
+      ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId)
       throws IOException {
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
-          +src+" for "+clientName);
+    if (stateChangeLog.isDebugEnabled()) {
+      stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
+          + " fileId=" + fileId + " for " + clientName);
     }
     HashMap<Node, Node> excludedNodesSet = null;
     if (excludedNodes != null) {
       excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
-      for (Node node:excludedNodes) {
+      for (Node node : excludedNodes) {
         excludedNodesSet.put(node, node);
       }
     }
-    LocatedBlock locatedBlock =
-      namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
+    LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
+        clientName, previous, excludedNodesSet);
     if (locatedBlock != null)
       metrics.incrAddBlockOps();
     return locatedBlock;
JsonUtil.java
@@ -219,6 +219,7 @@ public class JsonUtil {
     m.put("modificationTime", status.getModificationTime());
     m.put("blockSize", status.getBlockSize());
     m.put("replication", status.getReplication());
+    m.put("fileId", status.getFileId());
     return includeType ? toJsonString(FileStatus.class, m): JSON.toString(m);
   }
 
@@ -243,9 +244,10 @@ public class JsonUtil {
     final long mTime = (Long) m.get("modificationTime");
     final long blockSize = (Long) m.get("blockSize");
     final short replication = (short) (long) (Long) m.get("replication");
+    final long fileId = (Long) m.get("fileId");
     return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
         blockSize, mTime, aTime, permission, owner, group,
-        symlink, DFSUtil.string2Bytes(localName));
+        symlink, DFSUtil.string2Bytes(localName), fileId);
   }
 
   /** Convert an ExtendedBlock to a Json map. */
ClientNamenodeProtocol.proto
@@ -67,7 +67,8 @@ message CreateRequestProto {
   required uint64 blockSize = 7;
 }
 
-message CreateResponseProto { // void response
+message CreateResponseProto {
+  optional HdfsFileStatusProto fs = 1;
 }
 
 message AppendRequestProto {
@@ -119,6 +120,7 @@ message AddBlockRequestProto {
   required string clientName = 2;
   optional ExtendedBlockProto previous = 3;
   repeated DatanodeInfoProto excludeNodes = 4;
+  optional uint64 fileId = 5 [default = 0]; // default as a bogus id
 }
 
 message AddBlockResponseProto {
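CreateResponseProto gains an optional fs field rather than a required one, so a NameNode that predates this patch can still answer with the old void response; the client-side translator above maps that case to null, matching the "@return ... could be null" note in ClientProtocol:

    CreateResponseProto res = rpcProxy.create(null, req);
    HdfsFileStatus stat = res.hasFs() ? PBHelper.convert(res.getFs()) : null;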
hdfs.proto
@@ -170,6 +170,9 @@ message HdfsFileStatusProto {
   optional uint32 block_replication = 10 [default = 0]; // only 16bits used
   optional uint64 blocksize = 11 [default = 0];
   optional LocatedBlocksProto locations = 12; // suppled only if asked by client
+
+  // Optional field for fileId
+  optional uint64 fileId = 13 [default = 0]; // default as an invalid id
 }
 
 /**
TestDFSClientRetries.java
@@ -23,7 +23,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyShort;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -49,13 +52,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsUtils;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -64,12 +67,14 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -208,7 +213,7 @@ public class TestDFSClientRetries {
    * Verify that client will correctly give up after the specified number
    * of times trying to add a block
    */
-  @SuppressWarnings("serial")
+  @SuppressWarnings({ "serial", "unchecked" })
   @Test
   public void testNotYetReplicatedErrors() throws IOException
   {
@@ -235,7 +240,22 @@ public class TestDFSClientRetries {
     when(mockNN.addBlock(anyString(),
                          anyString(),
                          any(ExtendedBlock.class),
-                         any(DatanodeInfo[].class))).thenAnswer(answer);
+                         any(DatanodeInfo[].class),
+                         anyLong())).thenAnswer(answer);
+
+    Mockito.doReturn(
+        new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
+            (short) 777), "owner", "group", new byte[0], new byte[0],
+            1010)).when(mockNN).getFileInfo(anyString());
+
+    Mockito.doReturn(
+        new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
+            (short) 777), "owner", "group", new byte[0], new byte[0],
+            1010))
+        .when(mockNN)
+        .create(anyString(), (FsPermission) anyObject(), anyString(),
+            (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
+            anyShort(), anyLong());
 
     final DFSClient client = new DFSClient(null, mockNN, conf, null);
     OutputStream os = client.create("testfile", true);
@@ -369,7 +389,8 @@ public class TestDFSClientRetries {
           return ret2;
         }
       }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
-          Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
+          Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
+          Mockito.anyLong());
 
       doAnswer(new Answer<Boolean>() {
 
@@ -410,7 +431,8 @@ public class TestDFSClientRetries {
     // Make sure the mock was actually properly injected.
     Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
         Mockito.anyString(), Mockito.anyString(),
-        Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
+        Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
+        Mockito.anyLong());
     Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
         Mockito.anyString(), Mockito.anyString(),
         Mockito.<ExtendedBlock>any());
TestFileCreation.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.EnumSetWritable;
@@ -517,8 +518,8 @@ public class TestFileCreation {
           + "The file has " + locations.locatedBlockCount() + " blocks.");
 
       // add one block to the file
-      LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
-          client.clientName, null, null);
+      LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
+          client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID);
       System.out.println("testFileCreationError2: "
           + "Added block " + location.getBlock());
 
@@ -568,8 +569,8 @@ public class TestFileCreation {
       final Path f = new Path("/foo.txt");
       createFile(dfs, f, 3);
       try {
-        cluster.getNameNodeRpc().addBlock(f.toString(),
-            client.clientName, null, null);
+        cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName,
+            null, null, INodeId.GRANDFATHER_INODE_ID);
         fail();
       } catch(IOException ioe) {
         FileSystem.LOG.info("GOOD!", ioe);
TestLease.java
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hdfs;
 
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyShort;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -29,15 +33,18 @@ import java.security.PrivilegedExceptionAction;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -274,6 +281,7 @@ public class TestLease {
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testFactory() throws Exception {
     final String[] groups = new String[]{"supergroup"};
@@ -282,6 +290,20 @@ public class TestLease {
       ugi[i] = UserGroupInformation.createUserForTesting("user" + i, groups);
     }
 
+    Mockito.doReturn(
+        new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
+            (short) 777), "owner", "group", new byte[0], new byte[0],
+            1010)).when(mcp).getFileInfo(anyString());
+    Mockito
+        .doReturn(
+            new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
+                (short) 777), "owner", "group", new byte[0], new byte[0],
+                1010))
+        .when(mcp)
+        .create(anyString(), (FsPermission) anyObject(), anyString(),
+            (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
+            anyShort(), anyLong());
+
     final Configuration conf = new Configuration();
     final DFSClient c1 = createDFSClientAs(ugi[0], conf);
     FSDataOutputStream out1 = createFsOut(c1, "/out1");
NNThroughputBenchmark.java
@@ -1058,7 +1058,8 @@ public class NNThroughputBenchmark {
         throws IOException {
       ExtendedBlock prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
-        LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName, prevBlock, null);
+        LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
+            prevBlock, null, INodeId.GRANDFATHER_INODE_ID);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getXferAddr());
TestAddBlockRetry.java
@@ -107,7 +107,8 @@ public class TestAddBlockRetry {
         count++;
         if(count == 1) { // run second addBlock()
           LOG.info("Starting second addBlock for " + src);
-          nn.addBlock(src, "clientName", null, null);
+          nn.addBlock(src, "clientName", null, null,
+              INodeId.GRANDFATHER_INODE_ID);
           LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
           assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
           lb2 = lbs.get(0);
@@ -128,7 +129,7 @@ public class TestAddBlockRetry {
 
     // start first addBlock()
     LOG.info("Starting first addBlock for " + src);
-    nn.addBlock(src, "clientName", null, null);
+    nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID);
 
     // check locations
     LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
TestINodeFile.java
@@ -24,8 +24,10 @@ import static org.junit.Assert.fail;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Options;
@@ -38,6 +40,8 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.EnumSetWritable;
 import org.junit.Test;
 
 public class TestINodeFile {
@@ -375,7 +379,7 @@ public class TestINodeFile {
    * @throws IOException
    */
   @Test
-  public void TestInodeId() throws IOException {
+  public void testInodeId() throws IOException {
 
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
@@ -449,32 +453,36 @@ public class TestINodeFile {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
         .build();
     cluster.waitActive();
 
     FSNamesystem fsn = cluster.getNamesystem();
     long lastId = fsn.getLastInodeId();
 
     assertTrue(lastId == 1001);
 
     // Create one directory and the last inode id should increase to 1002
     FileSystem fs = cluster.getFileSystem();
     Path path = new Path("/test1");
     assertTrue(fs.mkdirs(path));
     assertTrue(fsn.getLastInodeId() == 1002);
 
+    int size = conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
+    byte[] data = new byte[size];
+
     // Create one file
     Path filePath = new Path("/test1/file");
-    fs.create(filePath);
-    assertTrue(fsn.getLastInodeId() == 1003);
+    FSDataOutputStream fos = fs.create(filePath);
 
-    // Rename doesn't increase inode id
+    // Rename /test1 to test2, and recreate /test1/file
     Path renamedPath = new Path("/test2");
     fs.rename(path, renamedPath);
-    assertTrue(fsn.getLastInodeId() == 1003);
+    fs.create(filePath, (short) 1);
 
-    cluster.restartNameNode();
-    cluster.waitActive();
     // Make sure empty editlog can be handled
     cluster.restartNameNode();
     cluster.waitActive();
-    assertTrue(fsn.getLastInodeId() == 1003);
+
+    // Add new block should fail since /test1/file has a different fileId
+    try {
+      fos.write(data, 0, data.length);
+      // make sure addBlock() request gets to NN immediately
+      fos.hflush();
+
+      fail("Write should fail after rename");
+    } catch (Exception e) {
+      /* Ignore */
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }
 }
TestJsonUtil.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Test;
@@ -42,9 +43,10 @@ public class TestJsonUtil {
   public void testHdfsFileStatus() {
     final long now = Time.now();
     final String parent = "/dir";
-    final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L<<26,
-        now, now + 10, new FsPermission((short)0644), "user", "group",
-        DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"));
+    final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
+        now, now + 10, new FsPermission((short) 0644), "user", "group",
+        DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
+        INodeId.GRANDFATHER_INODE_ID);
     final FileStatus fstatus = toFileStatus(status, parent);
     System.out.println("status = " + status);
     System.out.println("fstatus = " + fstatus);