diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 5b20cd7cbb3..d0f39565056 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -150,6 +150,8 @@ Trunk (Unreleased) HDFS-4052. BlockManager#invalidateWork should print log outside the lock. (Jing Zhao via suresh) + HDFS-4110. Refine a log printed in JNStorage. (Liang Xie via suresh) + OPTIMIZATIONS BUG FIXES @@ -233,6 +235,9 @@ Trunk (Unreleased) HDFS-2434. TestNameNodeMetrics.testCorruptBlock fails intermittently. (Jing Zhao via suresh) + HDFS-4067. TestUnderReplicatedBlocks intermittently fails due to + ReplicaAlreadyExistsException. (Jing Zhao via suresh) + BREAKDOWN OF HDFS-3077 SUBTASKS HDFS-3077. Quorum-based protocol for reading and writing edit logs. @@ -413,6 +418,9 @@ Release 2.0.3-alpha - Unreleased HDFS-4099. Clean up replication code and add more javadoc. (szetszwo) + HDFS-4107. Add utility methods for casting INode to INodeFile and + INodeFileUnderConstruction. (szetszwo) + OPTIMIZATIONS BUG FIXES @@ -488,6 +496,10 @@ Release 2.0.3-alpha - Unreleased HDFS-4022. Replication not happening for appended block. (Vinay via umamahesh) + HDFS-3948. Do not use hflush in TestWebHDFS.testNamenodeRestart() since the + out stream returned by WebHdfsFileSystem does not support it. (Jing Zhao + via szetszwo) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java index bb28e62ab03..b2f65debc57 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java @@ -171,8 +171,7 @@ class JNStorage extends Storage { void format(NamespaceInfo nsInfo) throws IOException { setStorageInfo(nsInfo); - LOG.info("Formatting journal storage directory " + - sd + " with nsid: " + getNamespaceID()); + LOG.info("Formatting journal " + sd + " with nsid: " + getNamespaceID()); // Unlock the directory before formatting, because we will // re-analyze it after format(). The analyzeStorage() call // below is reponsible for re-locking it. This is a no-op diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 6c52a70b908..58e6c721885 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -1012,7 +1012,7 @@ public class FSDirectory implements Closeable { int i = 0; int totalBlocks = 0; for(String src : srcs) { - INodeFile srcInode = getFileINode(src); + INodeFile srcInode = (INodeFile)getINode(src); allSrcInodes[i++] = srcInode; totalBlocks += srcInode.blocks.length; } @@ -1300,25 +1300,13 @@ public class FSDirectory implements Closeable { } } - /** - * Get {@link INode} associated with the file. 
- */ - INodeFile getFileINode(String src) throws UnresolvedLinkException { - INode inode = getINode(src); - if (inode == null || inode.isDirectory()) - return null; - assert !inode.isLink(); - return (INodeFile) inode; - } - /** * Get {@link INode} associated with the file / directory. */ public INode getINode(String src) throws UnresolvedLinkException { readLock(); try { - INode iNode = rootDir.getNode(src, true); - return iNode; + return rootDir.getNode(src, true); } finally { readUnlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 945164bf3de..fbb1c271b30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -477,8 +477,8 @@ public class FSEditLogLoader { Lease lease = fsNamesys.leaseManager.getLease( reassignLeaseOp.leaseHolder); INodeFileUnderConstruction pendingFile = - (INodeFileUnderConstruction) fsDir.getFileINode( - reassignLeaseOp.path); + INodeFileUnderConstruction.valueOf( + fsDir.getINode(reassignLeaseOp.path), reassignLeaseOp.path); fsNamesys.reassignLeaseInternal(lease, reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile); break; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java index e1882d94814..0ec2ae00cc0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java @@ -365,14 +365,7 @@ class FSImageFormat { // verify that file exists in namespace String path = cons.getLocalName(); - INode old = fsDir.getFileINode(path); - if (old == null) { - throw new IOException("Found lease for non-existent file " + path); - } - if (old.isDirectory()) { - throw new IOException("Found lease for directory " + path); - } - INodeFile oldnode = (INodeFile) old; + INodeFile oldnode = INodeFile.valueOf(fsDir.getINode(path), path); fsDir.replaceNode(path, oldnode, cons); namesystem.leaseManager.addLease(cons.getClientName(), path); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index e0e2764d242..26af442f188 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1270,11 +1270,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } long now = now(); - INodeFile inode = dir.getFileINode(src); - if (inode == null) { - throw new FileNotFoundException("File does not exist: " + src); - } - assert !inode.isLink(); + final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src); if (doAccessTime && isAccessTimeSupported()) { if (now <= inode.getAccessTime() + getAccessTimePrecision()) { // if we have to set access time but we only have the readlock, then @@ -1390,28 +1386,27 @@ public class FSNamesystem implements Namesystem, 
FSClusterStats, // we put the following prerequisite for the operation // replication and blocks sizes should be the same for ALL the blocks + // check the target - INode inode = dir.getFileINode(target); - - if(inode == null) { - throw new IllegalArgumentException("concat: trg file doesn't exist"); + final INodeFile trgInode = INodeFile.valueOf(dir.getINode(target), target); + if(trgInode.isUnderConstruction()) { + throw new HadoopIllegalArgumentException("concat: target file " + + target + " is under construction"); } - if(inode.isUnderConstruction()) { - throw new IllegalArgumentException("concat: trg file is uner construction"); - } - - INodeFile trgInode = (INodeFile) inode; - - // per design trg shouldn't be empty and all the blocks same size + // per design target shouldn't be empty and all the blocks same size if(trgInode.blocks.length == 0) { - throw new IllegalArgumentException("concat: "+ target + " file is empty"); + throw new HadoopIllegalArgumentException("concat: target file " + + target + " is empty"); } long blockSize = trgInode.getPreferredBlockSize(); // check the end block to be full if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) { - throw new IllegalArgumentException(target + " blocks size should be the same"); + throw new HadoopIllegalArgumentException("The last block in " + target + + " is not full; last block size = " + + trgInode.blocks[trgInode.blocks.length-1].getNumBytes() + + " but file block size = " + blockSize); } si.add(trgInode); @@ -1424,21 +1419,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if(i==srcs.length-1) endSrc=true; - INodeFile srcInode = dir.getFileINode(src); - + final INodeFile srcInode = INodeFile.valueOf(dir.getINode(src), src); if(src.isEmpty() - || srcInode == null || srcInode.isUnderConstruction() || srcInode.blocks.length == 0) { - throw new IllegalArgumentException("concat: file " + src + - " is invalid or empty or underConstruction"); + throw new HadoopIllegalArgumentException("concat: source file " + src + + " is invalid or empty or underConstruction"); } // check replication and blocks size if(repl != srcInode.getFileReplication()) { - throw new IllegalArgumentException(src + " and " + target + " " + - "should have same replication: " - + repl + " vs. 
" + srcInode.getFileReplication()); + throw new HadoopIllegalArgumentException("concat: the source file " + + src + " and the target file " + target + + " should have the same replication: source replication is " + + srcInode.getBlockReplication() + + " but target replication is " + repl); } //boolean endBlock=false; @@ -1448,8 +1443,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if(endSrc) idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) { - throw new IllegalArgumentException("concat: blocks sizes of " + - src + " and " + target + " should all be the same"); + throw new HadoopIllegalArgumentException("concat: the source file " + + src + " and the target file " + target + + " should have the same block sizes: target block size is " + + blockSize + " but the size of source block " + idx + " is " + + srcInode.blocks[idx].getNumBytes()); } si.add(srcInode); @@ -1458,7 +1456,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, // make sure no two files are the same if(si.size() < srcs.length+1) { // trg + srcs // it means at least two files are the same - throw new IllegalArgumentException("at least two files are the same"); + throw new HadoopIllegalArgumentException( + "concat: at least two of the source files are the same"); } if(NameNode.stateChangeLog.isDebugEnabled()) { @@ -1797,13 +1796,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } try { - INodeFile myFile = dir.getFileINode(src); - try { - blockManager.verifyReplication(src, replication, clientMachine); - } catch(IOException e) { - throw new IOException("failed to create "+e.getMessage()); - } + blockManager.verifyReplication(src, replication, clientMachine); boolean create = flag.contains(CreateFlag.CREATE); + final INode myFile = dir.getINode(src); if (myFile == null) { if (!create) { throw new FileNotFoundException("failed to overwrite or append to non-existent file " @@ -1829,8 +1824,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, blockManager.getDatanodeManager().getDatanodeByHost(clientMachine); if (append && myFile != null) { + final INodeFile f = INodeFile.valueOf(myFile, src); return prepareFileForWrite( - src, myFile, holder, clientMachine, clientNode, true); + src, f, holder, clientMachine, clientNode, true); } else { // Now we can add the name to the filesystem. This file has no // blocks associated with it. 
@@ -1925,11 +1921,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw new IOException("Invalid file name: " + src); } - INode inode = dir.getFileINode(src); - if (inode == null) { - throw new FileNotFoundException("File not found " + src); - } - + final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src); if (!inode.isUnderConstruction()) { return true; } @@ -2330,35 +2322,32 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private INodeFileUnderConstruction checkLease(String src, String holder) throws LeaseExpiredException, UnresolvedLinkException { assert hasReadOrWriteLock(); - INodeFile file = dir.getFileINode(src); - checkLease(src, holder, file); - return (INodeFileUnderConstruction)file; + return checkLease(src, holder, dir.getINode(src)); } - private void checkLease(String src, String holder, INode file) - throws LeaseExpiredException { + private INodeFileUnderConstruction checkLease(String src, String holder, + INode file) throws LeaseExpiredException { assert hasReadOrWriteLock(); - if (file == null || file.isDirectory()) { + if (file == null || !(file instanceof INodeFile)) { Lease lease = leaseManager.getLease(holder); - throw new LeaseExpiredException("No lease on " + src + - " File does not exist. " + - (lease != null ? lease.toString() : - "Holder " + holder + - " does not have any open files.")); + throw new LeaseExpiredException( + "No lease on " + src + ": File does not exist. " + + (lease != null ? lease.toString() + : "Holder " + holder + " does not have any open files.")); } if (!file.isUnderConstruction()) { Lease lease = leaseManager.getLease(holder); - throw new LeaseExpiredException("No lease on " + src + - " File is not open for writing. " + - (lease != null ? lease.toString() : - "Holder " + holder + - " does not have any open files.")); + throw new LeaseExpiredException( + "No lease on " + src + ": File is not open for writing. " + + (lease != null ? lease.toString() + : "Holder " + holder + " does not have any open files.")); } INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file; if (holder != null && !pendingFile.getClientName().equals(holder)) { throw new LeaseExpiredException("Lease mismatch on " + src + " owned by " + pendingFile.getClientName() + " but is accessed by " + holder); } + return pendingFile; } /** @@ -2400,15 +2389,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats, try { pendingFile = checkLease(src, holder); } catch (LeaseExpiredException lee) { - INodeFile file = dir.getFileINode(src); - if (file != null && !file.isUnderConstruction()) { + final INode inode = dir.getINode(src); + if (inode != null && inode instanceof INodeFile && !inode.isUnderConstruction()) { // This could be a retry RPC - i.e the client tried to close // the file, but missed the RPC response. Thus, it is trying // again to close the file. If the file still exists and // the client's view of the last block matches the actual // last block, then we'll treat it as a successful close. // See HDFS-3031. 
- Block realLastBlock = file.getLastBlock(); + final Block realLastBlock = ((INodeFile)inode).getLastBlock(); if (Block.matchingIdAndGenStamp(last, realLastBlock)) { NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: " + "received request from " + holder + " to complete file " + src + @@ -2994,23 +2983,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, LOG.info("Recovering lease=" + lease + ", src=" + src); assert !isInSafeMode(); assert hasWriteLock(); - INodeFile iFile = dir.getFileINode(src); - if (iFile == null) { - final String message = "DIR* NameSystem.internalReleaseLease: " - + "attempt to release a create lock on " - + src + " file does not exist."; - NameNode.stateChangeLog.warn(message); - throw new IOException(message); - } - if (!iFile.isUnderConstruction()) { - final String message = "DIR* NameSystem.internalReleaseLease: " - + "attempt to release a create lock on " - + src + " but file is already closed."; - NameNode.stateChangeLog.warn(message); - throw new IOException(message); - } - INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile; + final INodeFileUnderConstruction pendingFile + = INodeFileUnderConstruction.valueOf(dir.getINode(src), src); int nrBlocks = pendingFile.numBlocks(); BlockInfo[] blocks = pendingFile.getBlocks(); @@ -4318,17 +4293,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, try { for (Lease lease : leaseManager.getSortedLeases()) { for (String path : lease.getPaths()) { - INode node; + final INodeFileUnderConstruction cons; try { - node = dir.getFileINode(path); + cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path); } catch (UnresolvedLinkException e) { throw new AssertionError("Lease files should reside on this FS"); + } catch (IOException e) { + throw new RuntimeException(e); } - assert node != null : "Found a lease for nonexisting file."; - assert node.isUnderConstruction() : - "Found a lease for file " + path + " that is not under construction." 
+ - " lease=" + lease; - INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node; BlockInfo[] blocks = cons.getBlocks(); if(blocks == null) continue; @@ -4911,21 +4883,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats, for (Lease lease : leaseManager.getSortedLeases()) { for(String path : lease.getPaths()) { // verify that path exists in namespace - INode node; + final INodeFileUnderConstruction cons; try { - node = dir.getFileINode(path); + cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path); } catch (UnresolvedLinkException e) { throw new AssertionError("Lease files should reside on this FS"); } - if (node == null) { - throw new IOException("saveLeases found path " + path + - " but no matching entry in namespace."); - } - if (!node.isUnderConstruction()) { - throw new IOException("saveLeases found path " + path + - " but is not under construction."); - } - INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node; FSImageSerialization.writeINodeUnderConstruction(out, cons, path); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index e2f1e76146d..c84763f54c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.List; @@ -32,6 +33,17 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; /** I-node for closed file. */ @InterfaceAudience.Private public class INodeFile extends INode implements BlockCollection { + /** Cast INode to INodeFile. 
*/ + public static INodeFile valueOf(INode inode, String path) throws IOException { + if (inode == null) { + throw new FileNotFoundException("File does not exist: " + path); + } + if (!(inode instanceof INodeFile)) { + throw new FileNotFoundException("Path is not a file: " + path); + } + return (INodeFile)inode; + } + static final FsPermission UMASK = FsPermission.createImmutable((short)0111); //Number of bits for Block size diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java index d3aac745fc4..89d0181ac2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java @@ -25,8 +25,8 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import com.google.common.base.Joiner; @@ -35,6 +35,16 @@ import com.google.common.base.Joiner; */ @InterfaceAudience.Private class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection { + /** Cast INode to INodeFileUnderConstruction. */ + public static INodeFileUnderConstruction valueOf(INode inode, String path + ) throws IOException { + final INodeFile file = INodeFile.valueOf(inode, path); + if (!file.isUnderConstruction()) { + throw new IOException("File is not under construction: " + path); + } + return (INodeFileUnderConstruction)file; + } + private String clientName; // lease holder private final String clientMachine; private final DatanodeDescriptor clientNode; // if client is a cluster node too. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 88b7a2a13f3..b74e61f85b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -253,7 +253,7 @@ public class LeaseManager { private String findPath(INodeFileUnderConstruction pendingFile) { try { for (String src : paths) { - if (fsnamesystem.dir.getFileINode(src) == pendingFile) { + if (fsnamesystem.dir.getINode(src) == pendingFile) { return src; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 2e9deee05a6..dc77b251946 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -876,7 +876,14 @@ public class TestDFSClientRetries { new Random().nextBytes(bytes); out4.write(bytes); out4.write(bytes); - out4.hflush(); + if (isWebHDFS) { + // WebHDFS does not support hflush. To avoid DataNode communicating with + // NN while we're shutting down NN, we call out4.close() to finish + // writing the data + out4.close(); + } else { + out4.hflush(); + } //shutdown namenode assertTrue(HdfsUtils.isHealthy(uri)); @@ -889,10 +896,12 @@ public class TestDFSClientRetries { public void run() { try { //write some more data and then close the file - out4.write(bytes); - out4.write(bytes); - out4.write(bytes); - out4.close(); + if (!isWebHDFS) { + out4.write(bytes); + out4.write(bytes); + out4.write(bytes); + out4.close(); + } } catch (Exception e) { exceptions.add(e); } @@ -975,7 +984,11 @@ public class TestDFSClientRetries { Assert.assertEquals(String.format("count=%d", count), bytes[count % bytes.length], (byte)r); } - Assert.assertEquals(5 * bytes.length, count); + if (!isWebHDFS) { + Assert.assertEquals(5 * bytes.length, count); + } else { + Assert.assertEquals(2 * bytes.length, count); + } in.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 0aec4960554..ad262c48c86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -119,8 +119,6 @@ public class TestDistributedFileSystem { DFSTestUtil.createFile(fileSys, p, 1L, (short)1, 0L); DFSTestUtil.readFile(fileSys, p); - DFSClient client = ((DistributedFileSystem)fileSys).dfs; - fileSys.close(); } finally { @@ -476,7 +474,7 @@ public class TestDistributedFileSystem { fail("Expecting FileNotFoundException"); } catch (FileNotFoundException e) { assertTrue("Not throwing the intended exception message", e.getMessage() - .contains("File does not exist: /test/TestExistingDir")); + .contains("Path is not a file: /test/TestExistingDir")); } //hftp @@ -712,7 +710,6 @@ public class TestDistributedFileSystem { @Test public void testCreateWithCustomChecksum() throws Exception { Configuration conf = getTestConfiguration(); - 
final long grace = 1000L; MiniDFSCluster cluster = null; Path testBasePath = new Path("/test/csum"); // create args diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java index 2a4f44ea04d..0b4d8a6e231 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.junit.Test; public class TestUnderReplicatedBlocks { @@ -49,6 +50,12 @@ public class TestUnderReplicatedBlocks { ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH); DatanodeDescriptor dn = bm.blocksMap.nodeIterator(b.getLocalBlock()).next(); bm.addToInvalidates(b.getLocalBlock(), dn); + // Compute the invalidate work in NN, and trigger the heartbeat from DN + BlockManagerTestUtil.computeAllPendingWork(bm); + DataNodeTestUtils.triggerHeartbeat(cluster.getDataNode(dn.getIpcPort())); + // Wait to make sure the DataNode receives the deletion request + Thread.sleep(1000); + // Remove the record from blocksMap bm.blocksMap.removeNode(b.getLocalBlock(), dn); // increment this file's replication factor diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java index 66e60b02718..27490a2e8d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java @@ -83,8 +83,7 @@ public class TestBlockUnderConstruction { private void verifyFileBlocks(String file, boolean isFileOpen) throws IOException { FSNamesystem ns = cluster.getNamesystem(); - INodeFile inode = ns.dir.getFileINode(file); - assertTrue("File does not exist: " + inode.toString(), inode != null); + final INodeFile inode = INodeFile.valueOf(ns.dir.getINode(file), file); assertTrue("File " + inode.toString() + " isUnderConstruction = " + inode.isUnderConstruction() + " expected to be " + isFileOpen, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java index 725f387d316..53bff16957f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java @@ -18,13 +18,17 @@ package org.apache.hadoop.hdfs.server.namenode; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.FileNotFoundException; +import java.io.IOException; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; - import org.junit.Test; public class TestINodeFile { @@ -199,4 +203,88 @@ public class TestINodeFile { return iNodes; } + + /** + * Test for the static {@link INodeFile#valueOf(INode, String)} + * and {@link INodeFileUnderConstruction#valueOf(INode, String)} methods. + * @throws IOException + */ + @Test + public void testValueOf () throws IOException { + final String path = "/testValueOf"; + final PermissionStatus perm = new PermissionStatus( + userName, null, FsPermission.getDefault()); + final short replication = 3; + + {//cast from null + final INode from = null; + + //cast to INodeFile, should fail + try { + INodeFile.valueOf(from, path); + fail(); + } catch(FileNotFoundException fnfe) { + assertTrue(fnfe.getMessage().contains("File does not exist")); + } + + //cast to INodeFileUnderConstruction, should fail + try { + INodeFileUnderConstruction.valueOf(from, path); + fail(); + } catch(FileNotFoundException fnfe) { + assertTrue(fnfe.getMessage().contains("File does not exist")); + } + } + + {//cast from INodeFile + final INode from = new INodeFile( + perm, null, replication, 0L, 0L, preferredBlockSize); + + //cast to INodeFile, should succeed + final INodeFile f = INodeFile.valueOf(from, path); + assertTrue(f == from); + + //cast to INodeFileUnderConstruction, should fail + try { + INodeFileUnderConstruction.valueOf(from, path); + fail(); + } catch(IOException ioe) { + assertTrue(ioe.getMessage().contains("File is not under construction")); + } + } + + {//cast from INodeFileUnderConstruction + final INode from = new INodeFileUnderConstruction( + perm, replication, 0L, 0L, "client", "machine", null); + + //cast to INodeFile, should succeed + final INodeFile f = INodeFile.valueOf(from, path); + assertTrue(f == from); + + //cast to INodeFileUnderConstruction, should succeed + final INodeFileUnderConstruction u = INodeFileUnderConstruction.valueOf( + from, path); + assertTrue(u == from); + } + + {//cast from INodeDirectory + final INode from = new INodeDirectory(perm, 0L); + + //cast to INodeFile, should fail + try { + INodeFile.valueOf(from, path); + fail(); + } catch(FileNotFoundException fnfe) { + assertTrue(fnfe.getMessage().contains("Path is not a file")); + } + + //cast to INodeFileUnderConstruction, should fail + try { + INodeFileUnderConstruction.valueOf(from, path); + fail(); + } catch(FileNotFoundException fnfe) { + assertTrue(fnfe.getMessage().contains("Path is not a file")); + } + } + } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 9b7828d6569..088ac50f72c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -610,6 +610,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown. (Vinod Kumar Vavilapalli via jlowe) + MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion + requests slowly to avoid HADOOP-8942. 
(Jason Lowe via vinodkv) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java index fd80ec2b1e9..acc85645f47 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java @@ -27,10 +27,8 @@ import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.TaskAttemptID; -@SuppressWarnings("deprecation") class EventFetcher extends Thread { private static final long SLEEP_TIME = 1000; - private static final int MAX_EVENTS_TO_FETCH = 10000; private static final int MAX_RETRIES = 10; private static final int RETRY_PERIOD = 5000; private static final Log LOG = LogFactory.getLog(EventFetcher.class); @@ -38,7 +36,8 @@ class EventFetcher extends Thread { private final TaskAttemptID reduce; private final TaskUmbilicalProtocol umbilical; private final ShuffleScheduler scheduler; - private int fromEventId = 0; + private int fromEventIdx = 0; + private int maxEventsToFetch; private ExceptionReporter exceptionReporter = null; private int maxMapRuntime = 0; @@ -48,13 +47,15 @@ class EventFetcher extends Thread { public EventFetcher(TaskAttemptID reduce, TaskUmbilicalProtocol umbilical, ShuffleScheduler scheduler, - ExceptionReporter reporter) { + ExceptionReporter reporter, + int maxEventsToFetch) { setName("EventFetcher for fetching Map Completion Events"); setDaemon(true); this.reduce = reduce; this.umbilical = umbilical; this.scheduler = scheduler; exceptionReporter = reporter; + this.maxEventsToFetch = maxEventsToFetch; } @Override @@ -112,46 +113,47 @@ class EventFetcher extends Thread { * from a given event ID. * @throws IOException */ - private int getMapCompletionEvents() throws IOException { + protected int getMapCompletionEvents() throws IOException { int numNewMaps = 0; - - MapTaskCompletionEventsUpdate update = - umbilical.getMapCompletionEvents((org.apache.hadoop.mapred.JobID) - reduce.getJobID(), - fromEventId, - MAX_EVENTS_TO_FETCH, - (org.apache.hadoop.mapred.TaskAttemptID) - reduce); - TaskCompletionEvent events[] = update.getMapTaskCompletionEvents(); - LOG.debug("Got " + events.length + " map completion events from " + - fromEventId); - - // Check if the reset is required. - // Since there is no ordering of the task completion events at the - // reducer, the only option to sync with the new jobtracker is to reset - // the events index - if (update.shouldReset()) { - fromEventId = 0; - scheduler.resetKnownMaps(); - } - - // Update the last seen event ID - fromEventId += events.length; - - // Process the TaskCompletionEvents: - // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs. - // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop - // fetching from those maps. - // 3. Remove TIPFAILED maps from neededOutputs since we don't need their - // outputs at all. 
- for (TaskCompletionEvent event : events) { - switch (event.getTaskStatus()) { + TaskCompletionEvent events[] = null; + + do { + MapTaskCompletionEventsUpdate update = + umbilical.getMapCompletionEvents( + (org.apache.hadoop.mapred.JobID)reduce.getJobID(), + fromEventIdx, + maxEventsToFetch, + (org.apache.hadoop.mapred.TaskAttemptID)reduce); + events = update.getMapTaskCompletionEvents(); + LOG.debug("Got " + events.length + " map completion events from " + + fromEventIdx); + + // Check if the reset is required. + // Since there is no ordering of the task completion events at the + // reducer, the only option to sync with the new jobtracker is to reset + // the events index + if (update.shouldReset()) { + fromEventIdx = 0; + scheduler.resetKnownMaps(); + } + + // Update the last seen event ID + fromEventIdx += events.length; + + // Process the TaskCompletionEvents: + // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs. + // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop + // fetching from those maps. + // 3. Remove TIPFAILED maps from neededOutputs since we don't need their + // outputs at all. + for (TaskCompletionEvent event : events) { + switch (event.getTaskStatus()) { case SUCCEEDED: URI u = getBaseURI(event.getTaskTrackerHttp()); scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(), - u.toString(), - event.getTaskAttemptId()); + u.toString(), + event.getTaskAttemptId()); numNewMaps ++; int duration = event.getTaskRunTime(); if (duration > maxMapRuntime) { @@ -164,15 +166,17 @@ class EventFetcher extends Thread { case OBSOLETE: scheduler.obsoleteMapOutput(event.getTaskAttemptId()); LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + - " map-task: '" + event.getTaskAttemptId() + "'"); + " map-task: '" + event.getTaskAttemptId() + "'"); break; case TIPFAILED: scheduler.tipFailed(event.getTaskAttemptId().getTaskID()); LOG.info("Ignoring output of failed map TIP: '" + - event.getTaskAttemptId() + "'"); + event.getTaskAttemptId() + "'"); break; + } } - } + } while (events.length == maxEventsToFetch); + return numNewMaps; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java index e7d7d71d079..ceada74f793 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java @@ -40,9 +40,12 @@ import org.apache.hadoop.util.Progress; @InterfaceAudience.Private @InterfaceStability.Unstable -@SuppressWarnings({"deprecation", "unchecked", "rawtypes"}) +@SuppressWarnings({"unchecked", "rawtypes"}) public class Shuffle implements ExceptionReporter { private static final int PROGRESS_FREQUENCY = 2000; + private static final int MAX_EVENTS_TO_FETCH = 10000; + private static final int MIN_EVENTS_TO_FETCH = 100; + private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000; private final TaskAttemptID reduceId; private final JobConf jobConf; @@ -99,9 +102,17 @@ public class Shuffle implements ExceptionReporter { } public RawKeyValueIterator run() throws IOException, InterruptedException { + // Scale the maximum events we fetch per RPC call to mitigate OOM 
issues + // on the ApplicationMaster when a thundering herd of reducers fetch events + // TODO: This should not be necessary after HADOOP-8942 + int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, + MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks()); + int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer); + // Start the map-completion events fetcher thread final EventFetcher eventFetcher = - new EventFetcher(reduceId, umbilical, scheduler, this); + new EventFetcher(reduceId, umbilical, scheduler, this, + maxEventsToFetch); eventFetcher.start(); // Start the map-output fetcher threads diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java new file mode 100644 index 00000000000..84ac656cf9b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.task.reduce; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskCompletionEvent; +import org.apache.hadoop.mapred.TaskUmbilicalProtocol; +import org.apache.hadoop.mapreduce.TaskType; +import org.junit.Test; +import org.mockito.InOrder; + +public class TestEventFetcher { + + @Test + public void testConsecutiveFetch() throws IOException { + final int MAX_EVENTS_TO_FETCH = 100; + TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 1); + + TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class); + when(umbilical.getMapCompletionEvents(any(JobID.class), + anyInt(), anyInt(), any(TaskAttemptID.class))) + .thenReturn(getMockedCompletionEventsUpdate(0, 0)); + when(umbilical.getMapCompletionEvents(any(JobID.class), + eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid))) + .thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH)); + when(umbilical.getMapCompletionEvents(any(JobID.class), + eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid))) + .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH, + MAX_EVENTS_TO_FETCH)); + when(umbilical.getMapCompletionEvents(any(JobID.class), + eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid))) + .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH*2, 3)); + + @SuppressWarnings("unchecked") + ShuffleScheduler scheduler = mock(ShuffleScheduler.class); + ExceptionReporter reporter = mock(ExceptionReporter.class); + + EventFetcherForTest ef = + new EventFetcherForTest(tid, umbilical, scheduler, + reporter, MAX_EVENTS_TO_FETCH); + ef.getMapCompletionEvents(); + + verify(reporter, never()).reportException(any(Throwable.class)); + InOrder inOrder = inOrder(umbilical); + inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class), + eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)); + inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class), + eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)); + inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class), + eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid)); + verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).addKnownMapOutput( + anyString(), anyString(), any(TaskAttemptID.class)); + } + + private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate( + int startIdx, int numEvents) { + ArrayList tceList = + new ArrayList(numEvents); + for (int i = 0; i < numEvents; ++i) { + int eventIdx = startIdx + i; + TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx, + new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0), + eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED, + "http://somehost:8888"); + tceList.add(tce); + } + TaskCompletionEvent[] events = {}; + return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false); + } + + private static class EventFetcherForTest extends EventFetcher { + + public EventFetcherForTest(TaskAttemptID reduce, 
+ TaskUmbilicalProtocol umbilical, ShuffleScheduler scheduler, + ExceptionReporter reporter, int maxEventsToFetch) { + super(reduce, umbilical, scheduler, reporter, maxEventsToFetch); + } + + @Override + public int getMapCompletionEvents() throws IOException { + return super.getMapCompletionEvents(); + } + + } +}