diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index cf2ab286814..b761051ef9d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -446,6 +446,28 @@ public class NetworkTopology {
     return getNode(node.getNetworkLocation());
   }
 
+  /**
+   * Given a string representation of a rack, return its children.
+   * @param loc a path-like string representation of a rack
+   * @return a newly allocated list with all of the rack's children
+   */
+  public List<Node> getDatanodesInRack(String loc) {
+    netlock.readLock().lock();
+    try {
+      loc = NodeBase.normalize(loc);
+      if (!NodeBase.ROOT.equals(loc)) {
+        loc = loc.substring(1);
+      }
+      InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
+      if (rack == null) {
+        return null;
+      }
+      return new ArrayList<Node>(rack.getChildren());
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
   /** Remove a node
    * Update node counter and rack counter if necessary
    * @param node node to be removed; can be null
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4d77689d9e9..c7a8f2e8405 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -22,6 +22,9 @@ Trunk (Unreleased)
     Azure environments. (See breakdown of tasks below for subtasks and
     contributors)
 
+    HDFS-2576. Enhances the DistributedFileSystem's create API so that clients
+    can specify favored datanodes for a file's blocks. (ddas)
+
   IMPROVEMENTS
 
     HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@@ -262,6 +265,9 @@ Trunk (Unreleased)
     HDFS-4757. Update FSDirectory#inodeMap when replacing an INodeDirectory
     while setting quota. (Jing Zhao via szetszwo)
 
+    HDFS-4761. When resetting FSDirectory, the inodeMap should also be reset.
+    (Jing Zhao via szetszwo)
+
   BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
 
     HDFS-4145. Merge hdfs cmd line scripts from branch-1-win. (David Lao,
@@ -457,6 +463,10 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4346. Add SequentialNumber as a base class for INodeId and
     GenerationStamp. (szetszwo)
 
+    HDFS-4721. Speed up lease recovery by avoiding stale datanodes and choosing
+    the datanode with the most recent heartbeat as the primary. (Varun Sharma
+    via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 8a0a7bd243e..31d54eb0801 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1210,7 +1210,7 @@ public class DFSClient implements java.io.Closeable {
       ChecksumOpt checksumOpt)
       throws IOException {
     return create(src, permission, flag, true,
-        replication, blockSize, progress, buffersize, checksumOpt);
+        replication, blockSize, progress, buffersize, checksumOpt, null);
   }
 
   /**
@@ -1244,6 +1244,29 @@ public class DFSClient implements java.io.Closeable {
       Progressable progress,
       int buffersize,
       ChecksumOpt checksumOpt) throws IOException {
+    return create(src, permission, flag, createParent, replication, blockSize,
+        progress, buffersize, checksumOpt, null);
+  }
+
+  /**
+   * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
+   * Progressable, int, ChecksumOpt)} with the addition of favoredNodes,
+   * a hint to the namenode about where the file's blocks should be placed.
+   * The hint is not persisted in HDFS, so it may be honored only at
+   * creation time; HDFS may later move the blocks off the favored nodes
+   * during balancing or re-replication. A value of null means no favored
+   * nodes for this create.
+   */
+  public DFSOutputStream create(String src,
+                             FsPermission permission,
+                             EnumSet<CreateFlag> flag,
+                             boolean createParent,
+                             short replication,
+                             long blockSize,
+                             Progressable progress,
+                             int buffersize,
+                             ChecksumOpt checksumOpt,
+                             InetSocketAddress[] favoredNodes) throws IOException {
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getFileDefault();
     }
@@ -1252,9 +1275,18 @@ public class DFSClient implements java.io.Closeable {
     if(LOG.isDebugEnabled()) {
       LOG.debug(src + ": masked=" + masked);
     }
+    String[] favoredNodeStrs = null;
+    if (favoredNodes != null) {
+      favoredNodeStrs = new String[favoredNodes.length];
+      for (int i = 0; i < favoredNodes.length; i++) {
+        favoredNodeStrs[i] =
+            favoredNodes[i].getAddress().getHostAddress() + ":"
+                         + favoredNodes[i].getPort();
+      }
+    }
     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
         src, masked, flag, createParent, replication, blockSize, progress,
-        buffersize, dfsClientConf.createChecksum(checksumOpt));
+        buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
     beginFileLease(src, result);
     return result;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 8e0c0c4f26c..2d0be1b2110 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -315,6 +315,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
           return key;
         }
       });
+    private String[] favoredNodes;
     volatile boolean hasError = false;
     volatile int errorIndex = -1;
     private BlockConstructionStage stage;  // block construction stage
@@ -391,7 +392,11 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
       }
     }
-
+
+    private void setFavoredNodes(String[] favoredNodes) {
+      this.favoredNodes = favoredNodes;
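For illustration, a minimal sketch of how a client might use the new favored-nodes create overload; the namenode URI, file path, datanode addresses, and the 50010 transfer port are illustrative assumptions, not part of the patch:

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class FavoredNodesCreateExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("hdfs://namenode:8020/user/foo/part-00000");
        DistributedFileSystem dfs =
            (DistributedFileSystem) file.getFileSystem(conf);
        // Hint: try to place all three replicas on these datanodes, identified
        // by their data-transfer addresses. The namenode may ignore the hint.
        InetSocketAddress[] favored = {
            new InetSocketAddress("10.0.0.1", 50010),
            new InetSocketAddress("10.0.0.2", 50010),
            new InetSocketAddress("10.0.0.3", 50010) };
        FSDataOutputStream out = dfs.create(file, FsPermission.getFileDefault(),
            true, 4096, (short) 3, 128 * 1024 * 1024L, null, favored);
        out.write("some data".getBytes());
        out.close(); // blocks may later move off the favored nodes
      }
    }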
} + /** * Initialize for data streaming */ @@ -1177,7 +1182,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, - block, excludedNodes, fileId); + block, excludedNodes, fileId, favoredNodes); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, @@ -1318,7 +1323,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { /** Construct a new output stream for creating a file. */ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet flag, Progressable progress, - DataChecksum checksum) throws IOException { + DataChecksum checksum, String[] favoredNodes) throws IOException { this(dfsClient, src, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); @@ -1326,12 +1331,15 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { checksum.getBytesPerChecksum()); streamer = new DataStreamer(); + if (favoredNodes != null && favoredNodes.length != 0) { + streamer.setFavoredNodes(favoredNodes); + } } static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, - DataChecksum checksum) throws IOException { + DataChecksum checksum, String[] favoredNodes) throws IOException { final HdfsFileStatus stat; try { stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, @@ -1349,11 +1357,19 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { SnapshotAccessControlException.class); } final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, - flag, progress, checksum); + flag, progress, checksum, favoredNodes); out.start(); return out; } + static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, + FsPermission masked, EnumSet flag, boolean createParent, + short replication, long blockSize, Progressable progress, int buffersize, + DataChecksum checksum) throws IOException { + return newStreamForCreate(dfsClient, src, masked, flag, createParent, replication, + blockSize, progress, buffersize, checksum, null); + } + /** Construct a new output stream for append. */ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 72663680dc9..9da5566a91f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -268,6 +268,27 @@ public class DistributedFileSystem extends FileSystem { : EnumSet.of(CreateFlag.CREATE), bufferSize, replication, blockSize, progress, null); } + + /** + * Same as + * {@link #create(Path, FsPermission, boolean, int, short, long, + * Progressable)} with the addition of favoredNodes that is a hint to + * where the namenode should place the file blocks. + * The favored nodes hint is not persisted in HDFS. Hence it may be honored + * at the creation time only. HDFS could move the blocks during balancing or + * replication, to move the blocks from favored nodes. 
A value of null means + * no favored nodes for this create + */ + public HdfsDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress, InetSocketAddress[] favoredNodes) throws IOException { + statistics.incrementWriteOps(1); + final DFSOutputStream out = dfs.create(getPathName(f), permission, + overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) + : EnumSet.of(CreateFlag.CREATE), + true, replication, blockSize, progress, bufferSize, null, favoredNodes); + return new HdfsDataOutputStream(out, statistics); + } @Override public HdfsDataOutputStream create(Path f, FsPermission permission, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 7fbb79090a5..32a3e461689 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -306,6 +306,8 @@ public interface ClientProtocol { * @param excludeNodes a list of nodes that should not be * allocated for the current block * @param fileId the id uniquely identifying a file + * @param favoredNodes the list of nodes where the client wants the blocks. + * Nodes are identified by either host name or address. * * @return LocatedBlock allocated block information. * @@ -320,7 +322,8 @@ public interface ClientProtocol { */ @Idempotent public LocatedBlock addBlock(String src, String clientName, - ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId) + ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, + String[] favoredNodes) throws AccessControlException, FileNotFoundException, NotReplicatedYetException, SafeModeException, UnresolvedLinkException, IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 46f14605717..11d22716256 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -382,12 +382,15 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements try { List excl = req.getExcludeNodesList(); + List favor = req.getFavoredNodesList(); LocatedBlock result = server.addBlock( req.getSrc(), req.getClientName(), req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null, (excl == null || excl.size() == 0) ? null : PBHelper.convert(excl - .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId()); + .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(), + (favor == null || favor.size() == 0) ? 
null : favor + .toArray(new String[favor.size()])); return AddBlockResponseProto.newBuilder() .setBlock(PBHelper.convert(result)).build(); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index ce609a51d37..49ad0524471 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -312,7 +312,8 @@ public class ClientNamenodeProtocolTranslatorPB implements @Override public LocatedBlock addBlock(String src, String clientName, - ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId) + ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, + String[] favoredNodes) throws AccessControlException, FileNotFoundException, NotReplicatedYetException, SafeModeException, UnresolvedLinkException, IOException { @@ -322,6 +323,9 @@ public class ClientNamenodeProtocolTranslatorPB implements req.setPrevious(PBHelper.convert(previous)); if (excludeNodes != null) req.addAllExcludeNodes(PBHelper.convert(excludeNodes)); + if (favoredNodes != null) { + req.addAllFavoredNodes(Arrays.asList(favoredNodes)); + } try { return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java index 36b3598b2c6..fc59acf10a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -41,7 +42,10 @@ public class BlockInfoUnderConstruction extends BlockInfo { */ private List replicas; - /** A data-node responsible for block recovery. */ + /** + * Index of the primary data node doing the recovery. Useful for log + * messages. + */ private int primaryNodeIndex = -1; /** @@ -62,6 +66,7 @@ public class BlockInfoUnderConstruction extends BlockInfo { static class ReplicaUnderConstruction extends Block { private DatanodeDescriptor expectedLocation; private ReplicaState state; + private boolean chosenAsPrimary; ReplicaUnderConstruction(Block block, DatanodeDescriptor target, @@ -69,6 +74,7 @@ public class BlockInfoUnderConstruction extends BlockInfo { super(block); this.expectedLocation = target; this.state = state; + this.chosenAsPrimary = false; } /** @@ -88,6 +94,13 @@ public class BlockInfoUnderConstruction extends BlockInfo { return state; } + /** + * Whether the replica was chosen for recovery. + */ + boolean getChosenAsPrimary() { + return chosenAsPrimary; + } + /** * Set replica state. */ @@ -95,6 +108,13 @@ public class BlockInfoUnderConstruction extends BlockInfo { state = s; } + /** + * Set whether this replica was chosen for recovery. 
+ */ + void setChosenAsPrimary(boolean chosenAsPrimary) { + this.chosenAsPrimary = chosenAsPrimary; + } + /** * Is data-node the replica belongs to alive. */ @@ -237,19 +257,40 @@ public class BlockInfoUnderConstruction extends BlockInfo { + " BlockInfoUnderConstruction.initLeaseRecovery:" + " No blocks found, lease removed."); } - - int previous = primaryNodeIndex; - for(int i = 1; i <= replicas.size(); i++) { - int j = (previous + i)%replicas.size(); - if (replicas.get(j).isAlive()) { - primaryNodeIndex = j; - DatanodeDescriptor primary = replicas.get(j).getExpectedLocation(); - primary.addBlockToBeRecovered(this); - NameNode.blockStateChangeLog.info("BLOCK* " + this - + " recovery started, primary=" + primary); - return; + boolean allLiveReplicasTriedAsPrimary = true; + for (int i = 0; i < replicas.size(); i++) { + // Check if all replicas have been tried or not. + if (replicas.get(i).isAlive()) { + allLiveReplicasTriedAsPrimary = + (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary()); } } + if (allLiveReplicasTriedAsPrimary) { + // Just set all the replicas to be chosen whether they are alive or not. + for (int i = 0; i < replicas.size(); i++) { + replicas.get(i).setChosenAsPrimary(false); + } + } + long mostRecentLastUpdate = 0; + ReplicaUnderConstruction primary = null; + primaryNodeIndex = -1; + for(int i = 0; i < replicas.size(); i++) { + // Skip alive replicas which have been chosen for recovery. + if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) { + continue; + } + if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) { + primary = replicas.get(i); + primaryNodeIndex = i; + mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate(); + } + } + if (primary != null) { + primary.getExpectedLocation().addBlockToBeRecovered(this); + primary.setChosenAsPrimary(true); + NameNode.blockStateChangeLog.info("BLOCK* " + this + + " recovery started, primary=" + primary); + } } void addReplicaIfNotPresent(DatanodeDescriptor dn, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 4bb46070c8e..8abde8053f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.namenode.FSClusterStats; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; @@ -72,6 +73,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.net.Node; +import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Time; @@ -1333,11 +1335,12 @@ public class BlockManager { public 
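In short, the new initializeBlockRecovery policy is: among live replicas not yet tried in the current round, pick the one whose datanode heartbeated most recently, and reset the round once every live replica has been tried. A self-contained sketch of that selection, where Replica is a simplified stand-in for ReplicaUnderConstruction rather than the patch's actual class:

    import java.util.List;

    class PrimarySelectionSketch {
      static class Replica {
        boolean alive;
        boolean chosenAsPrimary;
        long lastUpdate; // last heartbeat time of the replica's datanode

        Replica(boolean alive, boolean chosen, long lastUpdate) {
          this.alive = alive;
          this.chosenAsPrimary = chosen;
          this.lastUpdate = lastUpdate;
        }
      }

      static Replica choosePrimary(List<Replica> replicas) {
        // If every live replica has already been tried, reset the round.
        boolean allTried = true;
        for (Replica r : replicas) {
          if (r.alive && !r.chosenAsPrimary) { allTried = false; break; }
        }
        if (allTried) {
          for (Replica r : replicas) r.chosenAsPrimary = false;
        }
        // Pick the live, untried replica with the most recent heartbeat.
        Replica primary = null;
        long mostRecent = 0;
        for (Replica r : replicas) {
          if (!r.alive || r.chosenAsPrimary) continue;
          if (r.lastUpdate > mostRecent) { primary = r; mostRecent = r.lastUpdate; }
        }
        if (primary != null) primary.chosenAsPrimary = true;
        return primary;
      }
    }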
DatanodeDescriptor[] chooseTarget(final String src, final int numOfReplicas, final DatanodeDescriptor client, final HashMap excludedNodes, - final long blocksize) throws IOException { - // choose targets for the new block to be allocated. + final long blocksize, List favoredNodes) throws IOException { + List favoredDatanodeDescriptors = + getDatanodeDescriptors(favoredNodes); final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src, - numOfReplicas, client, new ArrayList(), false, - excludedNodes, blocksize); + numOfReplicas, client, excludedNodes, blocksize, + favoredDatanodeDescriptors); if (targets.length < minReplication) { throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes instead of minReplication (=" @@ -1350,6 +1353,24 @@ public class BlockManager { return targets; } + /** + * Get list of datanode descriptors for given list of nodes. Nodes are + * hostaddress:port or just hostaddress. + */ + List getDatanodeDescriptors(List nodes) { + List datanodeDescriptors = null; + if (nodes != null) { + datanodeDescriptors = new ArrayList(nodes.size()); + for (int i = 0; i < nodes.size(); i++) { + DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i)); + if (node != null) { + datanodeDescriptors.add(node); + } + } + } + return datanodeDescriptors; + } + /** * Parse the data-nodes the block belongs to and choose one, * which will be the replication source. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 4243bcdc654..36a0b2a6c86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -118,6 +118,25 @@ public abstract class BlockPlacementPolicy { return chooseTarget(srcBC.getName(), numOfReplicas, writer, chosenNodes, false, excludedNodes, blocksize); } + + /** + * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, + * HashMap, long)} with added parameter {@code favoredDatanodes} + * @param favoredNodes datanodes that should be favored as targets. This + * is only a hint and due to cluster state, namenode may not be + * able to place the blocks on these datanodes. + */ + DatanodeDescriptor[] chooseTarget(String src, + int numOfReplicas, DatanodeDescriptor writer, + HashMap excludedNodes, + long blocksize, List favoredNodes) { + // This class does not provide the functionality of placing + // a block in favored datanodes. 
The implementations of this class + // are expected to provide this functionality + return chooseTarget(src, numOfReplicas, writer, + new ArrayList(numOfReplicas), false, excludedNodes, + blocksize); + } /** * Verify that the block is replicated on at least minRacks different racks diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 00742356521..61a61a0b5de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -125,6 +125,60 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { excludedNodes, blocksize); } + @Override + DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas, + DatanodeDescriptor writer, HashMap excludedNodes, + long blocksize, List favoredNodes) { + try { + if (favoredNodes == null || favoredNodes.size() == 0) { + // Favored nodes not specified, fall back to regular block placement. + return chooseTarget(src, numOfReplicas, writer, + new ArrayList(numOfReplicas), false, + excludedNodes, blocksize); + } + + HashMap favoriteAndExcludedNodes = excludedNodes == null ? + new HashMap() : new HashMap(excludedNodes); + + // Choose favored nodes + List results = new ArrayList(); + boolean avoidStaleNodes = stats != null + && stats.isAvoidingStaleDataNodesForWrite(); + for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) { + DatanodeDescriptor favoredNode = favoredNodes.get(i); + // Choose a single node which is local to favoredNode. + // 'results' is updated within chooseLocalNode + DatanodeDescriptor target = chooseLocalNode(favoredNode, + favoriteAndExcludedNodes, blocksize, + getMaxNodesPerRack(results, + numOfReplicas)[1], results, avoidStaleNodes); + if (target == null) { + LOG.warn("Could not find a target for file " + src + + " with favored node " + favoredNode); + continue; + } + favoriteAndExcludedNodes.put(target, target); + } + + if (results.size() < numOfReplicas) { + // Not enough favored nodes, choose other nodes. + numOfReplicas -= results.size(); + DatanodeDescriptor[] remainingTargets = + chooseTarget(src, numOfReplicas, writer, results, + false, favoriteAndExcludedNodes, blocksize); + for (int i = 0; i < remainingTargets.length; i++) { + results.add(remainingTargets[i]); + } + } + return results.toArray(new DatanodeDescriptor[results.size()]); + } catch (NotEnoughReplicasException nr) { + // Fall back to regular block placement disregarding favored nodes hint + return chooseTarget(src, numOfReplicas, writer, + new ArrayList(numOfReplicas), false, + excludedNodes, blocksize); + } + } + /** This is the implementation. 
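The favored-node path above boils down to: claim one target local to each favored node, then hand any shortfall to the regular placement path (and, on NotEnoughReplicasException, discard the hint entirely). An illustrative outline under assumed helper names, with localTo standing in for chooseLocalNode and regular for the default chooseTarget path; exclusion bookkeeping is omitted for brevity:

    import java.util.ArrayList;
    import java.util.List;

    abstract class FavoredPlacementSketch<D> {
      abstract D localTo(D favoredNode);            // may return null if no local target
      abstract List<D> regular(int numOfReplicas);  // regular block placement

      List<D> choose(List<D> favoredNodes, int numOfReplicas) {
        List<D> results = new ArrayList<D>();
        for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
          D target = localTo(favoredNodes.get(i));
          if (target != null) {
            results.add(target);  // the real code also excludes it from later picks
          }
        }
        if (results.size() < numOfReplicas) {
          // Not enough favored targets; fill the rest with regular placement.
          results.addAll(regular(numOfReplicas - results.size()));
        }
        return results;
      }
    }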
*/ DatanodeDescriptor[] chooseTarget(int numOfReplicas, DatanodeDescriptor writer, @@ -140,15 +194,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { excludedNodes = new HashMap(); } - int clusterSize = clusterMap.getNumOfLeaves(); - int totalNumOfReplicas = chosenNodes.size()+numOfReplicas; - if (totalNumOfReplicas > clusterSize) { - numOfReplicas -= (totalNumOfReplicas-clusterSize); - totalNumOfReplicas = clusterSize; - } - - int maxNodesPerRack = - (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2; + int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas); + numOfReplicas = result[0]; + int maxNodesPerRack = result[1]; List results = new ArrayList(chosenNodes); @@ -174,6 +222,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { return getPipeline((writer==null)?localNode:writer, results.toArray(new DatanodeDescriptor[results.size()])); } + + private int[] getMaxNodesPerRack(List chosenNodes, + int numOfReplicas) { + int clusterSize = clusterMap.getNumOfLeaves(); + int totalNumOfReplicas = chosenNodes.size()+numOfReplicas; + if (totalNumOfReplicas > clusterSize) { + numOfReplicas -= (totalNumOfReplicas-clusterSize); + totalNumOfReplicas = clusterSize; + } + int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2; + return new int[] {numOfReplicas, maxNodesPerRack}; + } /* choose numOfReplicas from all data nodes */ private DatanodeDescriptor chooseTarget(int numOfReplicas, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 525bf602cd1..670bea82ec8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -213,7 +213,7 @@ public class DatanodeManager { " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " + "It should be a positive non-zero float value, not greater than 1.0f."); } - + private static long getStaleIntervalFromConf(Configuration conf, long heartbeatExpireInterval) { long staleInterval = conf.getLong( @@ -326,6 +326,68 @@ public class DatanodeManager { return host2DatanodeMap.getDatanodeByHost(host); } + /** @return the datanode descriptor for the host. */ + public DatanodeDescriptor getDatanodeByXferAddr(String host, int xferPort) { + return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort); + } + + /** + * Given datanode address or host name, returns the DatanodeDescriptor for the + * same, or if it doesn't find the datanode, it looks for a machine local and + * then rack local datanode, if a rack local datanode is not possible either, + * it returns the DatanodeDescriptor of any random node in the cluster. 
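The lookup order implemented by getDatanodeDescriptor below, restated as a compact sketch; the four abstract helpers are assumptions standing in for the host map, topology, and random-choice calls the real method uses:

    abstract class DatanodeLookupSketch<D> {
      abstract D byXferAddr(String host, int xferPort); // exact host:port match
      abstract D byHost(String host);                   // any datanode on that host
      abstract D machineOrRackLocal(String host);       // same machine, else same rack
      abstract D anyClusterNode();                      // chooseRandom(NodeBase.ROOT)

      D resolve(String address) {
        D node = null;
        int colon = address.indexOf(':');
        String host = colon > 0 ? address.substring(0, colon) : address;
        if (colon > 0) {
          node = byXferAddr(host, Integer.parseInt(address.substring(colon + 1)));
        }
        if (node == null) node = byHost(host);            // fall back to host only
        if (node == null) node = machineOrRackLocal(host); // then locality
        if (node == null) node = anyClusterNode();         // finally, anything
        return node;
      }
    }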
+ * + * @param address hostaddress:transfer address + * @return the best match for the given datanode + * @throws IOException when no datanode is found for given address + */ + DatanodeDescriptor getDatanodeDescriptor(String address) { + DatanodeDescriptor node = null; + int colon = address.indexOf(":"); + int xferPort; + String host = address; + if (colon > 0) { + host = address.substring(0, colon); + xferPort = Integer.parseInt(address.substring(colon+1)); + node = getDatanodeByXferAddr(host, xferPort); + } + if (node == null) { + node = getDatanodeByHost(host); + } + if (node == null) { + String networkLocation = resolveNetworkLocation(host); + + // If the current cluster doesn't contain the node, fallback to + // something machine local and then rack local. + List rackNodes = getNetworkTopology() + .getDatanodesInRack(networkLocation); + if (rackNodes != null) { + // Try something machine local. + for (Node rackNode : rackNodes) { + if (((DatanodeDescriptor) rackNode).getIpAddr().equals(host)) { + node = (DatanodeDescriptor) rackNode; + break; + } + } + + // Try something rack local. + if (node == null && !rackNodes.isEmpty()) { + node = (DatanodeDescriptor) (rackNodes + .get(DFSUtil.getRandom().nextInt(rackNodes.size()))); + } + } + + // If we can't even choose rack local, just choose any node in the + // cluster. + if (node == null) { + node = (DatanodeDescriptor)getNetworkTopology() + .chooseRandom(NodeBase.ROOT); + } + } + return node; + } + + /** Get a datanode descriptor given corresponding storageID */ DatanodeDescriptor getDatanode(final String storageID) { return datanodeMap.get(storageID); @@ -455,8 +517,13 @@ public class DatanodeManager { } } + public String resolveNetworkLocation(String host) { + DatanodeID d = parseDNFromHostsEntry(host); + return resolveNetworkLocation(d); + } + /* Resolve a node's network location */ - private void resolveNetworkLocation (DatanodeDescriptor node) { + private String resolveNetworkLocation (DatanodeID node) { List names = new ArrayList(1); if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { names.add(node.getIpAddr()); @@ -474,7 +541,7 @@ public class DatanodeManager { } else { networkLocation = rName.get(0); } - node.setNetworkLocation(networkLocation); + return networkLocation; } private boolean inHostsList(DatanodeID node) { @@ -707,7 +774,7 @@ public class DatanodeManager { nodeS.setDisallowed(false); // Node is in the include list // resolve network location - resolveNetworkLocation(nodeS); + nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); getNetworkTopology().add(nodeS); // also treat the registration message as a heartbeat @@ -739,7 +806,7 @@ public class DatanodeManager { = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK); boolean success = false; try { - resolveNetworkLocation(nodeDescr); + nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr)); networktopology.add(nodeDescr); // register new datanode @@ -875,7 +942,7 @@ public class DatanodeManager { (numStaleNodes <= heartbeatManager.getLiveDatanodeCount() * ratioUseStaleDataNodesForWrite); } - + /** * @return The time interval used to mark DataNodes as stale. */ @@ -1093,7 +1160,7 @@ public class DatanodeManager { * failed. As a special case, the loopback address is also considered * acceptable. This is particularly important on Windows, where 127.0.0.1 does * not resolve to "localhost". 
- * + * * @param address InetAddress to check * @return boolean true if name resolution successful or address is loopback */ @@ -1127,7 +1194,7 @@ public class DatanodeManager { setDatanodeDead(nodeinfo); throw new DisallowedDatanodeException(nodeinfo); } - + if (nodeinfo == null || !nodeinfo.isAlive) { return new DatanodeCommand[]{RegisterCommand.REGISTER}; } @@ -1142,9 +1209,34 @@ public class DatanodeManager { BlockRecoveryCommand brCommand = new BlockRecoveryCommand( blocks.length); for (BlockInfoUnderConstruction b : blocks) { - brCommand.add(new RecoveringBlock( - new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b - .getBlockRecoveryId())); + DatanodeDescriptor[] expectedLocations = b.getExpectedLocations(); + // Skip stale nodes during recovery - not heart beated for some time (30s by default). + List recoveryLocations = + new ArrayList(expectedLocations.length); + for (int i = 0; i < expectedLocations.length; i++) { + if (!expectedLocations[i].isStale(this.staleInterval)) { + recoveryLocations.add(expectedLocations[i]); + } + } + // If we only get 1 replica after eliminating stale nodes, then choose all + // replicas for recovery and let the primary data node handle failures. + if (recoveryLocations.size() > 1) { + if (recoveryLocations.size() != expectedLocations.length) { + LOG.info("Skipped stale nodes for recovery : " + + (expectedLocations.length - recoveryLocations.size())); + } + brCommand.add(new RecoveringBlock( + new ExtendedBlock(blockPoolId, b), + recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]), + b.getBlockRecoveryId())); + } else { + // If too many replicas are stale, then choose all replicas to participate + // in block recovery. + brCommand.add(new RecoveringBlock( + new ExtendedBlock(blockPoolId, b), + expectedLocations, + b.getBlockRecoveryId())); + } } return new DatanodeCommand[] { brCommand }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index b3129c55045..53d19bdc5ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -2452,6 +2452,8 @@ public class FSDirectory implements Closeable { try { setReady(false); rootDir = createRoot(getFSNamesystem()); + inodeMap.clear(); + addToInodeMapUnprotected(rootDir); nameCache.reset(); } finally { writeUnlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 47f74a3c006..9e6a9ae27ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2228,7 +2228,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * client to "try again later". 
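The recovery-command change above can be summarized as: drop replicas whose datanodes look stale, unless fewer than two fresh candidates remain, in which case keep every expected location. A minimal generic sketch, where D stands for DatanodeDescriptor and isStale[i] would come from expectedLocations[i].isStale(staleInterval) in the real code:

    import java.util.ArrayList;
    import java.util.List;

    class RecoveryLocationFilterSketch {
      static <D> List<D> filterStale(List<D> expected, boolean[] isStale) {
        List<D> fresh = new ArrayList<D>();
        for (int i = 0; i < expected.size(); i++) {
          if (!isStale[i]) {
            fresh.add(expected.get(i));
          }
        }
        // With one or zero fresh replicas, fall back to all expected locations
        // and let the primary datanode handle any failures.
        return fresh.size() > 1 ? fresh : expected;
      }
    }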
*/ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName, - ExtendedBlock previous, HashMap excludedNodes) + ExtendedBlock previous, HashMap excludedNodes, + List favoredNodes) throws LeaseExpiredException, NotReplicatedYetException, QuotaExceededException, SafeModeException, UnresolvedLinkException, IOException { @@ -2268,8 +2269,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } // choose targets for the new block to be allocated. - final DatanodeDescriptor targets[] = getBlockManager().chooseTarget( - src, replication, clientNode, excludedNodes, blockSize); + final DatanodeDescriptor targets[] = getBlockManager().chooseTarget( + src, replication, clientNode, excludedNodes, blockSize, favoredNodes); // Part II. // Allocate a new block, add it to the INode and the BlocksMap. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index f209699a17b..fb988720bbc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -29,6 +29,7 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; @@ -484,7 +485,8 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override public LocatedBlock addBlock(String src, String clientName, - ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId) + ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, + String[] favoredNodes) throws IOException { if (stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src @@ -497,8 +499,10 @@ class NameNodeRpcServer implements NamenodeProtocols { excludedNodesSet.put(node, node); } } + List favoredNodesList = (favoredNodes == null) ? 
null + : Arrays.asList(favoredNodes); LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId, - clientName, previous, excludedNodesSet); + clientName, previous, excludedNodesSet, favoredNodesList); if (locatedBlock != null) metrics.incrAddBlockOps(); return locatedBlock; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 3044cbbab30..869c8136071 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -121,6 +121,7 @@ message AddBlockRequestProto { optional ExtendedBlockProto previous = 3; repeated DatanodeInfoProto excludeNodes = 4; optional uint64 fileId = 5 [default = 0]; // default as a bogus id + repeated string favoredNodes = 6; //the set of datanodes to use for the block } message AddBlockResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 30f2bc9dd0d..f896c335f10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1072,7 +1072,10 @@ otherwise this may cause too frequent change of stale states. We thus set a minimum stale interval value (the default value is 3 times of heartbeat interval) and guarantee that the stale interval cannot be less - than the minimum value. + than the minimum value. A stale data node is avoided during lease/block + recovery. It can be conditionally avoided for reads (see + dfs.namenode.avoid.read.stale.datanode) and for writes (see + dfs.namenode.avoid.write.stale.datanode). diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index a7292e2c8e4..277edef0df8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -241,7 +241,7 @@ public class TestDFSClientRetries { anyString(), any(ExtendedBlock.class), any(DatanodeInfo[].class), - anyLong())).thenAnswer(answer); + anyLong(), any(String[].class))).thenAnswer(answer); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( @@ -390,7 +390,7 @@ public class TestDFSClientRetries { } }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(), Mockito. any(), Mockito. any(), - Mockito.anyLong()); + Mockito.anyLong(), Mockito. any()); doAnswer(new Answer() { @@ -432,7 +432,7 @@ public class TestDFSClientRetries { Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock( Mockito.anyString(), Mockito.anyString(), Mockito. any(), Mockito. any(), - Mockito.anyLong()); + Mockito.anyLong(), Mockito. 
any()); Mockito.verify(spyNN, Mockito.atLeastOnce()).complete( Mockito.anyString(), Mockito.anyString(), Mockito.any()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 5a01ea57a8f..66cb90fed81 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -519,7 +519,7 @@ public class TestFileCreation { // add one block to the file LocatedBlock location = client.getNamenode().addBlock(file1.toString(), - client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID); + client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID, null); System.out.println("testFileCreationError2: " + "Added block " + location.getBlock()); @@ -570,7 +570,7 @@ public class TestFileCreation { createFile(dfs, f, 3); try { cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName, - null, null, INodeId.GRANDFATHER_INODE_ID); + null, null, INodeId.GRANDFATHER_INODE_ID, null); fail(); } catch(IOException ioe) { FileSystem.LOG.info("GOOD!", ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java new file mode 100644 index 00000000000..cafc8227147 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.junit.Test; + +/** + * This class provides tests for BlockInfoUnderConstruction class + */ +public class TestBlockInfoUnderConstruction { + @Test + public void testInitializeBlockRecovery() throws Exception { + DatanodeDescriptor dd1 = DFSTestUtil.getDatanodeDescriptor("10.10.1.1", + "default"); + DatanodeDescriptor dd2 = DFSTestUtil.getDatanodeDescriptor("10.10.1.2", + "default"); + DatanodeDescriptor dd3 = DFSTestUtil.getDatanodeDescriptor("10.10.1.3", + "default"); + dd1.isAlive = dd2.isAlive = dd3.isAlive = true; + BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction( + new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), + 3, + BlockUCState.UNDER_CONSTRUCTION, + new DatanodeDescriptor[] {dd1, dd2, dd3}); + + // Recovery attempt #1. + long currentTime = System.currentTimeMillis(); + dd1.setLastUpdate(currentTime - 3 * 1000); + dd2.setLastUpdate(currentTime - 1 * 1000); + dd3.setLastUpdate(currentTime - 2 * 1000); + blockInfo.initializeBlockRecovery(1); + BlockInfoUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1); + assertEquals(blockInfoRecovery[0], blockInfo); + + // Recovery attempt #2. + currentTime = System.currentTimeMillis(); + dd1.setLastUpdate(currentTime - 2 * 1000); + dd2.setLastUpdate(currentTime - 1 * 1000); + dd3.setLastUpdate(currentTime - 3 * 1000); + blockInfo.initializeBlockRecovery(2); + blockInfoRecovery = dd1.getLeaseRecoveryCommand(1); + assertEquals(blockInfoRecovery[0], blockInfo); + + // Recovery attempt #3. + currentTime = System.currentTimeMillis(); + dd1.setLastUpdate(currentTime - 2 * 1000); + dd2.setLastUpdate(currentTime - 1 * 1000); + dd3.setLastUpdate(currentTime - 3 * 1000); + currentTime = System.currentTimeMillis(); + blockInfo.initializeBlockRecovery(3); + blockInfoRecovery = dd3.getLeaseRecoveryCommand(1); + assertEquals(blockInfoRecovery[0], blockInfo); + + // Recovery attempt #4. + // Reset everything. And again pick DN with most recent heart beat. 
+ currentTime = System.currentTimeMillis(); + dd1.setLastUpdate(currentTime - 2 * 1000); + dd2.setLastUpdate(currentTime - 1 * 1000); + dd3.setLastUpdate(currentTime); + currentTime = System.currentTimeMillis(); + blockInfo.initializeBlockRecovery(3); + blockInfoRecovery = dd3.getLeaseRecoveryCommand(1); + assertEquals(blockInfoRecovery[0], blockInfo); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java index e4038676a16..bbb83070a30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java @@ -20,17 +20,21 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.junit.Assert.assertEquals; import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -56,14 +60,12 @@ public class TestHeartbeatHandling { final HeartbeatManager hm = namesystem.getBlockManager( ).getDatanodeManager().getHeartbeatManager(); final String poolId = namesystem.getBlockPoolId(); - final DatanodeRegistration nodeReg = + final DatanodeRegistration nodeReg = DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId); - - final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg); - + final int REMAINING_BLOCKS = 1; - final int MAX_REPLICATE_LIMIT = + final int MAX_REPLICATE_LIMIT = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2); final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT; final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS; @@ -83,7 +85,7 @@ public class TestHeartbeatHandling { assertEquals(1, cmds.length); assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction()); assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length); - + ArrayList blockList = new ArrayList(MAX_INVALIDATE_BLOCKS); for (int i=0; iany(), Mockito.>any(), - Mockito.anyLong()); + Mockito.anyLong(), Mockito.>any()); // create file nn.create(src, FsPermission.getFileDefault(), @@ -129,7 +130,7 @@ public class TestAddBlockRetry { // start first addBlock() LOG.info("Starting first addBlock for " + src); - nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID); + nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null); // check 
locations LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java index 3ed98556a5e..d84283fa7d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java @@ -77,6 +77,7 @@ import org.apache.hadoop.util.ExitUtil.ExitException; import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Level; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatcher; @@ -1061,6 +1062,10 @@ public class TestCheckpoint { secondary = startSecondaryNameNode(conf); secondary.doCheckpoint(); + FSDirectory secondaryFsDir = secondary.getFSNamesystem().dir; + INode rootInMap = secondaryFsDir.getInode(secondaryFsDir.rootDir.getId()); + Assert.assertSame(rootInMap, secondaryFsDir.rootDir); + fileSys.delete(tmpDir, true); fileSys.mkdirs(tmpDir); secondary.doCheckpoint(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java new file mode 100644 index 00000000000..015c021b066 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java @@ -0,0 +1,220 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Random; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.junit.Test; +import org.junit.AfterClass; +import org.junit.BeforeClass; + + +public class TestFavoredNodesEndToEnd { + private static MiniDFSCluster cluster; + private static Configuration conf; + private final static int NUM_DATA_NODES = 10; + private final static int NUM_FILES = 10; + private final static byte[] SOME_BYTES = new String("foo").getBytes(); + private static DistributedFileSystem dfs; + private static ArrayList datanodes; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) + .build(); + cluster.waitClusterUp(); + dfs = cluster.getFileSystem(); + datanodes = cluster.getDataNodes(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testFavoredNodesEndToEnd() throws Exception { + //create 10 files with random preferred nodes + for (int i = 0; i < NUM_FILES; i++) { + Random rand = new Random(System.currentTimeMillis() + i); + //pass a new created rand so as to get a uniform distribution each time + //without too much collisions (look at the do-while loop in getDatanodes) + InetSocketAddress datanode[] = getDatanodes(rand); + Path p = new Path("/filename"+i); + FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, + 4096, (short)3, (long)4096, null, datanode); + out.write(SOME_BYTES); + out.close(); + BlockLocation[] locations = + dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, + Long.MAX_VALUE); + //make sure we have exactly one block location, and three hosts + assertTrue(locations.length == 1 && locations[0].getHosts().length == 3); + //verify the files got created in the right nodes + for (BlockLocation loc : locations) { + String[] hosts = loc.getNames(); + String[] hosts1 = getStringForInetSocketAddrs(datanode); + assertTrue(compareNodes(hosts, hosts1)); + } + } + } + + @Test + public void testWhenFavoredNodesNotPresent() throws Exception { + //when we ask for favored nodes but the nodes are not there, we should + //get some other nodes. 
In other words, the write to hdfs should not fail + //and if we do getBlockLocations on the file, we should see one blklocation + //and three hosts for that + Random rand = new Random(System.currentTimeMillis()); + InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3]; + for (int i = 0; i < 3; i++) { + arbitraryAddrs[i] = getArbitraryLocalHostAddr(); + } + Path p = new Path("/filename-foo-bar"); + FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, + 4096, (short)3, (long)4096, null, arbitraryAddrs); + out.write(SOME_BYTES); + out.close(); + BlockLocation[] locations = + dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, + Long.MAX_VALUE); + assertTrue(locations.length == 1 && locations[0].getHosts().length == 3); + } + + @Test + public void testWhenSomeNodesAreNotGood() throws Exception { + //make some datanode not "good" so that even if the client prefers it, + //the namenode would not give it as a replica to write to + DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().getDatanodeByXferAddr( + datanodes.get(0).getXferAddress().getAddress().getHostAddress(), + datanodes.get(0).getXferAddress().getPort()); + //set the decommission status to true so that + //BlockPlacementPolicyDefault.isGoodTarget returns false for this dn + d.setDecommissioned(); + InetSocketAddress addrs[] = new InetSocketAddress[3]; + for (int i = 0; i < 3; i++) { + addrs[i] = datanodes.get(i).getXferAddress(); + } + Path p = new Path("/filename-foo-bar-baz"); + FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, + 4096, (short)3, (long)4096, null, addrs); + out.write(SOME_BYTES); + out.close(); + BlockLocation[] locations = + dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, + Long.MAX_VALUE); + //reset the state + d.stopDecommission(); + assertTrue(locations.length == 1 && locations[0].getHosts().length == 3); + //also make sure that the datanode[0] is not in the list of hosts + String datanode0 = + datanodes.get(0).getXferAddress().getAddress().getHostAddress() + + ":" + datanodes.get(0).getXferAddress().getPort(); + for (int i = 0; i < 3; i++) { + if (locations[0].getNames()[i].equals(datanode0)) { + fail(datanode0 + " not supposed to be a replica for the block"); + } + } + } + + private String[] getStringForInetSocketAddrs(InetSocketAddress[] datanode) { + String strs[] = new String[datanode.length]; + for (int i = 0; i < datanode.length; i++) { + strs[i] = datanode[i].getAddress().getHostAddress() + ":" + + datanode[i].getPort(); + } + return strs; + } + + private boolean compareNodes(String[] dnList1, String[] dnList2) { + for (int i = 0; i < dnList1.length; i++) { + boolean matched = false; + for (int j = 0; j < dnList2.length; j++) { + if (dnList1[i].equals(dnList2[j])) { + matched = true; + break; + } + } + if (matched == false) { + fail(dnList1[i] + " not a favored node"); + } + } + return true; + } + + private InetSocketAddress[] getDatanodes(Random rand) { + //Get some unique random indexes + int idx1 = rand.nextInt(NUM_DATA_NODES); + int idx2; + + do { + idx2 = rand.nextInt(NUM_DATA_NODES); + } while (idx1 == idx2); + + int idx3; + do { + idx3 = rand.nextInt(NUM_DATA_NODES); + } while (idx2 == idx3 || idx1 == idx3); + + InetSocketAddress[] addrs = new InetSocketAddress[3]; + addrs[0] = datanodes.get(idx1).getXferAddress(); + addrs[1] = datanodes.get(idx2).getXferAddress(); + addrs[2] = datanodes.get(idx3).getXferAddress(); + return addrs; + } + + private InetSocketAddress 
getArbitraryLocalHostAddr() + throws UnknownHostException{ + Random rand = new Random(System.currentTimeMillis()); + int port = rand.nextInt(65535); + while (true) { + boolean conflict = false; + for (DataNode d : datanodes) { + if (d.getXferAddress().getPort() == port) { + port = rand.nextInt(65535); + conflict = true; + } + } + if (conflict == false) { + break; + } + } + return new InetSocketAddress(InetAddress.getLocalHost(), port); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java index bfac1afcaaa..3c8b91be8d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java @@ -521,9 +521,17 @@ public class TestPipelinesFailover { storedBlock instanceof BlockInfoUnderConstruction); BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)storedBlock; - // We expect that the first indexed replica will be the one - // to be in charge of the synchronization / recovery protocol. - DatanodeDescriptor expectedPrimary = ucBlock.getExpectedLocations()[0]; + // We expect that the replica with the most recent heart beat will be + // the one to be in charge of the synchronization / recovery protocol. + DatanodeDescriptor[] datanodes = ucBlock.getExpectedLocations(); + DatanodeDescriptor expectedPrimary = datanodes[0]; + long mostRecentLastUpdate = expectedPrimary.getLastUpdate(); + for (int i = 1; i < datanodes.length; i++) { + if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) { + expectedPrimary = datanodes[i]; + mostRecentLastUpdate = expectedPrimary.getLastUpdate(); + } + } return expectedPrimary; } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e18cd9fb5f9..99582138758 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -356,6 +356,9 @@ Release 2.0.5-beta - UNRELEASED MAPREDUCE-5178. Update MR App to set progress in ApplicationReport after YARN-577. (Hitesh Shah via vinodkv) + MAPREDUCE-5167. Update MR App after YARN-562 to use the new builder API + for the container. 
(Jian He via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index fae70742107..60facec1284 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1094,12 +1094,12 @@ public abstract class TaskAttemptImpl implements + taInfo.getPort()); String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":" + taInfo.getHttpPort()); - // Resource/Priority/Tokens are only needed while launching the - // container on an NM, these are already completed tasks, so setting them to - // null + // Resource/Priority/Tokens and RMIdentifier are only needed while + // launching the container on an NM, these are already completed tasks, so + // setting them to null and RMIdentifier as 0 container = BuilderUtils.newContainer(containerId, containerNodeId, - nodeHttpAddress, null, null, null); + nodeHttpAddress, null, null, null, 0); computeRackAndLocality(); launchTime = taInfo.getStartTime(); finishTime = (taInfo.getFinishTime() != -1) ? diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 4ef4d8d9f4b..c187bc8d0b7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -519,7 +519,7 @@ public class MRApp extends MRAppMaster { cId.setId(containerCount++); NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT); Container container = BuilderUtils.newContainer(cId, nodeId, - NM_HOST + ":" + NM_HTTP_PORT, null, null, null); + NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0); JobID id = TypeConverter.fromYarn(applicationId); JobId jobId = TypeConverter.toYarn(id); getContext().getEventHandler().handle(new JobHistoryEvent(jobId, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java index efb8b7a134b..380df645ea7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java @@ -243,7 +243,7 @@ public class MRAppBenchmark { .newContainer(containerId, BuilderUtils.newNodeId("host" + containerId.getId(), 2345), "host" + containerId.getId() + ":5678", req - .getCapability(), req.getPriority(), null)); + .getCapability(), 
req.getPriority(), null, 0)); } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1e260185247..0703d7abe9e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -187,6 +187,12 @@ Release 2.0.5-beta - UNRELEASED YARN-595. Refactor fair scheduler to use common Resources. (Sandy Ryza via tomwhite) + YARN-562. Modified NM to reject any containers allocated by a previous + ResourceManager. (Jian He via vinodkv) + + YARN-591. Moved RM recovery related records out of public API as they do not + belong there. (vinodkv) + OPTIMIZATIONS BUG FIXES @@ -596,6 +602,9 @@ Release 0.23.8 - UNRELEASED BUG FIXES + YARN-363. Add webapps/proxy directory without which YARN proxy-server fails + when started in stand-alone mode. (Kenji Kikushima via vinodkv) + Release 0.23.7 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index 9478d341216..b0860e5aa19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -135,4 +135,16 @@ public interface Container extends Comparable { @Private @Unstable void setContainerToken(ContainerToken containerToken); + + /** + * Get the RMIdentifier of RM in which containers are allocated + * @return RMIdentifier + */ + @Private + @Unstable + long getRMIdentifer(); + + @Private + @Unstable + void setRMIdentifier(long rmIdentifier); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java index dd6941ff79f..68bb0839a9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java @@ -230,6 +230,18 @@ public class ContainerPBImpl extends ProtoBase implements Contai this.containerToken = containerToken; } + @Override + public long getRMIdentifer() { + ContainerProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getRmIdentifier(); + } + + @Override + public void setRMIdentifier(long rmIdentifier) { + maybeInitBuilder(); + builder.setRmIdentifier((rmIdentifier)); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index a84bf37e39e..212f0ec82d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -68,6 +68,7 @@ message ContainerProto { optional ResourceProto resource = 4; optional PriorityProto priority = 5; optional hadoop.common.TokenProto container_token = 6; + optional int64 rm_identifier = 7; } enum YarnApplicationStateProto { @@ -311,16 +312,3 @@ message StringBytesMapProto { optional string key = 1; optional bytes value = 2; } - -//////////////////////////////////////////////////////////////////////// -////// From recovery//////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////// -message ApplicationStateDataProto { - optional int64 submit_time = 1; - optional ApplicationSubmissionContextProto application_submission_context = 2; -} - -message ApplicationAttemptStateDataProto { - optional ApplicationAttemptIdProto attemptId = 1; - optional ContainerProto master_container = 2; -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto index 942340b38d0..52e21d7fe5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto @@ -21,6 +21,7 @@ option java_outer_classname = "YarnServerResourceManagerServiceProtos"; option java_generic_services = true; option java_generate_equals_and_hash = true; +import "yarn_protos.proto"; message RefreshQueuesRequestProto { } @@ -59,3 +60,16 @@ message GetGroupsForUserRequestProto { message GetGroupsForUserResponseProto { repeated string groups = 1; } + +//////////////////////////////////////////////////////////////////////// +////// RM recovery related records ///////////////////////////////////// +//////////////////////////////////////////////////////////////////////// +message ApplicationStateDataProto { + optional int64 submit_time = 1; + optional ApplicationSubmissionContextProto application_submission_context = 2; +} + +message ApplicationAttemptStateDataProto { + optional ApplicationAttemptIdProto attemptId = 1; + optional ContainerProto master_container = 2; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java index bbb8ad1463f..d95ce64f630 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java @@ -56,7 +56,7 @@ public class 
TestAMRMClientAsync { BuilderUtils.newContainerId(0, 0, 0, 0), ContainerState.COMPLETE, "", 0)); List allocated1 = Arrays.asList( - BuilderUtils.newContainer(null, null, null, null, null, null)); + BuilderUtils.newContainer(null, null, null, null, null, null, 0)); final AllocateResponse response1 = createAllocateResponse( new ArrayList(), allocated1); final AllocateResponse response2 = createAllocateResponse(completed1, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index 7dc25de8208..f09046e3712 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -237,9 +237,9 @@ public class BuilderUtils { return containerStatus; } - public static Container newContainer(ContainerId containerId, - NodeId nodeId, String nodeHttpAddress, - Resource resource, Priority priority, ContainerToken containerToken) { + public static Container newContainer(ContainerId containerId, NodeId nodeId, + String nodeHttpAddress, Resource resource, Priority priority, + ContainerToken containerToken, long rmIdentifier) { Container container = recordFactory.newRecordInstance(Container.class); container.setId(containerId); container.setNodeId(nodeId); @@ -247,6 +247,7 @@ public class BuilderUtils { container.setResource(resource); container.setPriority(priority); container.setContainerToken(containerToken); + container.setRMIdentifier(rmIdentifier); return container; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 295a38cee80..7454955bbc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -105,7 +105,7 @@ public class TestContainerLaunchRPC { containerId.setId(100); Container container = BuilderUtils.newContainer(containerId, null, null, recordFactory - .newRecordInstance(Resource.class), null, null); + .newRecordInstance(Resource.class), null, null, 0); StartContainerRequest scRequest = recordFactory .newRecordInstance(StartContainerRequest.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 7d941e92a23..92bbb8dc4bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -128,7 +128,7 @@ public class TestRPC { containerId.setId(100); Container mockContainer = BuilderUtils.newContainer(containerId, null, null, recordFactory - .newRecordInstance(Resource.class), null, null); + .newRecordInstance(Resource.class), null, null, 0); // containerLaunchContext.env = new HashMap(); // containerLaunchContext.command = new ArrayList(); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java new file mode 100644 index 00000000000..3842574a2c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api; + +public interface ResourceManagerConstants { + + public static final long RM_INVALID_IDENTIFIER = 0; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index 1c1a9dd4a6a..11b02115c0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -30,4 +30,7 @@ public interface RegisterNodeManagerResponse { void setNodeAction(NodeAction nodeAction); + long getRMIdentifier(); + + void setRMIdentifier(long rmIdentifier); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index c28d4c96103..43451dc6793 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -121,6 +121,18 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase(); private final NMContainerTokenSecretManager containerTokenSecretManager; - + private ContainerManager containerManager; private final NodeHealthStatus nodeHealthStatus = 
RecordFactoryProvider .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class); @@ -333,6 +338,15 @@ public class NodeManager extends CompositeService public NodeHealthStatus getNodeHealthStatus() { return this.nodeHealthStatus; } + + @Override + public ContainerManager getContainerManager() { + return this.containerManager; + } + + public void setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + } } @@ -376,7 +390,7 @@ public class NodeManager extends CompositeService stop(); break; case RESYNC: - cleanupContainersOnResync(); + resyncWithRM(); break; default: LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index 41949e7baab..c9577714e27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -24,5 +24,8 @@ import org.apache.hadoop.yarn.service.Service; public interface NodeStatusUpdater extends Service { void sendOutofBandHeartBeat(); + NodeStatus getNodeStatusAndUpdateContainersInContext(); + + long getRMIdentifier(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index e9583c2a2e9..284cd946005 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.service.AbstractService; @@ -95,6 +97,7 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements
   private Runnable statusUpdaterRunnable;
   private Thread statusUpdater;
+  private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -267,6 +270,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         this.resourceTracker = getRMClient();
         regNMResponse = this.resourceTracker.registerNodeManager(request);
+        this.rmIdentifier = regNMResponse.getRMIdentifier();
         break;
       } catch(Throwable e) {
         LOG.warn("Trying to connect to ResourceManager, " +
@@ -308,7 +312,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     LOG.info("Registered with ResourceManager as " + this.nodeId
         + " with total resource of " + this.totalResource);
-
+    LOG.info("Notifying ContainerManager to unblock new container-requests");
+    ((ContainerManagerImpl) this.context.getContainerManager())
+        .setBlockNewContainerRequests(false);
   }
 
   private List<ApplicationId> createKeepAliveApplicationList() {
@@ -334,6 +340,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return appList;
   }
 
+  @Override
   public NodeStatus getNodeStatusAndUpdateContainersInContext() {
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
@@ -407,6 +414,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     }
   }
 
+  @Override
+  public long getRMIdentifier() {
+    return this.rmIdentifier;
+  }
+
   protected void startStatusUpdater() {
     statusUpdaterRunnable = new Runnable() {
@@ -478,6 +490,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
           if (response.getNodeAction() == NodeAction.RESYNC) {
             LOG.info("Node is out of sync with ResourceManager,"
                 + " hence rebooting.");
+            // Invalidate the RMIdentifier during resync
+            NodeStatusUpdaterImpl.this.rmIdentifier =
+                ResourceManagerConstants.RM_INVALID_IDENTIFIER;
             dispatcher.getEventHandler().handle(
                 new NodeManagerEvent(NodeManagerEventType.RESYNC));
             break;
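Taken together, the NodeStatusUpdaterImpl changes above and the ContainerManagerImpl changes below implement a small handshake: the NM records the RM identifier returned at registration, resets it to RM_INVALID_IDENTIFIER while resyncing, and the container manager compares the identifier stamped on each incoming container against that value. A minimal self-contained sketch of that decision logic, with illustrative names that are not part of this patch:

  final class RmIdentifierCheckSketch {
    // mirrors ResourceManagerConstants.RM_INVALID_IDENTIFIER
    static final long RM_INVALID_IDENTIFIER = 0;

    // the NM's current view of its RM, taken from the registration response
    private long rmIdentifier = RM_INVALID_IDENTIFIER;

    void onRegistered(long idFromRegisterResponse) {
      rmIdentifier = idFromRegisterResponse;
    }

    void onResync() {
      // containers stamped by the old RM now fail the check below
      rmIdentifier = RM_INVALID_IDENTIFIER;
    }

    boolean acceptContainer(long containerRmIdentifier) {
      // a container allocated by a previous (or unknown) RM is rejected
      return containerRmIdentifier == rmIdentifier;
    }
  }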
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index e4ec8fe55e6..c79d7c94df6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -23,10 +23,9 @@ import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -125,6 +124,7 @@ public class ContainerManagerImpl extends CompositeService implements
   private final ApplicationACLsManager aclsManager;
   private final DeletionService deletionService;
+  private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false);
 
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@@ -239,7 +239,10 @@ public class ContainerManagerImpl extends CompositeService implements
         false)) {
       refreshServiceAcls(conf, new NMPolicyProvider());
     }
-
+
+    LOG.info("Blocking new container-requests as container manager rpc" +
+        " server is still starting.");
+    this.setBlockNewContainerRequests(true);
     server.start();
     InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
     this.context.getNodeId().setHost(connectAddress.getHostName());
@@ -393,6 +396,13 @@ public class ContainerManagerImpl extends CompositeService implements
   @Override
   public StartContainerResponse startContainer(StartContainerRequest request)
       throws YarnRemoteException {
+
+    if (blockNewContainerRequests.get()) {
+      throw RPCUtil.getRemoteException(new NMNotYetReadyException(
+          "Rejecting new containers as NodeManager has not" +
+          " yet connected with ResourceManager"));
+    }
+
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
     org.apache.hadoop.yarn.api.records.Container lauchContainer =
         request.getContainer();
@@ -402,6 +412,16 @@ public class ContainerManagerImpl extends CompositeService implements
     UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
     authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi);
 
+    // Is the container coming from an unknown (previous) RM?
+    if (lauchContainer.getRMIdentifer() != nodeStatusUpdater
+        .getRMIdentifier()) {
+      String msg = "\nContainer " + containerIDStr
+          + " rejected as it is allocated by a previous RM";
+      LOG.error(msg);
+      throw RPCUtil
+          .getRemoteException(new InvalidContainerException(msg));
+    }
+
     LOG.info("Start request for " + containerIDStr + " by user "
         + launchContext.getUser());
@@ -615,6 +635,10 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+    this.blockNewContainerRequests.set(blockNewContainerRequests);
+  }
+
   @Override
   public void stateChanged(Service service) {
     // TODO Auto-generated method stub
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java
new file mode 100644
index 00000000000..87f1cae243a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.yarn.YarnException;
+
+/**
+ * Thrown when the NM rejects a container start request, e.g. a container allocated by a previous, now-invalid RM
+ */
+public class InvalidContainerException extends YarnException {
+
+  private static final long serialVersionUID = 1L;
+
+  public InvalidContainerException(String msg) {
+    super(msg);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java
new file mode 100644
index 00000000000..a47f68120b9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.yarn.YarnException;
+
+/**
+ * This exception is thrown when the NM has started from scratch but has not
+ * yet registered with the RM, and therefore cannot accept containers.
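+ * <p>
+ * Callers can treat this as a transient failure and retry; a minimal
+ * client-side sketch (the startRequest variable is illustrative, not part
+ * of this patch):
+ * <pre>
+ *   try {
+ *     containerManager.startContainer(startRequest);
+ *   } catch (YarnRemoteException e) {
+ *     // likely NMNotYetReadyException in the remote trace; back off
+ *     // and retry once the NM has registered with the RM
+ *   }
+ * </pre>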
+ */ +public class NMNotYetReadyException extends YarnException { + + private static final long serialVersionUID = 1L; + + public NMNotYetReadyException(String msg) { + super(msg); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index 3284634c62c..3f74c29e18c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -168,4 +168,9 @@ public class DummyContainerManager extends ContainerManagerImpl { } }; } + + @Override + public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { + // do nothing + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index 44328dbe0aa..a23f125a569 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -142,6 +142,17 @@ public class TestContainerManagerWithLCE extends TestContainerManager { super.testLocalFilesCleanup(); } + @Override + public void testContainerLaunchFromPreviousRM() throws InterruptedException, + IOException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testContainerLaunchFromPreviousRM"); + super.testContainerLaunchFromPreviousRM(); + } private boolean shouldRunTest() { return System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java new file mode 100644 index 00000000000..76ac4420c74 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -0,0 +1,315 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.Assert; + +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.NMNotYetReadyException; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestNodeManagerResync { + static final File basedir = + new File("target", TestNodeManagerResync.class.getName()); + static final File tmpDir = new File(basedir, "tmpDir"); + static final File logsDir = new File(basedir, "logs"); + static final File remoteLogsDir = new File(basedir, "remotelogs"); + static final File nmLocalDir = new File(basedir, "nm0"); + static final File processStartFile = new File(tmpDir, "start_file.txt") + .getAbsoluteFile(); + + static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + static final String user = "nobody"; + private FileContext localFS; + private CyclicBarrier syncBarrier; + private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); + + @Before + public void setup() throws UnsupportedFileSystemException { + localFS = FileContext.getLocalFSFileContext(); + tmpDir.mkdirs(); + logsDir.mkdirs(); + remoteLogsDir.mkdirs(); + nmLocalDir.mkdirs(); + syncBarrier = new CyclicBarrier(2); + } + + @After + public void tearDown() throws IOException, InterruptedException { + localFS.delete(new Path(basedir.getPath()), true); + assertionFailedInThread.set(false); + } + + @SuppressWarnings("unchecked") + @Test + public void testKillContainersOnResync() throws IOException, + InterruptedException { + NodeManager nm = new TestNodeManager1(); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, + processStartFile); + + Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount()); + 
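+    // The RESYNC event below causes the NM to kill its running containers
+    // and re-register with the RM (see rebootNodeStatusUpdater in
+    // TestNodeStatusUpdaterImpl1 below), which is why the registration
+    // count is expected to go from 1 to 2 once the barrier is crossed.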
nm.getNMDispatcher().getEventHandler(). + handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount()); + + Assert.assertFalse(assertionFailedInThread.get()); + + nm.stop(); + } + + // This test tests new container requests are blocked when NM starts from + // scratch until it register with RM AND while NM is resyncing with RM + @SuppressWarnings("unchecked") + @Test + public void testBlockNewContainerRequestsOnStartAndResync() + throws IOException, InterruptedException { + NodeManager nm = new TestNodeManager2(); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + // Start the container in running state + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, + processStartFile); + + nm.getNMDispatcher().getEventHandler() + .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC)); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + Assert.assertFalse(assertionFailedInThread.get()); + nm.stop(); + } + + private YarnConfiguration createNMConfig() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB + conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345"); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346"); + conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + remoteLogsDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath()); + return conf; + } + + class TestNodeManager1 extends NodeManager { + + private int registrationCount = 0; + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterImpl1(context, dispatcher, + healthChecker, metrics); + } + + public int getNMRegistrationCount() { + return registrationCount; + } + + class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater { + + public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + + @Override + protected void registerWithRM() throws YarnRemoteException { + super.registerWithRM(); + registrationCount++; + } + + @Override + protected void rebootNodeStatusUpdater() { + ConcurrentMap containers = + getNMContext().getContainers(); + try { + // ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + super.rebootNodeStatusUpdater(); + syncBarrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } catch (AssertionError ae) { + assertionFailedInThread.set(true); + } + } + } + } + + class TestNodeManager2 extends NodeManager { + + Thread launchContainersThread = null; + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterImpl2(context, dispatcher, + healthChecker, metrics); + } + + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, + 
LocalDirsHandlerService dirsHandler) { + return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, + metrics, aclsManager, dirsHandler){ + @Override + public void setBlockNewContainerRequests( + boolean blockNewContainerRequests) { + if (blockNewContainerRequests) { + // start test thread right after blockNewContainerRequests is set + // true + super.setBlockNewContainerRequests(blockNewContainerRequests); + launchContainersThread = new RejectedContainersLauncherThread(); + launchContainersThread.start(); + } else { + // join the test thread right before blockNewContainerRequests is + // reset + try { + // stop the test thread + ((RejectedContainersLauncherThread) launchContainersThread) + .setStopThreadFlag(true); + launchContainersThread.join(); + ((RejectedContainersLauncherThread) launchContainersThread) + .setStopThreadFlag(false); + super.setBlockNewContainerRequests(blockNewContainerRequests); + } catch (InterruptedException e) { + } + } + } + }; + } + + class TestNodeStatusUpdaterImpl2 extends MockNodeStatusUpdater { + + public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + + @Override + protected void rebootNodeStatusUpdater() { + ConcurrentMap containers = + getNMContext().getContainers(); + + try { + // ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + super.rebootNodeStatusUpdater(); + // After this point new containers are free to be launched, except + // containers from previous RM + // Wait here so as to sync with the main test thread. + syncBarrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } catch (AssertionError ae) { + assertionFailedInThread.set(true); + } + } + } + + class RejectedContainersLauncherThread extends Thread { + + boolean isStopped = false; + public void setStopThreadFlag(boolean isStopped) { + this.isStopped = isStopped; + } + + @Override + public void run() { + int numContainers = 0; + int numContainersRejected = 0; + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + try { + while (!isStopped && numContainers < 10) { + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + Container container = + BuilderUtils.newContainer(cId, null, null, null, null, null, 0); + StartContainerRequest startRequest = + recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest.setContainerLaunchContext(containerLaunchContext); + startRequest.setContainer(container); + System.out.println("no. of containers to be launched: " + + numContainers); + numContainers++; + try { + getContainerManager().startContainer(startRequest); + } catch (YarnRemoteException e) { + numContainersRejected++; + Assert.assertTrue(e.getMessage().contains( + "Rejecting new containers as NodeManager has not" + + " yet connected with ResourceManager")); + // TO DO: This should be replaced to explicitly check exception + // class name after YARN-142 + Assert.assertTrue(e.getRemoteTrace().contains( + NMNotYetReadyException.class.getName())); + } + } + // no. of containers to be launched should equal to no. 
of + // containers rejected + Assert.assertEquals(numContainers, numContainersRejected); + } catch (AssertionError ae) { + assertionFailedInThread.set(true); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index 1792988bef9..ab634c4c251 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -24,17 +24,12 @@ import static org.mockito.Mockito.when; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; -import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CyclicBarrier; import junit.framework.Assert; @@ -59,12 +54,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; -import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; @@ -86,7 +78,6 @@ public class TestNodeManagerShutdown { static final String user = "nobody"; private FileContext localFS; private ContainerId cId; - private CyclicBarrier syncBarrier = new CyclicBarrier(2); @Before public void setup() throws UnsupportedFileSystemException { @@ -110,7 +101,7 @@ public class TestNodeManagerShutdown { NodeManager nm = getNodeManager(); nm.init(createNMConfig()); nm.start(); - startContainers(nm); + startContainer(nm, cId, localFS, tmpDir, processStartFile); final int MAX_TRIES=20; int numTries = 0; @@ -150,29 +141,13 @@ public class TestNodeManagerShutdown { reader.close(); } } - - @SuppressWarnings("unchecked") - @Test - public void testKillContainersOnResync() throws IOException, InterruptedException { - NodeManager nm = new TestNodeManager(); - YarnConfiguration conf = createNMConfig(); - nm.init(conf); - nm.start(); - startContainers(nm); - assert ((TestNodeManager) nm).getNMRegistrationCount() == 1; - nm.getNMDispatcher().getEventHandler(). 
- handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); - try { - syncBarrier.await(); - } catch (BrokenBarrierException e) { - } - assert ((TestNodeManager) nm).getNMRegistrationCount() == 2; - } - - private void startContainers(NodeManager nm) throws IOException { + public static void startContainer(NodeManager nm, ContainerId cId, + FileContext localFS, File scriptFileDir, File processStartFile) + throws IOException { ContainerManagerImpl containerManager = nm.getContainerManager(); - File scriptFile = createUnhaltingScriptFile(); + File scriptFile = + createUnhaltingScriptFile(cId, scriptFileDir, processStartFile); ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); @@ -218,7 +193,7 @@ public class TestNodeManagerShutdown { Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState()); } - private ContainerId createContainerId() { + public static ContainerId createContainerId() { ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); appId.setClusterTimestamp(0); appId.setId(0); @@ -247,8 +222,9 @@ public class TestNodeManagerShutdown { * Creates a script to run a container that will run forever unless * stopped by external means. */ - private File createUnhaltingScriptFile() throws IOException { - File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + private static File createUnhaltingScriptFile(ContainerId cId, + File scriptFileDir, File processStartFile) throws IOException { + File scriptFile = Shell.appendScriptExtension(scriptFileDir, "scriptFile"); PrintWriter fileWriter = new PrintWriter(scriptFile); if (Shell.WINDOWS) { fileWriter.println("@echo \"Running testscript for delayed kill\""); @@ -282,48 +258,4 @@ public class TestNodeManagerShutdown { } }; } - - class TestNodeManager extends NodeManager { - - private int registrationCount = 0; - - @Override - protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - return new TestNodeStatusUpdaterImpl(context, dispatcher, - healthChecker, metrics); - } - - public int getNMRegistrationCount() { - return registrationCount; - } - - class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater { - - public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); - } - - @Override - protected void registerWithRM() throws YarnRemoteException { - super.registerWithRM(); - registrationCount++; - } - - @Override - protected void rebootNodeStatusUpdater() { - ConcurrentMap containers = - getNMContext().getContainers(); - // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); - super.rebootNodeStatusUpdater(); - try { - syncBarrier.await(); - } catch (InterruptedException e) { - } catch (BrokenBarrierException e) { - } - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 29d6a4c3a84..10dd155da1a 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 55e92a440ff..5fd11d58e75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -156,7 +156,13 @@ public abstract class BaseContainerManagerTest { dirsHandler = nodeHealthChecker.getDiskHandler(); containerManager = new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, - metrics, new ApplicationACLsManager(conf), dirsHandler); + metrics, new ApplicationACLsManager(conf), dirsHandler) { + @Override + public void setBlockNewContainerRequests( + boolean blockNewContainerRequests) { + // do nothing + } + }; containerManager.init(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 981ab39acbd..df3d9173c93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -49,13 +52,18 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import 
org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -63,7 +71,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; -import static org.mockito.Mockito.*; public class TestContainerManager extends BaseContainerManagerTest { @@ -411,7 +418,13 @@ public class TestContainerManager extends BaseContainerManagerTest { containerManager = new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, - metrics, new ApplicationACLsManager(conf), dirsHandler); + metrics, new ApplicationACLsManager(conf), dirsHandler) { + @Override + public void setBlockNewContainerRequests( + boolean blockNewContainerRequests) { + // do nothing + } + }; containerManager.init(conf); containerManager.start(); @@ -524,4 +537,77 @@ public class TestContainerManager extends BaseContainerManagerTest { Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!", targetFile.exists()); } + + @Test + public void testContainerLaunchFromPreviousRM() throws IOException, + InterruptedException { + // There is no real RM registration, simulate and set RMIdentifier + NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class); + when(nodeStatusUpdater.getRMIdentifier()).thenReturn((long) 1234); + containerManager = + new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, + metrics, new ApplicationACLsManager(conf), dirsHandler) { + @Override + public void setBlockNewContainerRequests( + boolean blockNewContainerRequests) { + // do nothing + } + }; + containerManager.init(conf); + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + ContainerId cId1 = createContainerId(); + ContainerId cId2 = createContainerId(); + containerLaunchContext.setUser(user); + containerLaunchContext + .setLocalResources(new HashMap()); + containerLaunchContext.setUser(containerLaunchContext.getUser()); + Resource mockResource = mock(Resource.class); + + Container mockContainer1 = mock(Container.class); + when(mockContainer1.getId()).thenReturn(cId1); + // Construct the Container with Invalid RMIdentifier + when(mockContainer1.getRMIdentifer()).thenReturn( + (long) ResourceManagerConstants.RM_INVALID_IDENTIFIER); + StartContainerRequest startRequest1 = + recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest1.setContainerLaunchContext(containerLaunchContext); + startRequest1.setContainer(mockContainer1); + boolean catchException = false; + try { + containerManager.startContainer(startRequest1); + } catch 
+    } catch (YarnRemoteException e) {
+      catchException = true;
+      Assert.assertTrue(e.getMessage().contains(
+        "Container " + cId1 + " rejected as it is allocated by a previous RM"));
+      // TODO: check the exception class explicitly once YARN-142 is done.
+      Assert.assertTrue(e.getRemoteTrace().contains(
+        InvalidContainerException.class.getName()));
+    }
+
+    // Verify that startContainer fails because of the invalid container request.
+    Assert.assertTrue(catchException);
+
+    // Construct the Container with an RMIdentifier matching the current RM.
+    Container mockContainer2 = mock(Container.class);
+    when(mockContainer2.getId()).thenReturn(cId2);
+    when(mockContainer2.getRMIdentifer()).thenReturn((long) 1234);
+    when(mockContainer2.getResource()).thenReturn(mockResource);
+    StartContainerRequest startRequest2 =
+        recordFactory.newRecordInstance(StartContainerRequest.class);
+    startRequest2.setContainerLaunchContext(containerLaunchContext);
+    startRequest2.setContainer(mockContainer2);
+    boolean noException = true;
+    try {
+      containerManager.startContainer(startRequest2);
+    } catch (YarnRemoteException e) {
+      noException = false;
+    }
+    // Verify that startContainer throws no YarnRemoteException.
+    Assert.assertTrue(noException);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 258c7dc0e47..1ee355268a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -196,6 +196,7 @@ public class ResourceTrackerService extends AbstractService implements
         + capability + ", assigned nodeId " + nodeId);
     response.setNodeAction(NodeAction.NORMAL);
+    response.setRMIdentifier(ResourceManager.clusterTimeStamp);
     return response;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index aca84adf0dd..c4990daec59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -33,11 +33,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import
org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 4b398d4f025..5fb1167d714 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -25,8 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 9bbdc3af045..db044958f0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; @Unstable public class 
NullRMStateStore extends RMStateStore { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 87a2608a38d..d5c5015cf34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -30,12 +30,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java similarity index 89% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index d1dbda0dc51..64e3ccbfcbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -16,10 +16,12 @@ * limitations under the License. 
 */
-package org.apache.hadoop.yarn.api.records;
+package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
 
 /*
  * Contains the state data that needs to be persisted for an ApplicationAttempt
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java
similarity index 88%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java
index fa0a596eb53..d033f5c8837 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java
@@ -16,14 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.api.records.impl.pb;
+package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProtoOrBuilder;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
 
 public class ApplicationAttemptStateDataPBImpl extends ProtoBase<ApplicationAttemptStateDataProto>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
similarity index 89%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
index 9b1e14a3c00..feffca9b2c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -16,10 +16,12 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.api.records; +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; /** * Contains all the state data that needs to be stored persistently diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java similarity index 90% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java index dced42397aa..0aa64b73ec1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java @@ -16,13 +16,13 @@ * limitations under the License. 
 */
-package org.apache.hadoop.yarn.api.records.impl.pb;
+package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
-import org.apache.hadoop.yarn.api.records.ApplicationStateData;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProtoOrBuilder;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
 
 public class ApplicationStateDataPBImpl extends ProtoBase<ApplicationStateDataProto>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 58dcb73767d..64f711434b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -1243,7 +1244,7 @@ public class LeafQueue implements CSQueue {
     // Create the container
     Container container = BuilderUtils.newContainer(containerId, nodeId,
         node.getRMNode().getHttpAddress(), capability, priority,
-        null);
+        null, ResourceManager.clusterTimeStamp);
 
     return container;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
index a8e71735efc..4bd6e2b54b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -173,7 +174,7 @@ public class AppSchedulable extends Schedulable { // Create the container Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, - containerToken); + containerToken, ResourceManager.clusterTimeStamp); return container; } @@ -371,4 +372,4 @@ public class AppSchedulable extends Schedulable { Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null, request.getCapability(), node.getRMNode().getTotalCapability()); } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index d5a542700f4..2024e746dea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator; @@ -565,7 +566,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable { // Create the container Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, - containerToken); + containerToken, ResourceManager.clusterTimeStamp); // Allocate! 
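Taken together, the three scheduler changes above (LeafQueue, AppSchedulable, FifoScheduler) stamp ResourceManager.clusterTimeStamp into every Container they allocate, and ResourceTrackerService hands the same value to each NodeManager at registration. A NodeManager can therefore tell whether a container was allocated by the RM it is currently registered with. The following is only a minimal sketch of that check, not code from this patch; the helper class and method names are illustrative assumptions:

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;

// Hypothetical helper, for illustration only.
final class RMIdentifierCheck {
  private RMIdentifierCheck() {}

  // A container minted before an RM restart carries the old cluster timestamp
  // (or RM_INVALID_IDENTIFIER) and must not be launched.
  static boolean isFromCurrentRM(Container container, NodeStatusUpdater updater) {
    long rmId = container.getRMIdentifer(); // accessor name as spelled in this patch
    return rmId != ResourceManagerConstants.RM_INVALID_IDENTIFIER
        && rmId == updater.getRMIdentifier();
  }
}

A failed check is exactly what testContainerLaunchFromPreviousRM above expects: startContainer surfaces an InvalidContainerException whose message says the container was "rejected as it is allocated by a previous RM".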
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index dba5acdd82d..46683dcb619 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -188,6 +188,7 @@ public class NodeManager implements ContainerManager { this.nodeId, nodeHttpAddress, requestContainer.getResource(), null, null // DKDC - Doesn't matter + , 0 ); ContainerStatus containerStatus = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index af9d5d2c0bf..9641adbce02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; @@ -267,6 +268,21 @@ public class TestResourceTrackerService { Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction()); } + @Test + public void testSetRMIdentifierInRegistration() throws Exception { + + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + + MockNM nm = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); + RegisterNodeManagerResponse response = nm.registerNode(); + + // Verify the RMIdentifier is correctly set in RegisterNodeManagerResponse + Assert.assertEquals(ResourceManager.clusterTimeStamp, + response.getRMIdentifier()); + } + @Test public void testReboot() throws Exception { Configuration conf = new Configuration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java new file mode 100644 index 00000000000..6bd3a54e1b8 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationmasterservice;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestApplicationMasterService {
+  private static final Log LOG = LogFactory.getLog(TestApplicationMasterService.class);
+
+  private final int GB = 1024;
+  private static YarnConfiguration conf;
+
+  @BeforeClass
+  public static void setup() {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
+      ResourceScheduler.class);
+  }
+
+  @Test(timeout = 30000)
+  public void testRMIdentifierOnContainerAllocation() throws Exception {
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
+
+    // Submit an application
+    RMApp app1 = rm.submitApp(2048);
+
+    // Kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    am1.addRequests(new String[] { "h1" }, GB, 1, 1);
+    AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+    // Kick the scheduler
+    nm1.nodeHeartbeat(true);
+    while (alloc1Response.getAllocatedContainers().size() < 1) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(1000);
+      alloc1Response = am1.schedule();
+    }
+
+    // Assert that the RMIdentifier is set properly in allocated containers
+    Assert.assertEquals(ResourceManager.clusterTimeStamp, alloc1Response
+        .getAllocatedContainers().get(0).getRMIdentifer());
+    rm.stop();
+  }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 6e2b834f69a..abceecf6211 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -69,7 +69,7 @@ public class TestRMContainerImpl { Priority priority = BuilderUtils.newPriority(5); Container container = BuilderUtils.newContainer(containerId, nodeId, - "host:3465", resource, priority, null); + "host:3465", resource, priority, null, 0); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, nodeId, eventHandler, expirer); @@ -139,7 +139,7 @@ public class TestRMContainerImpl { Priority priority = BuilderUtils.newPriority(5); Container container = BuilderUtils.newContainer(containerId, nodeId, - "host:3465", resource, priority, null); + "host:3465", resource, priority, null, 0); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, nodeId, eventHandler, expirer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index 0432444168a..46fffb47d55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -360,7 +360,7 @@ public class TestContainerManagerSecurity { Container container = BuilderUtils.newContainer(newTokenId.getContainerID(), null, null, BuilderUtils.newResource(newTokenId.getResource().getMemory(), - newTokenId.getResource().getVirtualCores()), null, null); + newTokenId.getResource().getVirtualCores()), null, null, 0); StartContainerRequest request = Records.newRecord(StartContainerRequest.class); request.setContainerLaunchContext(context); request.setContainer(container); @@ -547,7 +547,7 @@ public class TestContainerManagerSecurity { createContainerLaunchContextForTest(tokenId); Container container = BuilderUtils.newContainer(tokenId.getContainerID(), null, null, - BuilderUtils.newResource(2048, 1), null, null); + BuilderUtils.newResource(2048, 1), null, null, 0); request.setContainerLaunchContext(context); request.setContainer(container); try { @@ -575,7 +575,7 @@ public class TestContainerManagerSecurity { Container container = BuilderUtils.newContainer(tokenId.getContainerID(), null, null, BuilderUtils.newResource(tokenId.getResource().getMemory(), tokenId - .getResource().getVirtualCores()), null, null); + .getResource().getVirtualCores()), null, null, 0); request.setContainerLaunchContext(context); request.setContainer(container); try { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServer.java new file mode 100644 index 00000000000..f16575cbc49 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServer.java @@ -0,0 +1,52 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.webproxy; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; +import org.apache.hadoop.yarn.service.Service.STATE; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestWebAppProxyServer { + private WebAppProxyServer webAppProxy = null; + + @Before + public void setUp() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:8888"); + webAppProxy = new WebAppProxyServer(); + webAppProxy.init(conf); + } + + @After + public void tearDown() throws Exception { + webAppProxy.stop(); + } + + @Test + public void testStart() { + assertEquals(STATE.INITED, webAppProxy.getServiceState()); + webAppProxy.start(); + assertEquals(STATE.STARTED, webAppProxy.getServiceState()); + } +}
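The new proxy test exercises the standard YARN service lifecycle: init() moves the service to STATE.INITED, start() to STATE.STARTED, and stop() tears it down. A minimal standalone driver in the same style might look like the sketch below; the class name is hypothetical, and it assumes port 8888 is free, exactly as the test does:

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
import org.apache.hadoop.yarn.service.Service.STATE;

// Hypothetical driver mirroring the calls made by TestWebAppProxyServer.
public class WebAppProxyLifecycleDemo {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:8888"); // assumed free port
    WebAppProxyServer proxy = new WebAppProxyServer();
    proxy.init(conf);  // service is now configured but not yet serving
    proxy.start();     // proxy begins listening on PROXY_ADDRESS
    if (proxy.getServiceState() != STATE.STARTED) {
      throw new IllegalStateException("proxy failed to start");
    }
    proxy.stop();      // mirrors the test's tearDown()
  }
}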