diff --git a/hadoop-client/pom.xml b/hadoop-client/pom.xml index 1e2c7367b2f..9884480e341 100644 --- a/hadoop-client/pom.xml +++ b/hadoop-client/pom.xml @@ -39,10 +39,6 @@ hadoop-common compile - - commons-httpclient - commons-httpclient - tomcat jasper-compiler diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 85000b639f0..83c8c4701b2 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -363,9 +363,6 @@ Release 2.3.0 - UNRELEASED HADOOP-9908. Fix NPE when versioninfo properties file is missing (todd) - HADOOP-9350. Hadoop not building against Java7 on OSX - (Robert Kanter via stevel) - Release 2.1.1-beta - UNRELEASED INCOMPATIBLE CHANGES @@ -408,6 +405,9 @@ Release 2.1.1-beta - UNRELEASED HADOOP-9918. Add addIfService to CompositeService (Karthik Kambatla via Sandy Ryza) + HADOOP-9945. HAServiceState should have a state for stopped services. + (Karthik Kambatla via atm) + OPTIMIZATIONS BUG FIXES @@ -460,6 +460,14 @@ Release 2.1.1-beta - UNRELEASED HADOOP-9958. Add old constructor back to DelegationTokenInformation to unbreak downstream builds. (Andrew Wang) + HADOOP-9960. Upgrade Jersey version to 1.9. (Karthik Kambatla via atm) + + HADOOP-9557. hadoop-client excludes commons-httpclient. (Lohit Vijayarenu via + cnauroth) + + HADOOP-9350. 
Hadoop not building against Java7 on OSX + (Robert Kanter via stevel) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java index 57eee187b0e..7de12c78b4f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java @@ -43,13 +43,15 @@ public interface HAServiceProtocol { public static final long versionID = 1L; /** - * An HA service may be in active or standby state. During - * startup, it is in an unknown INITIALIZING state. + * An HA service may be in active or standby state. During startup, it is in + * an unknown INITIALIZING state. During shutdown, it is in the STOPPING state + * and can no longer return to active/standby states. */ public enum HAServiceState { INITIALIZING("initializing"), ACTIVE("active"), - STANDBY("standby"); + STANDBY("standby"), + STOPPING("stopping"); private String name; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java index c8509eff6ac..2cf6216ef13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java @@ -39,6 +39,12 @@ import org.jboss.netty.channel.Channel; public class Nfs3Utils { public final static String INODEID_PATH_PREFIX = "/.reserved/.inodes/"; + + public final static String READ_RPC_START = "READ_RPC_CALL_START____"; + public final static String READ_RPC_END = "READ_RPC_CALL_END______"; + public final static String WRITE_RPC_START = "WRITE_RPC_CALL_START____"; + 
public final static String WRITE_RPC_END = "WRITE_RPC_CALL_END______"; + public static String getFileIdPath(FileHandle handle) { return getFileIdPath(handle.getFileId()); } @@ -102,7 +108,10 @@ public class Nfs3Utils { /** * Send a write response to the netty network socket channel */ - public static void writeChannel(Channel channel, XDR out) { + public static void writeChannel(Channel channel, XDR out, int xid) { + if (RpcProgramNfs3.LOG.isDebugEnabled()) { + RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid); + } ChannelBuffer outBuf = XDR.writeMessageTcp(out, true); channel.write(outBuf); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index 12de05e058e..677a1c73382 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -291,7 +291,7 @@ class OpenFileCtx { WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); } else { // Handle repeated write requests(same xid or not). // If already replied, send reply again. 
If not replied, drop the @@ -313,7 +313,7 @@ class OpenFileCtx { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, fileWcc, request.getCount(), request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); } updateLastAccessTime(); @@ -367,7 +367,7 @@ class OpenFileCtx { WccData fileWcc = new WccData(preOpAttr, postOpAttr); WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); writeCtx.setReplied(true); } @@ -392,7 +392,7 @@ class OpenFileCtx { WccData fileWcc = new WccData(preOpAttr, postOpAttr); WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); writeCtx.setReplied(true); } @@ -418,7 +418,7 @@ class OpenFileCtx { } updateLastAccessTime(); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); } } @@ -707,7 +707,7 @@ class OpenFileCtx { WccData fileWcc = new WccData(preOpAttr, latestAttr); WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); } } catch (IOException e) { @@ -715,7 +715,7 @@ class OpenFileCtx { + offset + " and length " + data.length, e); if (!writeCtx.getReplied()) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + 
Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); // Keep stream open. Either client retries or SteamMonitor closes it. } @@ -753,7 +753,7 @@ class OpenFileCtx { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); Nfs3Utils.writeChannel(writeCtx.getChannel(), - response.send(new XDR(), writeCtx.getXid())); + response.send(new XDR(), writeCtx.getXid()), writeCtx.getXid()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index b6a9b988097..576bee1dfe8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -125,7 +125,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { public static final FsPermission umask = new FsPermission( (short) DEFAULT_UMASK); - private static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class); + static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class); private static final int MAX_READ_TRANSFER_SIZE = 64 * 1024; private static final int MAX_WRITE_TRANSFER_SIZE = 64 * 1024; private static final int MAX_READDIR_TRANSFER_SIZE = 64 * 1024; @@ -1814,9 +1814,19 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } else if (nfsproc3 == NFSPROC3.READLINK) { response = readlink(xdr, securityHandler, client); } else if (nfsproc3 == NFSPROC3.READ) { + if (LOG.isDebugEnabled()) { + LOG.debug(Nfs3Utils.READ_RPC_START + xid); + } response = read(xdr, securityHandler, client); + if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) { + LOG.debug(Nfs3Utils.READ_RPC_END + xid); + } } else if (nfsproc3 == NFSPROC3.WRITE) { + if (LOG.isDebugEnabled()) { + 
LOG.debug(Nfs3Utils.WRITE_RPC_START + xid); + } response = write(xdr, channel, xid, securityHandler, client); + // Write end debug trace is in Nfs3Utils.writeChannel } else if (nfsproc3 == NFSPROC3.CREATE) { response = create(xdr, securityHandler, client); } else if (nfsproc3 == NFSPROC3.MKDIR) { @@ -1853,6 +1863,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { if (response != null) { out = response.send(out, xid); } + return out; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java index 70e9bc396c9..f8f16a9d495 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java @@ -118,7 +118,7 @@ public class WriteManager { byte[] data = request.getData().array(); if (data.length < count) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); return; } @@ -155,7 +155,7 @@ public class WriteManager { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, count, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); return; } @@ -182,10 +182,10 @@ public class WriteManager { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, fileWcc, count, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); } else { WRITE3Response response = new 
WRITE3Response(Nfs3Status.NFS3ERR_IO); - Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid)); + Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java index 3a7ec5abc84..ebce6569b8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java @@ -174,11 +174,11 @@ public class TestOutOfOrderWrite { XDR writeReq; writeReq = write(handle, 0x8000005c, 2000, 1000, data3); - Nfs3Utils.writeChannel(channel, writeReq); + Nfs3Utils.writeChannel(channel, writeReq, 1); writeReq = write(handle, 0x8000005d, 1000, 1000, data2); - Nfs3Utils.writeChannel(channel, writeReq); + Nfs3Utils.writeChannel(channel, writeReq, 2); writeReq = write(handle, 0x8000005e, 0, 1000, data1); - Nfs3Utils.writeChannel(channel, writeReq); + Nfs3Utils.writeChannel(channel, writeReq, 3); // TODO: convert to Junit test, and validate result automatically } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7dd9a126d75..912055ec191 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -275,6 +275,10 @@ Release 2.3.0 - UNRELEASED HDFS-4096. Add snapshot information to namenode WebUI. (Haohui Mai via jing9) + HDFS-5188. In BlockPlacementPolicy, reduce the number of chooseTarget(..) + methods; replace HashMap with Map in parameter declarations and cleanup + some related code. (szetszwo) + OPTIMIZATIONS BUG FIXES @@ -338,6 +342,8 @@ Release 2.1.1-beta - UNRELEASED HDFS-5067 Support symlink operations in NFS gateway. (brandonli) + HDFS-5199 Add more debug trace for NFS READ and WRITE. 
(brandonli) + IMPROVEMENTS HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index a29121248c1..86f2be4f8ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; /** * This class contains constants for configuration keys used @@ -348,6 +349,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime"; public static final long DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L; + public static final String DFS_BLOCK_REPLICATOR_CLASSNAME_KEY = "dfs.block.replicator.classname"; + public static final Class DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class; public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max"; public static final int DFS_REPLICATION_MAX_DEFAULT = 512; public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index a8024f412f0..57a5f9585da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ 
-1260,8 +1260,7 @@ public class BlockManager { namesystem.writeUnlock(); } - HashMap excludedNodes - = new HashMap(); + final Map excludedNodes = new HashMap(); for(ReplicationWork rw : work){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. @@ -1273,9 +1272,7 @@ public class BlockManager { // choose replication targets: NOT HOLDING THE GLOBAL LOCK // It is costly to extract the filename for which chooseTargets is called, // so for now we pass in the block collection itself. - rw.targets = blockplacement.chooseTarget(rw.bc, - rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes, - excludedNodes, rw.block.getNumBytes()); + rw.chooseTargets(blockplacement, excludedNodes); } namesystem.writeLock(); @@ -3249,6 +3246,13 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block this.priority = priority; this.targets = null; } + + private void chooseTargets(BlockPlacementPolicy blockplacement, + Map excludedNodes) { + targets = blockplacement.chooseTarget(bc.getName(), + additionalReplRequired, srcNode, liveReplicaNodes, false, + excludedNodes, block.getNumBytes()); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 36a0b2a6c86..71d89d52b7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,6 +26,7 @@ import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -51,25 +51,6 @@ public abstract class BlockPlacementPolicy { } } - /** - * choose numOfReplicas data nodes for writer - * to re-replicate a block with size blocksize - * If not, return as many as we can. - * - * @param srcPath the file to which this chooseTargets is being invoked. - * @param numOfReplicas additional number of replicas wanted. - * @param writer the writer's machine, null if not in the cluster. - * @param chosenNodes datanodes that have been chosen as targets. - * @param blocksize size of the data to be written. - * @return array of DatanodeDescriptor instances chosen as target - * and sorted as a pipeline. - */ - abstract DatanodeDescriptor[] chooseTarget(String srcPath, - int numOfReplicas, - DatanodeDescriptor writer, - List chosenNodes, - long blocksize); - /** * choose numOfReplicas data nodes for writer * to re-replicate a block with size blocksize @@ -90,34 +71,8 @@ public abstract class BlockPlacementPolicy { DatanodeDescriptor writer, List chosenNodes, boolean returnChosenNodes, - HashMap excludedNodes, + Map excludedNodes, long blocksize); - - /** - * choose numOfReplicas data nodes for writer - * If not, return as many as we can. - * The base implemenatation extracts the pathname of the file from the - * specified srcBC, but this could be a costly operation depending on the - * file system implementation. Concrete implementations of this class should - * override this method to avoid this overhead. - * - * @param srcBC block collection of file for which chooseTarget is invoked. - * @param numOfReplicas additional number of replicas wanted. 
- * @param writer the writer's machine, null if not in the cluster. - * @param chosenNodes datanodes that have been chosen as targets. - * @param blocksize size of the data to be written. - * @return array of DatanodeDescriptor instances chosen as target - * and sorted as a pipeline. - */ - DatanodeDescriptor[] chooseTarget(BlockCollection srcBC, - int numOfReplicas, - DatanodeDescriptor writer, - List chosenNodes, - HashMap excludedNodes, - long blocksize) { - return chooseTarget(srcBC.getName(), numOfReplicas, writer, - chosenNodes, false, excludedNodes, blocksize); - } /** * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, @@ -128,7 +83,7 @@ public abstract class BlockPlacementPolicy { */ DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas, DatanodeDescriptor writer, - HashMap excludedNodes, + Map excludedNodes, long blocksize, List favoredNodes) { // This class does not provide the functionality of placing // a block in favored datanodes. The implementations of this class @@ -183,7 +138,7 @@ public abstract class BlockPlacementPolicy { /** * Get an instance of the configured Block Placement Policy based on the - * value of the configuration paramater dfs.block.replicator.classname. + * the configuration property {@link DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}. 
* * @param conf the configuration to be used * @param stats an object that is used to retrieve the load on the cluster @@ -193,12 +148,12 @@ public abstract class BlockPlacementPolicy { public static BlockPlacementPolicy getInstance(Configuration conf, FSClusterStats stats, NetworkTopology clusterMap) { - Class replicatorClass = - conf.getClass("dfs.block.replicator.classname", - BlockPlacementPolicyDefault.class, - BlockPlacementPolicy.class); - BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance( - replicatorClass, conf); + final Class replicatorClass = conf.getClass( + DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT, + BlockPlacementPolicy.class); + final BlockPlacementPolicy replicator = ReflectionUtils.newInstance( + replicatorClass, conf); replicator.initialize(conf, stats, clusterMap); return replicator; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index fbb922351bf..8fab427907c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -22,8 +22,8 @@ import static org.apache.hadoop.util.Time.now; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -57,6 +57,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { "For more information, please enable DEBUG log level on " + BlockPlacementPolicy.class.getName(); + private static final ThreadLocal debugLoggingBuilder + = 
new ThreadLocal() { + @Override + protected StringBuilder initialValue() { + return new StringBuilder(); + } + }; + protected boolean considerLoad; private boolean preferLocalNode = true; protected NetworkTopology clusterMap; @@ -95,40 +103,25 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT); } - protected ThreadLocal threadLocalBuilder = - new ThreadLocal() { - @Override - protected StringBuilder initialValue() { - return new StringBuilder(); - } - }; - - @Override - public DatanodeDescriptor[] chooseTarget(String srcPath, - int numOfReplicas, - DatanodeDescriptor writer, - List chosenNodes, - long blocksize) { - return chooseTarget(numOfReplicas, writer, chosenNodes, false, - null, blocksize); - } - @Override public DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas, DatanodeDescriptor writer, List chosenNodes, boolean returnChosenNodes, - HashMap excludedNodes, + Map excludedNodes, long blocksize) { return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes, excludedNodes, blocksize); } @Override - DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas, - DatanodeDescriptor writer, HashMap excludedNodes, - long blocksize, List favoredNodes) { + DatanodeDescriptor[] chooseTarget(String src, + int numOfReplicas, + DatanodeDescriptor writer, + Map excludedNodes, + long blocksize, + List favoredNodes) { try { if (favoredNodes == null || favoredNodes.size() == 0) { // Favored nodes not specified, fall back to regular block placement. @@ -137,7 +130,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { excludedNodes, blocksize); } - HashMap favoriteAndExcludedNodes = excludedNodes == null ? + Map favoriteAndExcludedNodes = excludedNodes == null ? 
new HashMap() : new HashMap(excludedNodes); // Choose favored nodes @@ -181,14 +174,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } /** This is the implementation. */ - DatanodeDescriptor[] chooseTarget(int numOfReplicas, + private DatanodeDescriptor[] chooseTarget(int numOfReplicas, DatanodeDescriptor writer, List chosenNodes, boolean returnChosenNodes, - HashMap excludedNodes, + Map excludedNodes, long blocksize) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { - return new DatanodeDescriptor[0]; + return DatanodeDescriptor.EMPTY_ARRAY; } if (excludedNodes == null) { @@ -204,7 +197,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { for (DatanodeDescriptor node:chosenNodes) { // add localMachine and related nodes to excludedNodes addToExcludedNodes(node, excludedNodes); - adjustExcludedNodes(excludedNodes, node); } if (!clusterMap.contains(writer)) { @@ -239,7 +231,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { /* choose numOfReplicas from all data nodes */ private DatanodeDescriptor chooseTarget(int numOfReplicas, DatanodeDescriptor writer, - HashMap excludedNodes, + Map excludedNodes, long blocksize, int maxNodesPerRack, List results, @@ -256,7 +248,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } // Keep a copy of original excludedNodes - final HashMap oldExcludedNodes = avoidStaleNodes ? + final Map oldExcludedNodes = avoidStaleNodes ? new HashMap(excludedNodes) : null; try { if (numOfResults == 0) { @@ -316,19 +308,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { return writer; } - /* choose localMachine as the target. + /** + * Choose localMachine as the target. 
* if localMachine is not available, * choose a node on the same rack * @return the chosen node */ - protected DatanodeDescriptor chooseLocalNode( - DatanodeDescriptor localMachine, - HashMap excludedNodes, + protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine, + Map excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) - throws NotEnoughReplicasException { + throws NotEnoughReplicasException { // if no local machine, randomly choose one node if (localMachine == null) return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, @@ -337,11 +329,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // otherwise try local machine first Node oldNode = excludedNodes.put(localMachine, localMachine); if (oldNode == null) { // was not in the excluded list - if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false, - results, avoidStaleNodes)) { - results.add(localMachine); - // add localMachine and related nodes to excludedNode - addToExcludedNodes(localMachine, excludedNodes); + if (addIfIsGoodTarget(localMachine, excludedNodes, blocksize, + maxNodesPerRack, false, results, avoidStaleNodes) >= 0) { return localMachine; } } @@ -358,26 +347,26 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { * @return number of new excluded nodes */ protected int addToExcludedNodes(DatanodeDescriptor localMachine, - HashMap excludedNodes) { + Map excludedNodes) { Node node = excludedNodes.put(localMachine, localMachine); return node == null?1:0; } - /* choose one node from the rack that localMachine is on. + /** + * Choose one node from the rack that localMachine is on. * if no such node is available, choose one node from the rack where * a second replica is on. * if still no such node is available, choose a random node * in the cluster. 
* @return the chosen node */ - protected DatanodeDescriptor chooseLocalRack( - DatanodeDescriptor localMachine, - HashMap excludedNodes, + protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine, + Map excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) - throws NotEnoughReplicasException { + throws NotEnoughReplicasException { // no local machine, so choose a random machine if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, @@ -391,9 +380,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } catch (NotEnoughReplicasException e1) { // find the second replica DatanodeDescriptor newLocal=null; - for(Iterator iter=results.iterator(); - iter.hasNext();) { - DatanodeDescriptor nextNode = iter.next(); + for(DatanodeDescriptor nextNode : results) { if (nextNode != localMachine) { newLocal = nextNode; break; @@ -416,7 +403,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } } - /* choose numOfReplicas nodes from the racks + /** + * Choose numOfReplicas nodes from the racks * that localMachine is NOT on. * if not enough nodes are available, choose the remaining ones * from the local rack @@ -424,12 +412,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { protected void chooseRemoteRack(int numOfReplicas, DatanodeDescriptor localMachine, - HashMap excludedNodes, + Map excludedNodes, long blocksize, int maxReplicasPerRack, List results, boolean avoidStaleNodes) - throws NotEnoughReplicasException { + throws NotEnoughReplicasException { int oldNumOfReplicas = results.size(); // randomly choose one node from remote racks try { @@ -443,91 +431,59 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } } - /* Randomly choose one target from nodes. - * @return the chosen node + /** + * Randomly choose one target from the given scope. + * @return the chosen node, if there is any. 
*/ - protected DatanodeDescriptor chooseRandom( - String nodes, - HashMap excludedNodes, - long blocksize, - int maxNodesPerRack, - List results, - boolean avoidStaleNodes) - throws NotEnoughReplicasException { - int numOfAvailableNodes = - clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet()); - StringBuilder builder = null; - if (LOG.isDebugEnabled()) { - builder = threadLocalBuilder.get(); - builder.setLength(0); - builder.append("["); - } - boolean badTarget = false; - while(numOfAvailableNodes > 0) { - DatanodeDescriptor chosenNode = - (DatanodeDescriptor)(clusterMap.chooseRandom(nodes)); - - Node oldNode = excludedNodes.put(chosenNode, chosenNode); - if (oldNode == null) { // chosenNode was not in the excluded list - numOfAvailableNodes--; - if (isGoodTarget(chosenNode, blocksize, - maxNodesPerRack, results, avoidStaleNodes)) { - results.add(chosenNode); - // add chosenNode and related nodes to excludedNode - addToExcludedNodes(chosenNode, excludedNodes); - adjustExcludedNodes(excludedNodes, chosenNode); - return chosenNode; - } else { - badTarget = true; - } - } - } - - String detail = enableDebugLogging; - if (LOG.isDebugEnabled()) { - if (badTarget && builder != null) { - detail = builder.append("]").toString(); - builder.setLength(0); - } else detail = ""; - } - throw new NotEnoughReplicasException(detail); + protected DatanodeDescriptor chooseRandom(String scope, + Map excludedNodes, + long blocksize, + int maxNodesPerRack, + List results, + boolean avoidStaleNodes) + throws NotEnoughReplicasException { + return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack, + results, avoidStaleNodes); } - - /* Randomly choose numOfReplicas targets from nodes. + + /** + * Randomly choose numOfReplicas targets from the given scope. + * @return the first chosen node, if there is any. 
*/ - protected void chooseRandom(int numOfReplicas, - String nodes, - HashMap excludedNodes, + protected DatanodeDescriptor chooseRandom(int numOfReplicas, + String scope, + Map excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) - throws NotEnoughReplicasException { + throws NotEnoughReplicasException { - int numOfAvailableNodes = - clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet()); + int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes( + scope, excludedNodes.keySet()); StringBuilder builder = null; if (LOG.isDebugEnabled()) { - builder = threadLocalBuilder.get(); + builder = debugLoggingBuilder.get(); builder.setLength(0); builder.append("["); } boolean badTarget = false; + DatanodeDescriptor firstChosen = null; while(numOfReplicas > 0 && numOfAvailableNodes > 0) { DatanodeDescriptor chosenNode = - (DatanodeDescriptor)(clusterMap.chooseRandom(nodes)); + (DatanodeDescriptor)clusterMap.chooseRandom(scope); Node oldNode = excludedNodes.put(chosenNode, chosenNode); if (oldNode == null) { numOfAvailableNodes--; - if (isGoodTarget(chosenNode, blocksize, - maxNodesPerRack, results, avoidStaleNodes)) { + int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes, + blocksize, maxNodesPerRack, considerLoad, results, avoidStaleNodes); + if (newExcludedNodes >= 0) { numOfReplicas--; - results.add(chosenNode); - // add chosenNode and related nodes to excludedNode - int newExcludedNodes = addToExcludedNodes(chosenNode, excludedNodes); + if (firstChosen == null) { + firstChosen = chosenNode; + } numOfAvailableNodes -= newExcludedNodes; - adjustExcludedNodes(excludedNodes, chosenNode); } else { badTarget = true; } @@ -544,34 +500,44 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } throw new NotEnoughReplicasException(detail); } - } - - /** - * After choosing a node to place replica, adjust excluded nodes accordingly. 
- * It should do nothing here as chosenNode is already put into exlcudeNodes, - * but it can be overridden in subclass to put more related nodes into - * excludedNodes. - * - * @param excludedNodes - * @param chosenNode - */ - protected void adjustExcludedNodes(HashMap excludedNodes, - Node chosenNode) { - // do nothing here. + + return firstChosen; } - /* judge if a node is a good target. - * return true if node has enough space, - * does not have too much load, and the rack does not have too many nodes + /** + * If the given node is a good target, add it to the result list and + * update the excluded node map. + * @return -1 if the given is not a good target; + * otherwise, return the number of excluded nodes added to the map. */ - private boolean isGoodTarget(DatanodeDescriptor node, - long blockSize, int maxTargetPerRack, - List results, - boolean avoidStaleNodes) { - return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad, - results, avoidStaleNodes); + int addIfIsGoodTarget(DatanodeDescriptor node, + Map excludedNodes, + long blockSize, + int maxNodesPerRack, + boolean considerLoad, + List results, + boolean avoidStaleNodes) { + if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad, + results, avoidStaleNodes)) { + results.add(node); + // add node and related nodes to excludedNode + return addToExcludedNodes(node, excludedNodes); + } else { + return -1; + } } - + + private static void logNodeIsNotChosen(DatanodeDescriptor node, String reason) { + if (LOG.isDebugEnabled()) { + // build the error message for later use. + debugLoggingBuilder.get() + .append(node).append(": ") + .append("Node ").append(NodeBase.getPath(node)) + .append(" is not chosen because ") + .append(reason); + } + } + /** * Determine if a node is a good target. * @@ -588,28 +554,20 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { * does not have too much load, * and the rack does not have too many nodes. 
*/ - protected boolean isGoodTarget(DatanodeDescriptor node, + private boolean isGoodTarget(DatanodeDescriptor node, long blockSize, int maxTargetPerRack, boolean considerLoad, List results, boolean avoidStaleNodes) { // check if the node is (being) decommissed if (node.isDecommissionInProgress() || node.isDecommissioned()) { - if(LOG.isDebugEnabled()) { - threadLocalBuilder.get().append(node.toString()).append(": ") - .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the node is (being) decommissioned "); - } + logNodeIsNotChosen(node, "the node is (being) decommissioned "); return false; } if (avoidStaleNodes) { if (node.isStale(this.staleInterval)) { - if (LOG.isDebugEnabled()) { - threadLocalBuilder.get().append(node.toString()).append(": ") - .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the node is stale "); - } + logNodeIsNotChosen(node, "the node is stale "); return false; } } @@ -618,11 +576,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { (node.getBlocksScheduled() * blockSize); // check the remaining capacity of the target machine if (blockSize* HdfsConstants.MIN_BLOCKS_FOR_WRITE>remaining) { - if(LOG.isDebugEnabled()) { - threadLocalBuilder.get().append(node.toString()).append(": ") - .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the node does not have enough space "); - } + logNodeIsNotChosen(node, "the node does not have enough space "); return false; } @@ -634,11 +588,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { avgLoad = (double)stats.getTotalLoad()/size; } if (node.getXceiverCount() > (2.0 * avgLoad)) { - if(LOG.isDebugEnabled()) { - threadLocalBuilder.get().append(node.toString()).append(": ") - .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the node is too busy "); - } + logNodeIsNotChosen(node, "the node is too busy "); return false; } } @@ -646,31 
+596,25 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // check if the target rack has chosen too many nodes String rackname = node.getNetworkLocation(); int counter=1; - for(Iterator iter = results.iterator(); - iter.hasNext();) { - Node result = iter.next(); + for(Node result : results) { if (rackname.equals(result.getNetworkLocation())) { counter++; } } if (counter>maxTargetPerRack) { - if(LOG.isDebugEnabled()) { - threadLocalBuilder.get().append(node.toString()).append(": ") - .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the rack has too many chosen nodes "); - } + logNodeIsNotChosen(node, "the rack has too many chosen nodes "); return false; } return true; } - /* Return a pipeline of nodes. + /** + * Return a pipeline of nodes. * The pipeline is formed finding a shortest path that * starts from the writer and traverses all nodes * This is basically a traveling salesman problem. */ - private DatanodeDescriptor[] getPipeline( - DatanodeDescriptor writer, + private DatanodeDescriptor[] getPipeline(DatanodeDescriptor writer, DatanodeDescriptor[] nodes) { if (nodes.length==0) return nodes; @@ -709,7 +653,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { int minRacks) { DatanodeInfo[] locs = lBlk.getLocations(); if (locs == null) - locs = new DatanodeInfo[0]; + locs = DatanodeDescriptor.EMPTY_ARRAY; int numRacks = clusterMap.getNumOfRacks(); if(numRacks <= 1) // only one rack return 0; @@ -724,24 +668,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { @Override public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc, - Block block, - short replicationFactor, - Collection first, - Collection second) { + Block block, short replicationFactor, + Collection first, + Collection second) { long oldestHeartbeat = now() - heartbeatInterval * tolerateHeartbeatMultiplier; DatanodeDescriptor oldestHeartbeatNode = null; long minSpace = Long.MAX_VALUE; 
DatanodeDescriptor minSpaceNode = null; - // pick replica from the first Set. If first is empty, then pick replicas - // from second set. - Iterator iter = pickupReplicaSet(first, second); - // Pick the node with the oldest heartbeat or with the least free space, // if all hearbeats are within the tolerable heartbeat interval - while (iter.hasNext() ) { - DatanodeDescriptor node = iter.next(); + for(DatanodeDescriptor node : pickupReplicaSet(first, second)) { long free = node.getRemaining(); long lastHeartbeat = node.getLastUpdate(); if(lastHeartbeat < oldestHeartbeat) { @@ -762,12 +700,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { * replica while second set contains remaining replica nodes. * So pick up first set if not empty. If first is empty, then pick second. */ - protected Iterator pickupReplicaSet( + protected Collection pickupReplicaSet( Collection first, Collection second) { - Iterator iter = - first.isEmpty() ? second.iterator() : first.iterator(); - return iter; + return first.isEmpty() ? 
second : first; } @VisibleForTesting diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java index e98318b9783..f44f28da825 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -65,7 +64,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau */ @Override protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine, - HashMap excludedNodes, long blocksize, int maxNodesPerRack, + Map excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) throws NotEnoughReplicasException { // if no local machine, randomly choose one node @@ -76,12 +75,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau // otherwise try local machine first Node oldNode = excludedNodes.put(localMachine, localMachine); if (oldNode == null) { // was not in the excluded list - if (isGoodTarget(localMachine, blocksize, - maxNodesPerRack, false, results, avoidStaleNodes)) { - results.add(localMachine); - // Nodes under same nodegroup should be excluded. 
- addNodeGroupToExcludedNodes(excludedNodes, - localMachine.getNetworkLocation()); + if (addIfIsGoodTarget(localMachine, excludedNodes, blocksize, + maxNodesPerRack, false, results, avoidStaleNodes) >= 0) { return localMachine; } } @@ -98,26 +93,10 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau blocksize, maxNodesPerRack, results, avoidStaleNodes); } - @Override - protected void adjustExcludedNodes(HashMap excludedNodes, - Node chosenNode) { - // as node-group aware implementation, it should make sure no two replica - // are placing on the same node group. - addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation()); - } - // add all nodes under specific nodegroup to excludedNodes. - private void addNodeGroupToExcludedNodes(HashMap excludedNodes, - String nodeGroup) { - List leafNodes = clusterMap.getLeaves(nodeGroup); - for (Node node : leafNodes) { - excludedNodes.put(node, node); - } - } - @Override protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine, - HashMap excludedNodes, long blocksize, int maxNodesPerRack, + Map excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) throws NotEnoughReplicasException { // no local machine, so choose a random machine @@ -137,9 +116,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau } catch (NotEnoughReplicasException e1) { // find the second replica DatanodeDescriptor newLocal=null; - for(Iterator iter=results.iterator(); - iter.hasNext();) { - DatanodeDescriptor nextNode = iter.next(); + for(DatanodeDescriptor nextNode : results) { if (nextNode != localMachine) { newLocal = nextNode; break; @@ -165,7 +142,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau @Override protected void chooseRemoteRack(int numOfReplicas, - DatanodeDescriptor localMachine, HashMap excludedNodes, + DatanodeDescriptor localMachine, Map excludedNodes, long blocksize, 
int maxReplicasPerRack, List results, boolean avoidStaleNodes) throws NotEnoughReplicasException { int oldNumOfReplicas = results.size(); @@ -192,7 +169,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau */ private DatanodeDescriptor chooseLocalNodeGroup( NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine, - HashMap excludedNodes, long blocksize, int maxNodesPerRack, + Map excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) throws NotEnoughReplicasException { // no local machine, so choose a random machine @@ -209,9 +186,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau } catch (NotEnoughReplicasException e1) { // find the second replica DatanodeDescriptor newLocal=null; - for(Iterator iter=results.iterator(); - iter.hasNext();) { - DatanodeDescriptor nextNode = iter.next(); + for(DatanodeDescriptor nextNode : results) { if (nextNode != localMachine) { newLocal = nextNode; break; @@ -248,10 +223,11 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau * within the same nodegroup * @return number of new excluded nodes */ - protected int addToExcludedNodes(DatanodeDescriptor localMachine, - HashMap excludedNodes) { + @Override + protected int addToExcludedNodes(DatanodeDescriptor chosenNode, + Map excludedNodes) { int countOfExcludedNodes = 0; - String nodeGroupScope = localMachine.getNetworkLocation(); + String nodeGroupScope = chosenNode.getNetworkLocation(); List leafNodes = clusterMap.getLeaves(nodeGroupScope); for (Node leafNode : leafNodes) { Node node = excludedNodes.put(leafNode, leafNode); @@ -274,12 +250,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau * If first is empty, then pick second. 
*/ @Override - public Iterator pickupReplicaSet( + public Collection pickupReplicaSet( Collection first, Collection second) { // If no replica within same rack, return directly. if (first.isEmpty()) { - return second.iterator(); + return second; } // Split data nodes in the first set into two sets, // moreThanOne contains nodes on nodegroup with more than one replica @@ -312,9 +288,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau } } - Iterator iter = - moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator(); - return iter; + return moreThanOne.isEmpty()? exactlyOne : moreThanOne; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index a4ddc3130e4..b006001f59a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -43,7 +43,8 @@ import org.apache.hadoop.util.Time; @InterfaceAudience.Private @InterfaceStability.Evolving public class DatanodeDescriptor extends DatanodeInfo { - + public static final DatanodeDescriptor[] EMPTY_ARRAY = {}; + // Stores status of decommissioning. // If node is not decommissioning, do not use this object for anything. 
public DecommissioningStatus decommissioningStatus = new DecommissioningStatus(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java index 462d0b4979e..ff9ea0728c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -39,9 +40,11 @@ import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.NetworkTopologyWithNodeGroup; +import org.junit.Assert; import org.junit.Test; -import junit.framework.Assert; /** * This class tests if a balancer schedules tasks correctly. @@ -75,10 +78,9 @@ public class TestBalancerWithNodeGroup { Configuration conf = new HdfsConfiguration(); TestBalancer.initConf(conf); conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, - "org.apache.hadoop.net.NetworkTopologyWithNodeGroup"); - conf.set("dfs.block.replicator.classname", - "org.apache.hadoop.hdfs.server.blockmanagement." 
+ - "BlockPlacementPolicyWithNodeGroup"); + NetworkTopologyWithNodeGroup.class.getName()); + conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + BlockPlacementPolicyWithNodeGroup.class.getName()); return conf; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java index 834842c70e6..2e5d70b0965 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java @@ -157,8 +157,8 @@ public class TestRBWBlockInvalidation { // in the context of the test, whereas a random one is more accurate // to what is seen in real clusters (nodes have random amounts of free // space) - conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class, - BlockPlacementPolicy.class); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + RandomDeleterPolicy.class, BlockPlacementPolicy.class); // Speed up the test a bit with faster heartbeats. 
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 9d80076813d..2821c126450 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -138,30 +138,25 @@ public class TestReplicationPolicy { HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, 0, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(0); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, 1, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1); assertEquals(targets.length, 1); assertEquals(targets[0], dataNodes[0]); - targets = replicator.chooseTarget(filename, - 2, dataNodes[0], new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(2); assertEquals(targets.length, 2); assertEquals(targets[0], dataNodes[0]); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 3, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(3); assertEquals(targets.length, 3); assertEquals(targets[0], dataNodes[0]); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); assertTrue(cluster.isOnSameRack(targets[1], targets[2])); - targets = replicator.chooseTarget(filename, 4, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(4); assertEquals(targets.length, 4); assertEquals(targets[0], dataNodes[0]); assertTrue(cluster.isOnSameRack(targets[1], targets[2]) || @@ -173,15 +168,38 @@ public class TestReplicationPolicy { 
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); } + private static DatanodeDescriptor[] chooseTarget(int numOfReplicas) { + return chooseTarget(numOfReplicas, dataNodes[0]); + } + + private static DatanodeDescriptor[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer) { + return chooseTarget(numOfReplicas, writer, + new ArrayList()); + } + + private static DatanodeDescriptor[] chooseTarget(int numOfReplicas, + List chosenNodes) { + return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes); + } + + private static DatanodeDescriptor[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, List chosenNodes) { + return chooseTarget(numOfReplicas, writer, chosenNodes, null); + } + + private static DatanodeDescriptor[] chooseTarget(int numOfReplicas, + List chosenNodes, Map excludedNodes) { + return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes); + } + private static DatanodeDescriptor[] chooseTarget( - BlockPlacementPolicyDefault policy, int numOfReplicas, DatanodeDescriptor writer, List chosenNodes, - HashMap excludedNodes, - long blocksize) { - return policy.chooseTarget(numOfReplicas, writer, chosenNodes, false, - excludedNodes, blocksize); + Map excludedNodes) { + return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes, + false, excludedNodes, BLOCK_SIZE); } /** @@ -196,28 +214,24 @@ public class TestReplicationPolicy { public void testChooseTarget2() throws Exception { HashMap excludedNodes; DatanodeDescriptor[] targets; - BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator; List chosenNodes = new ArrayList(); excludedNodes = new HashMap(); excludedNodes.put(dataNodes[1], dataNodes[1]); - targets = chooseTarget(repl, 0, dataNodes[0], chosenNodes, excludedNodes, - BLOCK_SIZE); + targets = chooseTarget(0, chosenNodes, excludedNodes); assertEquals(targets.length, 0); excludedNodes.clear(); chosenNodes.clear(); excludedNodes.put(dataNodes[1], dataNodes[1]); - targets = 
chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes, - BLOCK_SIZE); + targets = chooseTarget(1, chosenNodes, excludedNodes); assertEquals(targets.length, 1); assertEquals(targets[0], dataNodes[0]); excludedNodes.clear(); chosenNodes.clear(); excludedNodes.put(dataNodes[1], dataNodes[1]); - targets = chooseTarget(repl, 2, dataNodes[0], chosenNodes, excludedNodes, - BLOCK_SIZE); + targets = chooseTarget(2, chosenNodes, excludedNodes); assertEquals(targets.length, 2); assertEquals(targets[0], dataNodes[0]); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); @@ -225,8 +239,7 @@ public class TestReplicationPolicy { excludedNodes.clear(); chosenNodes.clear(); excludedNodes.put(dataNodes[1], dataNodes[1]); - targets = chooseTarget(repl, 3, dataNodes[0], chosenNodes, excludedNodes, - BLOCK_SIZE); + targets = chooseTarget(3, chosenNodes, excludedNodes); assertEquals(targets.length, 3); assertEquals(targets[0], dataNodes[0]); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); @@ -235,8 +248,7 @@ public class TestReplicationPolicy { excludedNodes.clear(); chosenNodes.clear(); excludedNodes.put(dataNodes[1], dataNodes[1]); - targets = chooseTarget(repl, 4, dataNodes[0], chosenNodes, excludedNodes, - BLOCK_SIZE); + targets = chooseTarget(4, chosenNodes, excludedNodes); assertEquals(targets.length, 4); assertEquals(targets[0], dataNodes[0]); for(int i=1; i<4; i++) { @@ -250,7 +262,7 @@ public class TestReplicationPolicy { chosenNodes.clear(); excludedNodes.put(dataNodes[1], dataNodes[1]); chosenNodes.add(dataNodes[2]); - targets = repl.chooseTarget(1, dataNodes[0], chosenNodes, true, + targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true, excludedNodes, BLOCK_SIZE); System.out.println("targets=" + Arrays.asList(targets)); assertEquals(2, targets.length); @@ -276,30 +288,25 @@ public class TestReplicationPolicy { (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space DatanodeDescriptor[] targets; - targets = 
replicator.chooseTarget(filename, 0, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(0); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, 1, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1); assertEquals(targets.length, 1); assertEquals(targets[0], dataNodes[1]); - targets = replicator.chooseTarget(filename, 2, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(2); assertEquals(targets.length, 2); assertEquals(targets[0], dataNodes[1]); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 3, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(3); assertEquals(targets.length, 3); assertEquals(targets[0], dataNodes[1]); assertTrue(cluster.isOnSameRack(targets[1], targets[2])); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 4, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(4); assertEquals(targets.length, 4); assertEquals(targets[0], dataNodes[1]); for(int i=1; i<4; i++) { @@ -332,23 +339,19 @@ public class TestReplicationPolicy { } DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, 0, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(0); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, 1, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1); assertEquals(targets.length, 1); assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0])); - targets = replicator.chooseTarget(filename, 2, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(2); assertEquals(targets.length, 2); assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0])); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 3, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = 
chooseTarget(3); assertEquals(targets.length, 3); for(int i=0; i<3; i++) { assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0])); @@ -377,21 +380,17 @@ public class TestReplicationPolicy { DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r4"); DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, 0, writerDesc, - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(0, writerDesc); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, 1, writerDesc, - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1, writerDesc); assertEquals(targets.length, 1); - targets = replicator.chooseTarget(filename, 2, writerDesc, - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(2, writerDesc); assertEquals(targets.length, 2); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 3, writerDesc, - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(3, writerDesc); assertEquals(targets.length, 3); assertTrue(cluster.isOnSameRack(targets[1], targets[2])); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); @@ -435,9 +434,7 @@ public class TestReplicationPolicy { // try to choose NUM_OF_DATANODES which is more than actually available // nodes. 
- DatanodeDescriptor[] targets = replicator.chooseTarget(filename, - NUM_OF_DATANODES, dataNodes[0], new ArrayList(), - BLOCK_SIZE); + DatanodeDescriptor[] targets = chooseTarget(NUM_OF_DATANODES); assertEquals(targets.length, NUM_OF_DATANODES - 2); final List log = appender.getLog(); @@ -480,17 +477,14 @@ public class TestReplicationPolicy { DatanodeDescriptor[] targets; // We set the datanode[0] as stale, thus should choose datanode[1] since // datanode[1] is on the same rack with datanode[0] (writer) - targets = replicator.chooseTarget(filename, 1, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1); assertEquals(targets.length, 1); assertEquals(targets[0], dataNodes[1]); HashMap excludedNodes = new HashMap(); excludedNodes.put(dataNodes[1], dataNodes[1]); List chosenNodes = new ArrayList(); - BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator; - targets = chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes, - BLOCK_SIZE); + targets = chooseTarget(1, chosenNodes, excludedNodes); assertEquals(targets.length, 1); assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0])); @@ -517,33 +511,27 @@ public class TestReplicationPolicy { namenode.getNamesystem().getBlockManager() .getDatanodeManager().getHeartbeatManager().heartbeatCheck(); - DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, 0, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + DatanodeDescriptor[] targets = chooseTarget(0); assertEquals(targets.length, 0); // Since we have 6 datanodes total, stale nodes should // not be returned until we ask for more than 3 targets - targets = replicator.chooseTarget(filename, 1, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1); assertEquals(targets.length, 1); assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2)); - targets = replicator.chooseTarget(filename, 2, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(2); 
assertEquals(targets.length, 2); assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2)); assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2)); - targets = replicator.chooseTarget(filename, 3, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(3); assertEquals(targets.length, 3); assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5)); assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5)); assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5)); - targets = replicator.chooseTarget(filename, 4, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(4); assertEquals(targets.length, 4); assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3)); assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3)); @@ -596,7 +584,8 @@ public class TestReplicationPolicy { BlockPlacementPolicy replicator = miniCluster.getNameNode() .getNamesystem().getBlockManager().getBlockPlacementPolicy(); DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3, - staleNodeInfo, new ArrayList(), BLOCK_SIZE); + staleNodeInfo, new ArrayList(), false, null, BLOCK_SIZE); + assertEquals(targets.length, 3); assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo)); @@ -620,7 +609,7 @@ public class TestReplicationPolicy { .getDatanodeManager().shouldAvoidStaleDataNodesForWrite()); // Call chooseTarget targets = replicator.chooseTarget(filename, 3, - staleNodeInfo, new ArrayList(), BLOCK_SIZE); + staleNodeInfo, new ArrayList(), false, null, BLOCK_SIZE); assertEquals(targets.length, 3); assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo)); @@ -642,8 +631,7 @@ public class TestReplicationPolicy { assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager() .getDatanodeManager().shouldAvoidStaleDataNodesForWrite()); // Call chooseTarget - targets = replicator.chooseTarget(filename, 3, - staleNodeInfo, new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(3, staleNodeInfo); assertEquals(targets.length, 
3); assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo)); } finally { @@ -664,23 +652,19 @@ public class TestReplicationPolicy { chosenNodes.add(dataNodes[0]); DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, - 0, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(0, chosenNodes); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, - 1, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(1, chosenNodes); assertEquals(targets.length, 1); assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0])); - targets = replicator.chooseTarget(filename, - 2, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(2, chosenNodes); assertEquals(targets.length, 2); assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, - 3, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(3, chosenNodes); assertEquals(targets.length, 3); assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); assertFalse(cluster.isOnSameRack(targets[0], targets[2])); @@ -700,17 +684,14 @@ public class TestReplicationPolicy { chosenNodes.add(dataNodes[1]); DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, - 0, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(0, chosenNodes); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, - 1, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(1, chosenNodes); assertEquals(targets.length, 1); assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0])); - targets = replicator.chooseTarget(filename, - 2, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(2, chosenNodes); assertEquals(targets.length, 2); assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0])); assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1])); @@ -730,29 +711,24 @@ public class 
TestReplicationPolicy { chosenNodes.add(dataNodes[2]); DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, - 0, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(0, chosenNodes); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, - 1, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(1, chosenNodes); assertEquals(targets.length, 1); assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0])); - targets = replicator.chooseTarget(filename, - 1, dataNodes[2], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(1, dataNodes[2], chosenNodes); assertEquals(targets.length, 1); assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0])); assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0])); - targets = replicator.chooseTarget(filename, - 2, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(2, chosenNodes); assertEquals(targets.length, 2); assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); - targets = replicator.chooseTarget(filename, - 2, dataNodes[2], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(2, dataNodes[2], chosenNodes); assertEquals(targets.length, 2); assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0])); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index c453f198e1d..ffea18524ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import 
org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.NetworkTopologyWithNodeGroup; import org.apache.hadoop.net.Node; import org.apache.hadoop.test.PathUtils; import org.junit.After; @@ -101,10 +102,10 @@ public class TestReplicationPolicyWithNodeGroup { FileSystem.setDefaultUri(CONF, "hdfs://localhost:0"); CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); // Set properties to make HDFS aware of NodeGroup. - CONF.set("dfs.block.replicator.classname", - "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup"); + CONF.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + BlockPlacementPolicyWithNodeGroup.class.getName()); CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, - "org.apache.hadoop.net.NetworkTopologyWithNodeGroup"); + NetworkTopologyWithNodeGroup.class.getName()); File baseDir = PathUtils.getTestDir(TestReplicationPolicyWithNodeGroup.class); @@ -156,6 +157,35 @@ public class TestReplicationPolicyWithNodeGroup { return true; } + private DatanodeDescriptor[] chooseTarget(int numOfReplicas) { + return chooseTarget(numOfReplicas, dataNodes[0]); + } + + private DatanodeDescriptor[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer) { + return chooseTarget(numOfReplicas, writer, + new ArrayList<DatanodeDescriptor>()); + } + + private DatanodeDescriptor[] chooseTarget(int numOfReplicas, + List<DatanodeDescriptor> chosenNodes) { + return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes); + } + + private DatanodeDescriptor[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) { + return chooseTarget(numOfReplicas, writer, chosenNodes, null); + } + + private DatanodeDescriptor[] chooseTarget( + int numOfReplicas, + DatanodeDescriptor writer, + List<DatanodeDescriptor> chosenNodes, + Map<Node, Node> excludedNodes) { + return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes, + false, 
excludedNodes, BLOCK_SIZE); + } + /** * In this testcase, client is dataNodes[0]. So the 1st replica should be * placed on dataNodes[0], the 2nd replica should be placed on @@ -172,31 +202,26 @@ public class TestReplicationPolicyWithNodeGroup { HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, 0, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(0); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, 1, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1); assertEquals(targets.length, 1); assertEquals(targets[0], dataNodes[0]); - targets = replicator.chooseTarget(filename, 2, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(2); assertEquals(targets.length, 2); assertEquals(targets[0], dataNodes[0]); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 3, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(3); assertEquals(targets.length, 3); assertEquals(targets[0], dataNodes[0]); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); assertTrue(cluster.isOnSameRack(targets[1], targets[2])); assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2])); - targets = replicator.chooseTarget(filename, 4, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(4); assertEquals(targets.length, 4); assertEquals(targets[0], dataNodes[0]); assertTrue(cluster.isOnSameRack(targets[1], targets[2]) || @@ -235,7 +260,7 @@ public class TestReplicationPolicyWithNodeGroup { excludedNodes = new HashMap(); excludedNodes.put(dataNodes[1], dataNodes[1]); - targets = repl.chooseTarget(4, dataNodes[0], chosenNodes, false, + targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false, excludedNodes, BLOCK_SIZE); assertEquals(targets.length, 4); assertEquals(targets[0], dataNodes[0]); @@ -252,7 
+277,7 @@ public class TestReplicationPolicyWithNodeGroup { chosenNodes.clear(); excludedNodes.put(dataNodes[1], dataNodes[1]); chosenNodes.add(dataNodes[2]); - targets = repl.chooseTarget(1, dataNodes[0], chosenNodes, true, + targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true, excludedNodes, BLOCK_SIZE); System.out.println("targets=" + Arrays.asList(targets)); assertEquals(2, targets.length); @@ -278,30 +303,25 @@ public class TestReplicationPolicyWithNodeGroup { (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, 0, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(0); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, 1, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1); assertEquals(targets.length, 1); assertEquals(targets[0], dataNodes[1]); - targets = replicator.chooseTarget(filename, 2, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(2); assertEquals(targets.length, 2); assertEquals(targets[0], dataNodes[1]); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 3, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(3); assertEquals(targets.length, 3); assertEquals(targets[0], dataNodes[1]); assertTrue(cluster.isOnSameRack(targets[1], targets[2])); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 4, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(4); assertEquals(targets.length, 4); assertEquals(targets[0], dataNodes[1]); assertTrue(cluster.isNodeGroupAware()); @@ -333,23 +353,19 @@ public class TestReplicationPolicyWithNodeGroup { } DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, 0, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(0); 
assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, 1, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1); assertEquals(targets.length, 1); assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0])); - targets = replicator.chooseTarget(filename, 2, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(2); assertEquals(targets.length, 2); assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0])); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 3, dataNodes[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(3); assertEquals(targets.length, 3); for(int i=0; i<3; i++) { assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0])); @@ -371,21 +387,17 @@ public class TestReplicationPolicyWithNodeGroup { public void testChooseTarget5() throws Exception { setupDataNodeCapacity(); DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, 0, NODE, - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(0, NODE); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, 1, NODE, - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1, NODE); assertEquals(targets.length, 1); - targets = replicator.chooseTarget(filename, 2, NODE, - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(2, NODE); assertEquals(targets.length, 2); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 3, NODE, - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(3, NODE); assertEquals(targets.length, 3); assertTrue(cluster.isOnSameRack(targets[1], targets[2])); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); @@ -406,23 +418,19 @@ public class TestReplicationPolicyWithNodeGroup { chosenNodes.add(dataNodes[0]); DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, - 0, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(0, 
chosenNodes); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, - 1, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(1, chosenNodes); assertEquals(targets.length, 1); assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0])); - targets = replicator.chooseTarget(filename, - 2, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(2, chosenNodes); assertEquals(targets.length, 2); assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, - 3, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(3, chosenNodes); assertEquals(targets.length, 3); assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0])); @@ -444,17 +452,14 @@ public class TestReplicationPolicyWithNodeGroup { chosenNodes.add(dataNodes[1]); DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, - 0, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(0, chosenNodes); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, - 1, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(1, chosenNodes); assertEquals(targets.length, 1); assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0])); - targets = replicator.chooseTarget(filename, - 2, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(2, chosenNodes); assertEquals(targets.length, 2); assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) && cluster.isOnSameRack(dataNodes[0], targets[1])); @@ -475,30 +480,26 @@ public class TestReplicationPolicyWithNodeGroup { chosenNodes.add(dataNodes[3]); DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, - 0, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(0, chosenNodes); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, 
- 1, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(1, chosenNodes); assertEquals(targets.length, 1); assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0])); - targets = replicator.chooseTarget(filename, - 1, dataNodes[3], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(1, dataNodes[3], chosenNodes); assertEquals(targets.length, 1); assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0])); assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0])); assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0])); - targets = replicator.chooseTarget(filename, - 2, dataNodes[0], chosenNodes, BLOCK_SIZE); + targets = chooseTarget(2, chosenNodes); assertEquals(targets.length, 2); assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0])); assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0])); - targets = replicator.chooseTarget(filename, - 2, dataNodes[3], chosenNodes, BLOCK_SIZE); + + targets = chooseTarget(2, dataNodes[3], chosenNodes); assertEquals(targets.length, 2); assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0])); } @@ -584,21 +585,17 @@ public class TestReplicationPolicyWithNodeGroup { } DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, 0, dataNodesInBoundaryCase[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(0, dataNodesInBoundaryCase[0]); assertEquals(targets.length, 0); - targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(1, dataNodesInBoundaryCase[0]); assertEquals(targets.length, 1); - targets = replicator.chooseTarget(filename, 2, dataNodesInBoundaryCase[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(2, dataNodesInBoundaryCase[0]); assertEquals(targets.length, 2); assertFalse(cluster.isOnSameRack(targets[0], targets[1])); - targets = replicator.chooseTarget(filename, 3, dataNodesInBoundaryCase[0], - new ArrayList(), 
BLOCK_SIZE); + targets = chooseTarget(3, dataNodesInBoundaryCase[0]); assertEquals(targets.length, 3); assertTrue(checkTargetsOnDifferentNodeGroup(targets)); } @@ -621,8 +618,7 @@ public class TestReplicationPolicyWithNodeGroup { chosenNodes.add(dataNodesInBoundaryCase[0]); chosenNodes.add(dataNodesInBoundaryCase[5]); DatanodeDescriptor[] targets; - targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0], - chosenNodes, BLOCK_SIZE); + targets = chooseTarget(1, dataNodesInBoundaryCase[0], chosenNodes); assertFalse(cluster.isOnSameNodeGroup(targets[0], dataNodesInBoundaryCase[0])); assertFalse(cluster.isOnSameNodeGroup(targets[0], @@ -661,14 +657,12 @@ public class TestReplicationPolicyWithNodeGroup { DatanodeDescriptor[] targets; // Test normal case -- 3 replicas - targets = replicator.chooseTarget(filename, 3, dataNodesInMoreTargetsCase[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(3, dataNodesInMoreTargetsCase[0]); assertEquals(targets.length, 3); assertTrue(checkTargetsOnDifferentNodeGroup(targets)); // Test special case -- replica number over node groups. - targets = replicator.chooseTarget(filename, 10, dataNodesInMoreTargetsCase[0], - new ArrayList(), BLOCK_SIZE); + targets = chooseTarget(10, dataNodesInMoreTargetsCase[0]); assertTrue(checkTargetsOnDifferentNodeGroup(targets)); // Verify it only can find 6 targets for placing replicas. 
assertEquals(targets.length, 6); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java index 96a890e2228..1b0cb71b4df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java @@ -96,8 +96,8 @@ public class TestDNFencing { // Increase max streams so that we re-replicate quickly. conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000); // See RandomDeleterPolicy javadoc. - conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class, - BlockPlacementPolicy.class); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + RandomDeleterPolicy.class, BlockPlacementPolicy.class); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index ad1c9f1b506..7ce4d5d1b6c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -45,6 +45,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -194,7 +195,15 @@ public abstract class RMCommunicator extends AbstractService FinishApplicationMasterRequest request = FinishApplicationMasterRequest.newInstance(finishState, sb.toString(), historyUrl); - scheduler.finishApplicationMaster(request); + while (true) { + FinishApplicationMasterResponse response = + scheduler.finishApplicationMaster(request); + if (response.getIsUnregistered()) { + break; + } + LOG.info("Waiting for application to be successfully unregistered."); + Thread.sleep(rmPollInterval); + } } catch(Exception are) { LOG.error("Exception while unregistering ", are); } @@ -237,15 +246,6 @@ public abstract class RMCommunicator extends AbstractService } catch (YarnRuntimeException e) { LOG.error("Error communicating with RM: " + e.getMessage() , e); return; - } catch (InvalidToken e) { - // This can happen if the RM has been restarted, since currently - // when RM restarts AMRMToken is not populated back to - // AMRMTokenSecretManager yet. Once this is fixed, no need - // to send JOB_AM_REBOOT event in this method any more. - eventHandler.handle(new JobEvent(job.getID(), - JobEventType.JOB_AM_REBOOT)); - LOG.error("Error in authencating with RM: " ,e); - return; } catch (Exception e) { LOG.error("ERROR IN CONTACTING RM. 
", e); continue; diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index bc7e802a2db..316be600309 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -59,6 +59,9 @@ ${basedir}/../../hadoop-common-project/hadoop-common/target file:///dev/urandom + + 1.9 + 2.5.0 @@ -365,12 +368,12 @@ com.sun.jersey jersey-core - 1.8 + ${jersey.version} com.sun.jersey jersey-json - 1.8 + ${jersey.version} javax.xml.stream @@ -381,7 +384,7 @@ com.sun.jersey jersey-server - 1.8 + ${jersey.version} @@ -399,19 +402,19 @@ com.sun.jersey.contribs jersey-guice - 1.8 + ${jersey.version} com.sun.jersey.jersey-test-framework jersey-test-framework-core - 1.8 + ${jersey.version} test com.sun.jersey.jersey-test-framework jersey-test-framework-grizzly2 - 1.8 + ${jersey.version} diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index df573f52cfc..a02d7ef1c42 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -194,6 +194,15 @@ Release 2.1.1-beta - UNRELEASED YARN-1194. TestContainerLogsPage fails with native builds (Roman Shaposhnik via jlowe) + YARN-1116. Populate AMRMTokens back to AMRMTokenSecretManager after RM + restarts (Jian He via bikas) + + YARN-1189. NMTokenSecretManagerInNM is not being told when applications + have finished (Omkar Vinit Joshi via jlowe) + + YARN-540. 
Race condition causing RM to potentially relaunch already + unregistered AMs on RM restart (Jian He via bikas) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java index 4317b675862..8de2c73d88a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java @@ -26,21 +26,52 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.util.Records; /** - *
<p>The response sent by the <code>ResourceManager</code> to a - * <code>ApplicationMaster</code> on it's completion.</p> + * <p> + * The response sent by the <code>ResourceManager</code> to a + * <code>ApplicationMaster</code> on its completion. + * </p> * - * <p>Currently, this is empty.</p> + * <p> + * The response includes: + * <ul> + * <li>A flag which indicates that the application has successfully unregistered + * with the RM and the application can safely stop.</li> + * </ul> + * </p> + *
+ * Note: The flag indicates whether the application has successfully + * unregistered and is safe to stop. The application may stop after the flag is + * true. If the application stops before the flag is true then the RM may retry + * the application. * * @see ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest) */ @Public @Stable public abstract class FinishApplicationMasterResponse { + @Private @Unstable - public static FinishApplicationMasterResponse newInstance() { + public static FinishApplicationMasterResponse newInstance( + boolean isRemovedFromRMStateStore) { FinishApplicationMasterResponse response = Records.newRecord(FinishApplicationMasterResponse.class); + response.setIsUnregistered(isRemovedFromRMStateStore); return response; } + + /** + * Get the flag which indicates that the application has successfully + * unregistered with the RM and the application can safely stop. + */ + @Public + @Stable + public abstract boolean getIsUnregistered(); + + /** + * Set the flag which indicates that the application has successfully + * unregistered with the RM and the application can safely stop. 
+ */ + @Private + @Unstable + public abstract void setIsUnregistered(boolean isUnregistered); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 7b3d0cf77cd..36e1d458efd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -52,6 +52,7 @@ message FinishApplicationMasterRequestProto { } message FinishApplicationMasterResponseProto { + optional bool isUnregistered = 1 [default = false]; } message AllocateRequestProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index c433b55b6ec..beee423a47c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -300,11 +301,24 @@ public class AMRMClientImpl extends AMRMClient { String appMessage, String appTrackingUrl) 
throws YarnException, IOException { Preconditions.checkArgument(appStatus != null, - "AppStatus should not be null."); + "AppStatus should not be null."); FinishApplicationMasterRequest request = FinishApplicationMasterRequest.newInstance(appStatus, appMessage, appTrackingUrl); - rmClient.finishApplicationMaster(request); + try { + while (true) { + FinishApplicationMasterResponse response = + rmClient.finishApplicationMaster(request); + if (response.getIsUnregistered()) { + break; + } + LOG.info("Waiting for application to be successfully unregistered."); + Thread.sleep(100); + } + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting for application" + + " to be removed from RMStateStore"); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java index ff57eb42d77..1bad3744dcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java @@ -22,7 +22,9 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProtoOrBuilder; 
import com.google.protobuf.TextFormat; @@ -67,4 +69,24 @@ public class FinishApplicationMasterResponsePBImpl extends FinishApplicationMast public String toString() { return TextFormat.shortDebugString(getProto()); } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = FinishApplicationMasterResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public boolean getIsUnregistered() { + FinishApplicationMasterResponseProtoOrBuilder p = + viaProto ? proto : builder; + return p.getIsUnregistered(); + } + + @Override + public void setIsUnregistered(boolean isUnregistered) { + maybeInitBuilder(); + builder.setIsUnregistered(isUnregistered); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index edf6359a7b5..43a2fcc0055 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -395,6 +395,7 @@ public class ApplicationImpl implements Application { app.dispatcher.getEventHandler().handle( new LogHandlerAppFinishedEvent(app.appId)); + app.context.getNMTokenSecretManager().appFinished(app.getAppId()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 429ad454897..3b2878c3709 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.refEq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Test; @@ -413,6 +415,27 @@ public class TestApplication { } } + @Test + public void testNMTokenSecretManagerCleanup() { + WrappedApplication wa = null; + try { + wa = new WrappedApplication(1, 314159265358979L, "yak", 1); + wa.initApplication(); + wa.initContainer(0); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + assertEquals(1, wa.app.getContainers().size()); + wa.appFinished(); + 
wa.containerFinished(0); + wa.appResourcesCleanedup(); + assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); + verify(wa.nmTokenSecretMgr).appFinished(eq(wa.appId)); + } finally { + if (wa != null) { + wa.finished(); + } + } + } + private class ContainerKillMatcher extends ArgumentMatcher { private ContainerId cId; @@ -460,6 +483,7 @@ public class TestApplication { final List containers; final Context context; final Map containerTokenIdentifierMap; + final NMTokenSecretManagerInNM nmTokenSecretMgr; final ApplicationId appId; final Application app; @@ -486,12 +510,15 @@ public class TestApplication { dispatcher.register(ContainerEventType.class, containerBus); dispatcher.register(LogHandlerEventType.class, logAggregationBus); + nmTokenSecretMgr = mock(NMTokenSecretManagerInNM.class); + context = mock(Context.class); when(context.getContainerTokenSecretManager()).thenReturn( new NMContainerTokenSecretManager(conf)); when(context.getApplicationACLsManager()).thenReturn( new ApplicationACLsManager(conf)); + when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr); // Setting master key MasterKey masterKey = new MasterKeyPBImpl(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index a41792db1fd..fd39dad43a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -72,6 +72,7 @@ import 
org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; @@ -303,9 +304,12 @@ public class ApplicationMasterService extends AbstractService implements .getTrackingUrl(), request.getFinalApplicationStatus(), request .getDiagnostics())); - FinishApplicationMasterResponse response = recordFactory - .newRecordInstance(FinishApplicationMasterResponse.class); - return response; + if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId()) + .isAppSafeToUnregister()) { + return FinishApplicationMasterResponse.newInstance(true); + } else { + return FinishApplicationMasterResponse.newInstance(false); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 11248bad48c..cc31643296f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -414,8 +414,8 @@ public class ClientRMService extends AbstractService implements } if (applicationStates != null 
&& !applicationStates.isEmpty()) { - if (!applicationStates.contains(RMServerUtils - .createApplicationState(application.getState()))) { + if (!applicationStates.contains(application + .createApplicationState())) { continue; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 6439df1c225..0c38f506b5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -186,10 +186,6 @@ public class RMAppManager implements EventHandler, completedApps.add(applicationId); writeAuditLog(applicationId); - - // application completely done. 
Remove from state - RMStateStore store = rmContext.getStateStore(); - store.removeApplication(rmContext.getRMApps().get(applicationId)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 15d306293e9..370040ae740 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -115,27 +115,4 @@ public class RMServerUtils { } } } - - public static YarnApplicationState createApplicationState(RMAppState rmAppState) { - switch(rmAppState) { - case NEW: - return YarnApplicationState.NEW; - case NEW_SAVING: - return YarnApplicationState.NEW_SAVING; - case SUBMITTED: - return YarnApplicationState.SUBMITTED; - case ACCEPTED: - return YarnApplicationState.ACCEPTED; - case RUNNING: - return YarnApplicationState.RUNNING; - case FINISHING: - case FINISHED: - return YarnApplicationState.FINISHED; - case KILLED: - return YarnApplicationState.KILLED; - case FAILED: - return YarnApplicationState.FAILED; - } - throw new YarnRuntimeException("Unknown state passed!"); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index bdf4da38f0b..0852ce81a80 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -30,6 +30,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @@ -108,7 +109,9 @@ public class MemoryRMStateStore extends RMStateStore { ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); - assert appState != null; + if (appState == null) { + throw new YarnRuntimeException("Application doesn't exist"); + } if (appState.attempts.containsKey(attemptState.getAttemptId())) { Exception e = new IOException("Attempt: " + @@ -125,7 +128,9 @@ public class MemoryRMStateStore extends RMStateStore { throws Exception { ApplicationId appId = appState.getAppId(); ApplicationState removed = state.appState.remove(appId); - assert removed != null; + if (removed == null) { + throw new YarnRuntimeException("Removing non-exsisting application state"); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 179b721bcdf..382ed97d61b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.Ap import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; @@ -482,12 +483,15 @@ public abstract class RMStateStore extends AbstractService { ApplicationState appState = ((RMStateStoreRemoveAppEvent) event).getAppState(); ApplicationId appId = appState.getAppId(); - + Exception removedException = null; LOG.info("Removing info for app: " + appId); try { removeApplicationState(appState); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); + removedException = e; + } finally { + notifyDoneRemovingApplcation(appId, removedException); } } break; @@ -521,7 +525,18 @@ public abstract class RMStateStore extends AbstractService { rmDispatcher.getEventHandler().handle( new RMAppAttemptStoredEvent(attemptId, storedException)); } - + + @SuppressWarnings("unchecked") + /** + * This is to notify RMApp that this application has been removed from + * RMStateStore + 
*/ + private void notifyDoneRemovingApplcation(ApplicationId appId, + Exception removedException) { + rmDispatcher.getEventHandler().handle( + new RMAppRemovedEvent(appId, removedException)); + } + /** * EventHandler implementation which forward events to the FSRMStateStore * This hides the EventHandle methods of the store from its public interface diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index f1c496a6c62..fadaa3b00e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -194,4 +195,20 @@ public interface RMApp extends EventHandler { * @return the application type. */ String getApplicationType(); + + /** + * Check whether this application is safe to unregister. + * An application is deemed to be safe to unregister if it is an unmanaged + * AM or its state has been removed from state store. 
+ * @return the flag which indicates whether this application is safe to + * unregister. + */ + boolean isAppSafeToUnregister(); + + /** + * Create the external user-facing state of ApplicationMaster from the + * current state of the {@link RMApp}. + * @return the external user-facing state of ApplicationMaster. + */ + YarnApplicationState createApplicationState(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index d15e12e0014..e7dba63b904 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -27,11 +27,14 @@ public enum RMAppEventType { // Source: RMAppAttempt APP_REJECTED, APP_ACCEPTED, - APP_SAVED, ATTEMPT_REGISTERED, - ATTEMPT_FINISHING, + ATTEMPT_UNREGISTERED, ATTEMPT_FINISHED, // Will send the final state ATTEMPT_FAILED, ATTEMPT_KILLED, - NODE_UPDATE + NODE_UPDATE, + + // Source: RMStateStore + APP_SAVED, + APP_REMOVED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index cbafffe04c8..b5922873b67 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -109,6 +111,8 @@ public class RMAppImpl implements RMApp, Recoverable { private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); + private boolean isAppRemovalRequestSent = false; + private RMAppState previousStateAtRemoving; private static final StateMachineFactory) appAttemptTokens .getToken(RMStateStore.AM_RM_TOKEN_SERVICE); - - // For now, no need to populate tokens back to 
AMRMTokenSecretManager, - // because running attempts are rebooted. Later in work-preserve restart, - // we'll create NEW->RUNNING transition in which the restored tokens will be - // added to the secret manager + rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken); } private static class BaseTransition implements @@ -1149,7 +1147,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { ApplicationId applicationId = appAttempt.getAppAttemptId().getApplicationId(); appAttempt.eventHandler.handle( - new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHING)); + new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED)); return RMAppAttemptState.FINISHING; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java index 3f50e1b3cd1..5d21ec08885 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Timer; @@ -30,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -123,6 +125,19 @@ public class AMRMTokenSecretManager extends return password; } + /** + * Populate persisted password of AMRMToken back to AMRMTokenSecretManager. + */ + public synchronized void + addPersistedPassword(Token token) throws IOException { + AMRMTokenIdentifier identifier = token.decodeIdentifier(); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding password for " + identifier.getApplicationAttemptId()); + } + this.passwords.put(identifier.getApplicationAttemptId(), + token.getPassword()); + } + /** * Retrieve the password for the given {@link AMRMTokenIdentifier}. * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index fbf53267dff..112f0e373e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import 
org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -333,10 +334,12 @@ public class TestRMRestart { // finish the AM's am1.unregisterAppAttempt(); + rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.FINISHING); am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE); am1.waitForState(RMAppAttemptState.FINISHED); am2.unregisterAppAttempt(); + rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.FINISHING); am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE); am2.waitForState(RMAppAttemptState.FINISHED); @@ -577,14 +580,16 @@ public class TestRMRestart { attempt1.getClientTokenMasterKey(), loadedAttempt1.getClientTokenMasterKey()); - // assert secret manager also knows about the key + // assert ClientTokenSecretManager also knows about the key Assert.assertArrayEquals(clientTokenMasterKey, rm2.getClientToAMTokenSecretManager().getMasterKey(attemptId1) .getEncoded()); - // Not testing ApplicationTokenSecretManager has the password populated back, - // that is needed in work-preserving restart - + // assert AMRMTokenSecretManager also knows about the AMRMToken password + Token amrmToken = loadedAttempt1.getAMRMToken(); + Assert.assertArrayEquals(amrmToken.getPassword(), + rm2.getAMRMTokenSecretManager().retrievePassword( + amrmToken.decodeIdentifier())); rm1.stop(); rm2.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 93dbdc6cc7f..73b9cf75cd7 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -142,6 +143,16 @@ public abstract class MockAsm extends MockApps { public void setQueue(String name) { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public boolean isAppSafeToUnregister() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public YarnApplicationState createApplicationState() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 9c7a969919d..73dc8d33045 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.MockApps; @@ -215,6 +216,15 @@ public class MockRMApp implements RMApp { @Override public String getApplicationType() { return YarnConfiguration.DEFAULT_APPLICATION_TYPE; - }; + } + @Override + public boolean isAppSafeToUnregister() { + return true; + } + + @Override + public YarnApplicationState createApplicationState() { + return null; + }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2c19597de9b..7f03e1d1acc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import static org.mockito.Mockito.mock; import static org.junit.Assume.assumeTrue; +import static 
org.mockito.Mockito.verify; import java.io.IOException; import java.util.Arrays; @@ -59,8 +60,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -78,6 +80,7 @@ public class TestRMAppTransitions { YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS; private static int appId = 1; private DrainDispatcher rmDispatcher; + private RMStateStore store; // ignore all the RM application attempt events private static final class TestApplicationAttemptEventDispatcher implements @@ -171,7 +174,7 @@ public class TestRMAppTransitions { mock(ContainerAllocationExpirer.class); AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); - RMStateStore store = mock(RMStateStore.class); + store = mock(RMStateStore.class); this.rmContext = new RMContextImpl(rmDispatcher, store, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, @@ -278,6 +281,10 @@ public class TestRMAppTransitions { (application.getFinishTime() >= application.getStartTime())); } + private void assertAppRemoved(RMApp application){ + verify(store).removeApplication(application); + } + private static void assertKilled(RMApp application) { assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); @@ -366,15 +373,27 @@ 
public class TestRMAppTransitions { return application; } + protected RMApp testCreateAppRemoving( + ApplicationSubmissionContext submissionContext) throws IOException { + RMApp application = testCreateAppRunning(submissionContext); + RMAppEvent finishingEvent = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_UNREGISTERED); + application.handle(finishingEvent); + assertAppState(RMAppState.REMOVING, application); + assertAppRemoved(application); + return application; + } + protected RMApp testCreateAppFinishing( ApplicationSubmissionContext submissionContext) throws IOException { // unmanaged AMs don't use the FINISHING state assert submissionContext == null || !submissionContext.getUnmanagedAM(); - RMApp application = testCreateAppRunning(submissionContext); - // RUNNING => FINISHING event RMAppEventType.ATTEMPT_FINISHING + RMApp application = testCreateAppRemoving(submissionContext); + // REMOVING => FINISHING event RMAppEventType.APP_REMOVED RMAppEvent finishingEvent = new RMAppEvent(application.getApplicationId(), - RMAppEventType.ATTEMPT_FINISHING); + RMAppEventType.APP_REMOVED); application.handle(finishingEvent); assertAppState(RMAppState.FINISHING, application); assertTimesAtFinish(application); @@ -634,6 +653,30 @@ public class TestRMAppTransitions { assertFailed(application, ".*Failing the application.*"); } + @Test + public void testAppRemovingFinished() throws IOException { + LOG.info("--- START: testAppRemovingFINISHED ---"); + RMApp application = testCreateAppRemoving(null); + // APP_REMOVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED + RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent( + application.getApplicationId(), null); + application.handle(finishedEvent); + rmDispatcher.await(); + assertAppState(RMAppState.FINISHED, application); + } + + @Test + public void testAppRemovingKilled() throws IOException { + LOG.info("--- START: testAppRemovingKilledD ---"); + RMApp application = testCreateAppRemoving(null); + 
// APP_REMOVING => KILLED event RMAppEventType.KILL + RMAppEvent event = + new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); + application.handle(event); + rmDispatcher.await(); + assertAppState(RMAppState.KILLED, application); + } + @Test public void testAppFinishingKill() throws IOException { LOG.info("--- START: testAppFinishedFinished ---");