HDFS-4679. Merge 1466721 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1466729 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-04-10 23:17:23 +00:00
parent 9be0d30af4
commit 71a4677b36
3 changed files with 134 additions and 147 deletions

View File

@ -44,6 +44,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-3940. Add Gset#clear method and clear the block map when namenode is
shutdown. (suresh)
HDFS-4679. Namenode operation checks should be done in a consistent
manner. (suresh)
OPTIMIZATIONS
BUG FIXES

View File

@ -1161,7 +1161,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot set permission for " + src, safeMode);
}
@ -1199,7 +1198,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot set owner for " + src, safeMode);
}
@ -1253,9 +1251,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LocatedBlocks getBlockLocations(String src, long offset, long length,
boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
throws FileNotFoundException, UnresolvedLinkException, IOException {
FSPermissionChecker pc = getPermissionChecker();
try {
return getBlockLocationsInt(pc, src, offset, length, doAccessTime,
return getBlockLocationsInt(src, offset, length, doAccessTime,
needBlockToken, checkSafeMode);
} catch (AccessControlException e) {
logAuditEvent(false, "open", src);
@ -1263,14 +1260,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
private LocatedBlocks getBlockLocationsInt(FSPermissionChecker pc,
String src, long offset, long length, boolean doAccessTime,
boolean needBlockToken, boolean checkSafeMode)
private LocatedBlocks getBlockLocationsInt(String src, long offset,
long length, boolean doAccessTime, boolean needBlockToken,
boolean checkSafeMode)
throws FileNotFoundException, UnresolvedLinkException, IOException {
if (isPermissionEnabled) {
checkPathAccess(pc, src, FsAction.READ);
}
if (offset < 0) {
throw new HadoopIllegalArgumentException(
"Negative offset is not supported. File: " + src);
@ -1298,13 +1291,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Get block locations within the specified range, updating the
* access times if necessary.
*/
private LocatedBlocks getBlockLocationsUpdateTimes(String src,
long offset,
long length,
boolean doAccessTime,
boolean needBlockToken)
throws FileNotFoundException, UnresolvedLinkException, IOException {
private LocatedBlocks getBlockLocationsUpdateTimes(String src, long offset,
long length, boolean doAccessTime, boolean needBlockToken)
throws FileNotFoundException,
UnresolvedLinkException, IOException {
FSPermissionChecker pc = getPermissionChecker();
for (int attempt = 0; attempt < 2; attempt++) {
boolean isReadOp = (attempt == 0);
if (isReadOp) { // first attempt is with readlock
@ -1320,6 +1311,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} else {
checkOperation(OperationCategory.WRITE);
}
if (isPermissionEnabled) {
checkPathAccess(pc, src, FsAction.READ);
}
// if the namenode is in safemode, then do not update access time
if (isInSafeMode()) {
@ -1362,6 +1356,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void concat(String target, String [] srcs)
throws IOException, UnresolvedLinkException {
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
" to " + target);
}
try {
concatInt(target, srcs);
} catch (AccessControlException e) {
@ -1372,11 +1370,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private void concatInt(String target, String [] srcs)
throws IOException, UnresolvedLinkException {
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
" to " + target);
}
// verify args
if(target.isEmpty()) {
throw new IllegalArgumentException("Target file name is empty");
@ -1525,6 +1518,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void setTimes(String src, long mtime, long atime)
throws IOException, UnresolvedLinkException {
if (!isAccessTimeSupported() && atime != -1) {
throw new IOException("Access time for hdfs is not configured. " +
" Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
}
try {
setTimesInt(src, mtime, atime);
} catch (AccessControlException e) {
@ -1535,16 +1532,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private void setTimesInt(String src, long mtime, long atime)
throws IOException, UnresolvedLinkException {
if (!isAccessTimeSupported() && atime != -1) {
throw new IOException("Access time for hdfs is not configured. " +
" Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
}
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot set times " + src, safeMode);
}
// Write access is required to set access and modification times
if (isPermissionEnabled) {
@ -1569,6 +1565,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
if (!DFSUtil.isValidName(link)) {
throw new InvalidPathException("Invalid file name: " + link);
}
try {
createSymlinkInt(target, link, dirPerms, createParent);
} catch (AccessControlException e) {
@ -1580,17 +1579,34 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private void createSymlinkInt(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
+ target + " link=" + link);
}
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot create symlink " + link, safeMode);
}
if (!createParent) {
verifyParentDir(link);
}
createSymlinkInternal(pc, target, link, dirPerms, createParent);
if (!dir.isValidToCreate(link)) {
throw new IOException("failed to create link " + link
+" either because the filename is invalid or the file exists");
}
if (isPermissionEnabled) {
checkAncestorAccess(pc, link, FsAction.WRITE);
}
// validate that we have enough inodes.
checkFsObjectLimit();
// add symbolic link to namespace
dir.addSymlink(link, target, dirPerms, createParent);
resultingStat = getAuditFileInfo(link, false);
} finally {
writeUnlock();
@ -1599,37 +1615,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
logAuditEvent(true, "createSymlink", link, target, resultingStat);
}
/**
* Create a symbolic link.
*/
private void createSymlinkInternal(FSPermissionChecker pc, String target,
String link, PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
assert hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" +
target + " link=" + link);
}
if (isInSafeMode()) {
throw new SafeModeException("Cannot create symlink " + link, safeMode);
}
if (!DFSUtil.isValidName(link)) {
throw new InvalidPathException("Invalid file name: " + link);
}
if (!dir.isValidToCreate(link)) {
throw new IOException("failed to create link " + link
+" either because the filename is invalid or the file exists");
}
if (isPermissionEnabled) {
checkAncestorAccess(pc, link, FsAction.WRITE);
}
// validate that we have enough inodes.
checkFsObjectLimit();
// add symbolic link to namespace
dir.addSymlink(link, target, dirPerms, createParent);
}
/**
* Set replication for an existing file.
*
@ -1747,12 +1732,23 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
short replication, long blockSize) throws AccessControlException,
SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+ ", holder=" + holder
+ ", clientMachine=" + clientMachine
+ ", createParent=" + createParent
+ ", replication=" + replication
+ ", createFlag=" + flag.toString());
}
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
boolean skipSync = false;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
startFileInternal(pc, src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
} catch (StandbyException se) {
@ -1793,21 +1789,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
AccessControlException, UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
assert hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+ ", holder=" + holder
+ ", clientMachine=" + clientMachine
+ ", createParent=" + createParent
+ ", replication=" + replication
+ ", createFlag=" + flag.toString());
}
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot create file" + src, safeMode);
}
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
// Verify that the destination does not exist as a directory already.
boolean pathExists = dir.exists(src);
if (pathExists && dir.isDir(src)) {
@ -1942,21 +1927,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
boolean recoverLease(String src, String holder, String clientMachine)
throws IOException {
if (!DFSUtil.isValidName(src)) {
throw new IOException("Invalid file name: " + src);
}
boolean skipSync = false;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot recover the lease of " + src, safeMode);
}
if (!DFSUtil.isValidName(src)) {
throw new IOException("Invalid file name: " + src);
}
final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
if (!inode.isUnderConstruction()) {
return true;
@ -2080,13 +2064,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
"Append is not enabled on this NameNode. Use the " +
DFS_SUPPORT_APPEND_KEY + " configuration option to enable it.");
}
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
+ ", holder=" + holder
+ ", clientMachine=" + clientMachine);
}
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
LocatedBlock lb = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
lb = startFileInternal(pc, src, null, holder, clientMachine,
EnumSet.of(CreateFlag.APPEND),
false, blockManager.maxReplication, 0);
@ -2378,21 +2369,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean abandonBlock(ExtendedBlock b, String src, String holder)
throws LeaseExpiredException, FileNotFoundException,
UnresolvedLinkException, IOException {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b
+ "of file " + src);
}
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
//
// Remove the block from the pending creates list
//
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+b+"of file "+src);
}
if (isInSafeMode()) {
throw new SafeModeException("Cannot abandon block " + b +
" for fle" + src, safeMode);
}
//
// Remove the block from the pending creates list
//
INodeFileUnderConstruction file = checkLease(src, holder);
dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
if(NameNode.stateChangeLog.isDebugEnabled()) {
@ -2450,19 +2441,23 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
boolean completeFile(String src, String holder, ExtendedBlock last)
throws SafeModeException, UnresolvedLinkException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
src + " for " + holder);
}
checkBlock(last);
boolean success = false;
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
success = completeFileInternal(src, holder,
ExtendedBlock.getLocalBlock(last));
success = completeFileInternal(src, holder,
ExtendedBlock.getLocalBlock(last));
} finally {
writeUnlock();
}
getEditLog().logSync();
NameNode.stateChangeLog.info("DIR* completeFile: " + src + " is closed by "
+ holder);
return success;
}
@ -2470,10 +2465,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
String holder, Block last) throws SafeModeException,
UnresolvedLinkException, IOException {
assert hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
src + " for " + holder);
}
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot complete file " + src, safeMode);
}
@ -2509,9 +2501,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
finalizeINodeFileUnderConstruction(src, pendingFile);
NameNode.stateChangeLog.info("DIR* completeFile: " + src + " is closed by "
+ holder);
return true;
}
@ -2613,18 +2602,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private boolean renameToInt(String src, String dst)
throws IOException, UnresolvedLinkException {
boolean status = false;
HdfsFileStatus resultingStat = null;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
" to " + dst);
}
if (!DFSUtil.isValidName(dst)) {
throw new IOException("Invalid name: " + dst);
}
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
boolean status = false;
HdfsFileStatus resultingStat = null;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
status = renameToInternal(pc, src, dst);
if (status) {
resultingStat = getAuditFileInfo(dst, false);
@ -2644,12 +2634,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
throws IOException, UnresolvedLinkException {
assert hasWriteLock();
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot rename " + src, safeMode);
}
if (!DFSUtil.isValidName(dst)) {
throw new IOException("Invalid name: " + dst);
}
if (isPermissionEnabled) {
//We should not be doing this. This is move() not renameTo().
//but for now,
@ -2671,16 +2659,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** Rename src to dst */
void renameTo(String src, String dst, Options.Rename... options)
throws IOException, UnresolvedLinkException {
HdfsFileStatus resultingStat = null;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+ src + " to " + dst);
}
if (!DFSUtil.isValidName(dst)) {
throw new InvalidPathException("Invalid name: " + dst);
}
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
HdfsFileStatus resultingStat = null;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
renameToInternal(pc, src, dst, options);
resultingStat = getAuditFileInfo(dst, false);
} finally {
@ -2699,12 +2689,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private void renameToInternal(FSPermissionChecker pc, String src, String dst,
Options.Rename... options) throws IOException {
assert hasWriteLock();
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot rename " + src, safeMode);
}
if (!DFSUtil.isValidName(dst)) {
throw new InvalidPathException("Invalid name: " + dst);
}
if (isPermissionEnabled) {
checkParentAccess(pc, src, FsAction.WRITE);
checkAncestorAccess(pc, dst, FsAction.WRITE);
@ -2881,16 +2869,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus getFileInfo(String src, boolean resolveLink)
throws AccessControlException, UnresolvedLinkException,
StandbyException, IOException {
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException("Invalid file name: " + src);
}
HdfsFileStatus stat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
readLock();
try {
checkOperation(OperationCategory.READ);
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException("Invalid file name: " + src);
}
if (isPermissionEnabled) {
checkTraverse(pc, src);
}
@ -2947,16 +2934,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private boolean mkdirsInt(String src, PermissionStatus permissions,
boolean createParent) throws IOException, UnresolvedLinkException {
HdfsFileStatus resultingStat = null;
boolean status = false;
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
}
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
HdfsFileStatus resultingStat = null;
boolean status = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
status = mkdirsInternal(pc, src, permissions, createParent);
if (status) {
resultingStat = dir.getFileInfo(src, false);
@ -2978,6 +2967,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
PermissionStatus permissions, boolean createParent)
throws IOException, UnresolvedLinkException {
assert hasWriteLock();
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot create directory " + src, safeMode);
}
@ -2989,9 +2979,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// a new directory is not created.
return true;
}
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
if (isPermissionEnabled) {
checkAncestorAccess(pc, src, FsAction.WRITE);
}
@ -3262,8 +3249,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
String[] newtargetstorages)
throws IOException, UnresolvedLinkException {
String src = "";
LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+ ", newgenerationstamp=" + newgenerationstamp
+ ", newlength=" + newlength
+ ", newtargets=" + Arrays.asList(newtargets)
+ ", closeFile=" + closeFile
+ ", deleteBlock=" + deleteblock
+ ")");
checkOperation(OperationCategory.WRITE);
String src = "";
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -3275,13 +3269,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
"Cannot commitBlockSynchronization while in safe mode",
safeMode);
}
LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+ ", newgenerationstamp=" + newgenerationstamp
+ ", newlength=" + newlength
+ ", newtargets=" + Arrays.asList(newtargets)
+ ", closeFile=" + closeFile
+ ", deleteBlock=" + deleteblock
+ ")");
final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
.getLocalBlock(lastblock));
if (storedBlock == null) {
@ -3371,7 +3358,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
}
@ -4847,11 +4833,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
checkOperation(OperationCategory.WRITE);
NameNode.stateChangeLog.info("*DIR* reportBadBlocks");
writeLock();
try {
checkOperation(OperationCategory.WRITE);
NameNode.stateChangeLog.info("*DIR* reportBadBlocks");
for (int i = 0; i < blocks.length; i++) {
ExtendedBlock blk = blocks[i].getBlock();
DatanodeInfo[] nodes = blocks[i].getLocations();
@ -4914,6 +4899,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException {
checkOperation(OperationCategory.WRITE);
LOG.info("updatePipeline(block=" + oldBlock
+ ", newGenerationStamp=" + newBlock.getGenerationStamp()
+ ", newLength=" + newBlock.getNumBytes()
+ ", newNodes=" + Arrays.asList(newNodes)
+ ", clientName=" + clientName
+ ")");
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -4923,12 +4914,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
LOG.info("updatePipeline(block=" + oldBlock
+ ", newGenerationStamp=" + newBlock.getGenerationStamp()
+ ", newLength=" + newBlock.getNumBytes()
+ ", newNodes=" + Arrays.asList(newNodes)
+ ", clientName=" + clientName
+ ")");
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
} finally {
writeUnlock();
@ -5180,7 +5165,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot issue delegation token", safeMode);
}

View File

@ -99,7 +99,7 @@ public class TestSafeMode {
*/
@Test
public void testManualSafeMode() throws IOException {
fs = (DistributedFileSystem)cluster.getFileSystem();
fs = cluster.getFileSystem();
Path file1 = new Path("/tmp/testManualSafeMode/file1");
Path file2 = new Path("/tmp/testManualSafeMode/file2");
@ -112,7 +112,7 @@ public class TestSafeMode {
// now bring up just the NameNode.
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
cluster.waitActive();
dfs = (DistributedFileSystem)cluster.getFileSystem();
dfs = cluster.getFileSystem();
assertTrue("No datanode is started. Should be in SafeMode",
dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
@ -322,11 +322,11 @@ public class TestSafeMode {
fs.rename(file1, new Path("file2"));
}});
try {
fs.setTimes(file1, 0, 0);
} catch (IOException ioe) {
fail("Set times failed while in SM");
}
runFsFun("Set time while in SM", new FSRun() {
@Override
public void run(FileSystem fs) throws IOException {
fs.setTimes(file1, 0, 0);
}});
try {
DFSTestUtil.readFile(fs, file1);
@ -350,7 +350,7 @@ public class TestSafeMode {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
cluster.restartNameNode();
fs = (DistributedFileSystem)cluster.getFileSystem();
fs = cluster.getFileSystem();
String tipMsg = cluster.getNamesystem().getSafemode();
assertTrue("Safemode tip message looks right: " + tipMsg,
@ -375,7 +375,7 @@ public class TestSafeMode {
* @throws IOException when there's an issue connecting to the test DFS.
*/
public void testSafeModeUtils() throws IOException {
dfs = (DistributedFileSystem)cluster.getFileSystem();
dfs = cluster.getFileSystem();
// Enter safemode.
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);