diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 55b4c806160..9f1ffb11532 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -377,6 +377,28 @@ public class NetworkTopology {
     }
   }
 
+  /**
+   * Given a string representation of a rack, return its children
+   * @param loc a path-like string representation of a rack
+   * @return a newly allocated list with all the node's children
+   */
+  public List<Node> getDatanodesInRack(String loc) {
+    netlock.readLock().lock();
+    try {
+      loc = NodeBase.normalize(loc);
+      if (!NodeBase.ROOT.equals(loc)) {
+        loc = loc.substring(1);
+      }
+      InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
+      if (rack == null) {
+        return null;
+      }
+      return new ArrayList<Node>(rack.getChildren());
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
   /** Remove a node
    *  Update node counter and rack counter if necessary
    * @param node node to be removed; can be null
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 69e732ddbfc..f31a76c19e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -27,6 +27,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4502. JsonUtil.toFileStatus(..) should check if the fileId property
     exists. (Brandon Li via suresh)
 
+    HDFS-2576. Enhances the DistributedFileSystem's create API so that clients
+    can specify favored datanodes for a file's blocks. (ddas)
+
   IMPROVEMENTS
 
     HDFS-4222. NN is unresponsive and loses heartbeats from DNs when
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 8b9cac69ddd..dc239ed891c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1232,7 +1232,7 @@ public class DFSClient implements java.io.Closeable {
       ChecksumOpt checksumOpt) throws IOException {
     return create(src, permission, flag, true,
-        replication, blockSize, progress, buffersize, checksumOpt);
+        replication, blockSize, progress, buffersize, checksumOpt, null);
   }
 
   /**
@@ -1266,6 +1266,29 @@ public class DFSClient implements java.io.Closeable {
       Progressable progress,
       int buffersize,
       ChecksumOpt checksumOpt) throws IOException {
+    return create(src, permission, flag, createParent, replication, blockSize,
+        progress, buffersize, checksumOpt, null);
+  }
+
+  /**
+   * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
+   * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
+   * a hint to where the namenode should place the file blocks.
+   * The favored nodes hint is not persisted in HDFS. Hence it may be honored
+   * at the creation time only. HDFS could move the blocks during balancing or
+   * replication, away from the favored nodes.
A value of null means + * no favored nodes for this create + */ + public DFSOutputStream create(String src, + FsPermission permission, + EnumSet flag, + boolean createParent, + short replication, + long blockSize, + Progressable progress, + int buffersize, + ChecksumOpt checksumOpt, + InetSocketAddress[] favoredNodes) throws IOException { checkOpen(); if (permission == null) { permission = FsPermission.getFileDefault(); @@ -1274,9 +1297,18 @@ public class DFSClient implements java.io.Closeable { if(LOG.isDebugEnabled()) { LOG.debug(src + ": masked=" + masked); } + String[] favoredNodeStrs = null; + if (favoredNodes != null) { + favoredNodeStrs = new String[favoredNodes.length]; + for (int i = 0; i < favoredNodes.length; i++) { + favoredNodeStrs[i] = + favoredNodes[i].getAddress().getHostAddress() + ":" + + favoredNodes[i].getPort(); + } + } final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, - buffersize, dfsClientConf.createChecksum(checksumOpt)); + buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs); beginFileLease(src, result); return result; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index a9daf35b701..63544913794 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -323,6 +323,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { return key; } }); + private String[] favoredNodes; volatile boolean hasError = false; volatile int errorIndex = -1; private BlockConstructionStage stage; // block construction stage @@ -399,7 +400,11 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { } } - + + private void setFavoredNodes(String[] favoredNodes) { + this.favoredNodes = favoredNodes; + } + /** * Initialize for data streaming */ @@ -1181,7 +1186,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, - block, excludedNodes, fileId); + block, excludedNodes, fileId, favoredNodes); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, @@ -1321,7 +1326,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { /** Construct a new output stream for creating a file. 
*/ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet flag, Progressable progress, - DataChecksum checksum) throws IOException { + DataChecksum checksum, String[] favoredNodes) throws IOException { this(dfsClient, src, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); @@ -1329,12 +1334,15 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { checksum.getBytesPerChecksum()); streamer = new DataStreamer(); + if (favoredNodes != null && favoredNodes.length != 0) { + streamer.setFavoredNodes(favoredNodes); + } } static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, - DataChecksum checksum) throws IOException { + DataChecksum checksum, String[] favoredNodes) throws IOException { final HdfsFileStatus stat; try { stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, @@ -1351,11 +1359,19 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { UnresolvedPathException.class); } final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, - flag, progress, checksum); + flag, progress, checksum, favoredNodes); out.start(); return out; } + static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, + FsPermission masked, EnumSet flag, boolean createParent, + short replication, long blockSize, Progressable progress, int buffersize, + DataChecksum checksum) throws IOException { + return newStreamForCreate(dfsClient, src, masked, flag, createParent, replication, + blockSize, progress, buffersize, checksum, null); + } + /** Construct a new output stream for append. */ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 800031394d4..135accabfbf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -265,6 +265,27 @@ public class DistributedFileSystem extends FileSystem { : EnumSet.of(CreateFlag.CREATE), bufferSize, replication, blockSize, progress, null); } + + /** + * Same as + * {@link #create(Path, FsPermission, boolean, int, short, long, + * Progressable)} with the addition of favoredNodes that is a hint to + * where the namenode should place the file blocks. + * The favored nodes hint is not persisted in HDFS. Hence it may be honored + * at the creation time only. HDFS could move the blocks during balancing or + * replication, to move the blocks from favored nodes. A value of null means + * no favored nodes for this create + */ + public HdfsDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress, InetSocketAddress[] favoredNodes) throws IOException { + statistics.incrementWriteOps(1); + final DFSOutputStream out = dfs.create(getPathName(f), permission, + overwrite ? 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) + : EnumSet.of(CreateFlag.CREATE), + true, replication, blockSize, progress, bufferSize, null, favoredNodes); + return new HdfsDataOutputStream(out, statistics); + } @Override public HdfsDataOutputStream create(Path f, FsPermission permission, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index cbfc2f2759a..8169a80ac49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -300,6 +300,8 @@ public interface ClientProtocol { * @param excludeNodes a list of nodes that should not be * allocated for the current block * @param fileId the id uniquely identifying a file + * @param favoredNodes the list of nodes where the client wants the blocks. + * Nodes are identified by either host name or address. * * @return LocatedBlock allocated block information. * @@ -314,7 +316,8 @@ public interface ClientProtocol { */ @Idempotent public LocatedBlock addBlock(String src, String clientName, - ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId) + ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, + String[] favoredNodes) throws AccessControlException, FileNotFoundException, NotReplicatedYetException, SafeModeException, UnresolvedLinkException, IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index d3e931587cd..b3c6c268758 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -355,12 +355,15 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements try { List excl = req.getExcludeNodesList(); + List favor = req.getFavoredNodesList(); LocatedBlock result = server.addBlock( req.getSrc(), req.getClientName(), req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null, (excl == null || excl.size() == 0) ? null : PBHelper.convert(excl - .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId()); + .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(), + (favor == null || favor.size() == 0) ? 
null : favor + .toArray(new String[favor.size()])); return AddBlockResponseProto.newBuilder() .setBlock(PBHelper.convert(result)).build(); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index cd9c8111b49..1ae252c23f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -302,7 +302,8 @@ public class ClientNamenodeProtocolTranslatorPB implements @Override public LocatedBlock addBlock(String src, String clientName, - ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId) + ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, + String[] favoredNodes) throws AccessControlException, FileNotFoundException, NotReplicatedYetException, SafeModeException, UnresolvedLinkException, IOException { @@ -312,6 +313,9 @@ public class ClientNamenodeProtocolTranslatorPB implements req.setPrevious(PBHelper.convert(previous)); if (excludeNodes != null) req.addAllExcludeNodes(PBHelper.convert(excludeNodes)); + if (favoredNodes != null) { + req.addAllFavoredNodes(Arrays.asList(favoredNodes)); + } try { return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index b71304cb339..ba0c4b7709d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.namenode.FSClusterStats; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; @@ -72,6 +73,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.net.Node; +import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Time; @@ -1333,11 +1335,12 @@ public class BlockManager { public DatanodeDescriptor[] chooseTarget(final String src, final int numOfReplicas, final DatanodeDescriptor client, final HashMap excludedNodes, - final long blocksize) throws IOException { - // choose targets for the new block to be allocated. 
+ final long blocksize, List favoredNodes) throws IOException { + List favoredDatanodeDescriptors = + getDatanodeDescriptors(favoredNodes); final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src, - numOfReplicas, client, new ArrayList(), false, - excludedNodes, blocksize); + numOfReplicas, client, excludedNodes, blocksize, + favoredDatanodeDescriptors); if (targets.length < minReplication) { throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes instead of minReplication (=" @@ -1350,6 +1353,24 @@ public class BlockManager { return targets; } + /** + * Get list of datanode descriptors for given list of nodes. Nodes are + * hostaddress:port or just hostaddress. + */ + List getDatanodeDescriptors(List nodes) { + List datanodeDescriptors = null; + if (nodes != null) { + datanodeDescriptors = new ArrayList(nodes.size()); + for (int i = 0; i < nodes.size(); i++) { + DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i)); + if (node != null) { + datanodeDescriptors.add(node); + } + } + } + return datanodeDescriptors; + } + /** * Parse the data-nodes the block belongs to and choose one, * which will be the replication source. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 472a83950ff..d1da972e345 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -116,6 +116,25 @@ public abstract class BlockPlacementPolicy { return chooseTarget(srcBC.getName(), numOfReplicas, writer, chosenNodes, false, excludedNodes, blocksize); } + + /** + * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, + * HashMap, long)} with added parameter {@code favoredDatanodes} + * @param favoredNodes datanodes that should be favored as targets. This + * is only a hint and due to cluster state, namenode may not be + * able to place the blocks on these datanodes. + */ + DatanodeDescriptor[] chooseTarget(String src, + int numOfReplicas, DatanodeDescriptor writer, + HashMap excludedNodes, + long blocksize, List favoredNodes) { + // This class does not provide the functionality of placing + // a block in favored datanodes. 
The implementations of this class + // are expected to provide this functionality + return chooseTarget(src, numOfReplicas, writer, + new ArrayList(numOfReplicas), false, excludedNodes, + blocksize); + } /** * Verify that the block is replicated on at least minRacks different racks diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 9c37f03b070..c90cbef5012 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -125,6 +125,60 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { excludedNodes, blocksize); } + @Override + DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas, + DatanodeDescriptor writer, HashMap excludedNodes, + long blocksize, List favoredNodes) { + try { + if (favoredNodes == null || favoredNodes.size() == 0) { + // Favored nodes not specified, fall back to regular block placement. + return chooseTarget(src, numOfReplicas, writer, + new ArrayList(numOfReplicas), false, + excludedNodes, blocksize); + } + + HashMap favoriteAndExcludedNodes = excludedNodes == null ? + new HashMap() : new HashMap(excludedNodes); + + // Choose favored nodes + List results = new ArrayList(); + boolean avoidStaleNodes = stats != null + && stats.isAvoidingStaleDataNodesForWrite(); + for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) { + DatanodeDescriptor favoredNode = favoredNodes.get(i); + // Choose a single node which is local to favoredNode. + // 'results' is updated within chooseLocalNode + DatanodeDescriptor target = chooseLocalNode(favoredNode, + favoriteAndExcludedNodes, blocksize, + getMaxNodesPerRack(results, + numOfReplicas)[1], results, avoidStaleNodes); + if (target == null) { + LOG.warn("Could not find a target for file " + src + + " with favored node " + favoredNode); + continue; + } + favoriteAndExcludedNodes.put(target, target); + } + + if (results.size() < numOfReplicas) { + // Not enough favored nodes, choose other nodes. + numOfReplicas -= results.size(); + DatanodeDescriptor[] remainingTargets = + chooseTarget(src, numOfReplicas, writer, results, + false, favoriteAndExcludedNodes, blocksize); + for (int i = 0; i < remainingTargets.length; i++) { + results.add(remainingTargets[i]); + } + } + return results.toArray(new DatanodeDescriptor[results.size()]); + } catch (NotEnoughReplicasException nr) { + // Fall back to regular block placement disregarding favored nodes hint + return chooseTarget(src, numOfReplicas, writer, + new ArrayList(numOfReplicas), false, + excludedNodes, blocksize); + } + } + /** This is the implementation. 
*/ DatanodeDescriptor[] chooseTarget(int numOfReplicas, DatanodeDescriptor writer, @@ -140,15 +194,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { excludedNodes = new HashMap(); } - int clusterSize = clusterMap.getNumOfLeaves(); - int totalNumOfReplicas = chosenNodes.size()+numOfReplicas; - if (totalNumOfReplicas > clusterSize) { - numOfReplicas -= (totalNumOfReplicas-clusterSize); - totalNumOfReplicas = clusterSize; - } - - int maxNodesPerRack = - (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2; + int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas); + numOfReplicas = result[0]; + int maxNodesPerRack = result[1]; List results = new ArrayList(chosenNodes); @@ -172,6 +220,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { return getPipeline((writer==null)?localNode:writer, results.toArray(new DatanodeDescriptor[results.size()])); } + + private int[] getMaxNodesPerRack(List chosenNodes, + int numOfReplicas) { + int clusterSize = clusterMap.getNumOfLeaves(); + int totalNumOfReplicas = chosenNodes.size()+numOfReplicas; + if (totalNumOfReplicas > clusterSize) { + numOfReplicas -= (totalNumOfReplicas-clusterSize); + totalNumOfReplicas = clusterSize; + } + int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2; + return new int[] {numOfReplicas, maxNodesPerRack}; + } /* choose numOfReplicas from all data nodes */ private DatanodeDescriptor chooseTarget(int numOfReplicas, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 2437b111f48..d6a72329e0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -313,6 +313,68 @@ public class DatanodeManager { return host2DatanodeMap.getDatanodeByHost(host); } + /** @return the datanode descriptor for the host. */ + public DatanodeDescriptor getDatanodeByXferAddr(String host, int xferPort) { + return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort); + } + + /** + * Given datanode address or host name, returns the DatanodeDescriptor for the + * same, or if it doesn't find the datanode, it looks for a machine local and + * then rack local datanode, if a rack local datanode is not possible either, + * it returns the DatanodeDescriptor of any random node in the cluster. + * + * @param address hostaddress:transfer address + * @return the best match for the given datanode + * @throws IOException when no datanode is found for given address + */ + DatanodeDescriptor getDatanodeDescriptor(String address) { + DatanodeDescriptor node = null; + int colon = address.indexOf(":"); + int xferPort; + String host = address; + if (colon > 0) { + host = address.substring(0, colon); + xferPort = Integer.parseInt(address.substring(colon+1)); + node = getDatanodeByXferAddr(host, xferPort); + } + if (node == null) { + node = getDatanodeByHost(host); + } + if (node == null) { + String networkLocation = resolveNetworkLocation(host); + + // If the current cluster doesn't contain the node, fallback to + // something machine local and then rack local. + List rackNodes = getNetworkTopology() + .getDatanodesInRack(networkLocation); + if (rackNodes != null) { + // Try something machine local. 
+ for (Node rackNode : rackNodes) { + if (((DatanodeDescriptor) rackNode).getIpAddr().equals(host)) { + node = (DatanodeDescriptor) rackNode; + break; + } + } + + // Try something rack local. + if (node == null && !rackNodes.isEmpty()) { + node = (DatanodeDescriptor) (rackNodes + .get(DFSUtil.getRandom().nextInt(rackNodes.size()))); + } + } + + // If we can't even choose rack local, just choose any node in the + // cluster. + if (node == null) { + node = (DatanodeDescriptor)getNetworkTopology() + .chooseRandom(NodeBase.ROOT); + } + } + return node; + } + + /** Get a datanode descriptor given corresponding storageID */ DatanodeDescriptor getDatanode(final String storageID) { return datanodeMap.get(storageID); @@ -442,8 +504,13 @@ public class DatanodeManager { } } + public String resolveNetworkLocation(String host) { + DatanodeID d = parseDNFromHostsEntry(host); + return resolveNetworkLocation(d); + } + /* Resolve a node's network location */ - private void resolveNetworkLocation (DatanodeDescriptor node) { + private String resolveNetworkLocation (DatanodeID node) { List names = new ArrayList(1); if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { names.add(node.getIpAddr()); @@ -461,7 +528,7 @@ public class DatanodeManager { } else { networkLocation = rName.get(0); } - node.setNetworkLocation(networkLocation); + return networkLocation; } private boolean inHostsList(DatanodeID node) { @@ -694,7 +761,7 @@ public class DatanodeManager { nodeS.setDisallowed(false); // Node is in the include list // resolve network location - resolveNetworkLocation(nodeS); + nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); getNetworkTopology().add(nodeS); // also treat the registration message as a heartbeat @@ -726,7 +793,7 @@ public class DatanodeManager { = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK); boolean success = false; try { - resolveNetworkLocation(nodeDescr); + nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr)); networktopology.add(nodeDescr); // register new datanode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index ed1d58b08e1..2123052b98d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2208,7 +2208,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * client to "try again later". */ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName, - ExtendedBlock previous, HashMap excludedNodes) + ExtendedBlock previous, HashMap excludedNodes, + List favoredNodes) throws LeaseExpiredException, NotReplicatedYetException, QuotaExceededException, SafeModeException, UnresolvedLinkException, IOException { @@ -2253,8 +2254,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } // choose targets for the new block to be allocated. - final DatanodeDescriptor targets[] = getBlockManager().chooseTarget( - src, replication, clientNode, excludedNodes, blockSize); + final DatanodeDescriptor targets[] = getBlockManager().chooseTarget( + src, replication, clientNode, excludedNodes, blockSize, favoredNodes); // Part II. // Allocate a new block, add it to the INode and the BlocksMap. 
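[Editor's note, not part of the patch] For context on the favoredNodes plumbing above: DFSClient encodes each favored InetSocketAddress as an "ipAddress:xferPort" string, and on the namenode side DatanodeManager#getDatanodeDescriptor splits that string back apart, looks the node up by transfer address, then by host, and otherwise falls back to a machine-local, rack-local, or finally arbitrary datanode. A minimal standalone sketch of just the string round trip follows; the class name and sample address are illustrative assumptions, not code from the patch.

// Illustration only (assumed helper class, not in the patch): encode favored
// nodes the way DFSClient#create does, and split them back apart the way
// DatanodeManager#getDatanodeDescriptor does before the datanode lookup.
import java.net.InetSocketAddress;

public class FavoredNodeFormat {
  /** Encode as "ipAddress:xferPort", matching DFSClient's favoredNodeStrs. */
  static String encode(InetSocketAddress addr) {
    return addr.getAddress().getHostAddress() + ":" + addr.getPort();
  }

  /** Split back into host and optional port, matching the namenode-side parsing. */
  static String[] decode(String favoredNode) {
    int colon = favoredNode.indexOf(':');
    return colon > 0
        ? new String[] { favoredNode.substring(0, colon), favoredNode.substring(colon + 1) }
        : new String[] { favoredNode };
  }

  public static void main(String[] args) {
    InetSocketAddress dn = new InetSocketAddress("10.0.0.12", 50010);
    String encoded = encode(dn);       // "10.0.0.12:50010"
    String[] parts = decode(encoded);  // ["10.0.0.12", "50010"]
    System.out.println(encoded + " -> host=" + parts[0] + " port=" + parts[1]);
  }
}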
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index b88811b6a06..93b23b6b11a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -29,6 +29,7 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; @@ -473,7 +474,8 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override public LocatedBlock addBlock(String src, String clientName, - ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId) + ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, + String[] favoredNodes) throws IOException { if (stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src @@ -486,8 +488,10 @@ class NameNodeRpcServer implements NamenodeProtocols { excludedNodesSet.put(node, node); } } + List favoredNodesList = (favoredNodes == null) ? null + : Arrays.asList(favoredNodes); LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId, - clientName, previous, excludedNodesSet); + clientName, previous, excludedNodesSet, favoredNodesList); if (locatedBlock != null) metrics.incrAddBlockOps(); return locatedBlock; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index d99e4e7209b..3f0ea7f6692 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -121,6 +121,7 @@ message AddBlockRequestProto { optional ExtendedBlockProto previous = 3; repeated DatanodeInfoProto excludeNodes = 4; optional uint64 fileId = 5 [default = 0]; // default as a bogus id + repeated string favoredNodes = 6; //the set of datanodes to use for the block } message AddBlockResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 458ed671ff4..ad8443876e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -241,7 +241,7 @@ public class TestDFSClientRetries { anyString(), any(ExtendedBlock.class), any(DatanodeInfo[].class), - anyLong())).thenAnswer(answer); + anyLong(), any(String[].class))).thenAnswer(answer); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( @@ -390,7 +390,7 @@ public class TestDFSClientRetries { } }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(), Mockito. any(), Mockito. any(), - Mockito.anyLong()); + Mockito.anyLong(), Mockito. any()); doAnswer(new Answer() { @@ -432,7 +432,7 @@ public class TestDFSClientRetries { Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock( Mockito.anyString(), Mockito.anyString(), Mockito. any(), Mockito. any(), - Mockito.anyLong()); + Mockito.anyLong(), Mockito. 
any()); Mockito.verify(spyNN, Mockito.atLeastOnce()).complete( Mockito.anyString(), Mockito.anyString(), Mockito.any()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 5a01ea57a8f..66cb90fed81 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -519,7 +519,7 @@ public class TestFileCreation { // add one block to the file LocatedBlock location = client.getNamenode().addBlock(file1.toString(), - client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID); + client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID, null); System.out.println("testFileCreationError2: " + "Added block " + location.getBlock()); @@ -570,7 +570,7 @@ public class TestFileCreation { createFile(dfs, f, 3); try { cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName, - null, null, INodeId.GRANDFATHER_INODE_ID); + null, null, INodeId.GRANDFATHER_INODE_ID, null); fail(); } catch(IOException ioe) { FileSystem.LOG.info("GOOD!", ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index 891c6699454..8e86e1e74bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -1059,7 +1059,7 @@ public class NNThroughputBenchmark { ExtendedBlock prevBlock = null; for(int jdx = 0; jdx < blocksPerFile; jdx++) { LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName, - prevBlock, null, INodeId.GRANDFATHER_INODE_ID); + prevBlock, null, INodeId.GRANDFATHER_INODE_ID, null); prevBlock = loc.getBlock(); for(DatanodeInfo dnInfo : loc.getLocations()) { int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getXferAddr()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java index 793cec6e93a..8a3b52330b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.spy; import java.lang.reflect.Field; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -108,7 +109,7 @@ public class TestAddBlockRetry { if(count == 1) { // run second addBlock() LOG.info("Starting second addBlock for " + src); nn.addBlock(src, "clientName", null, null, - INodeId.GRANDFATHER_INODE_ID); + INodeId.GRANDFATHER_INODE_ID, null); LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size()); lb2 = lbs.get(0); @@ -119,7 +120,7 @@ public class TestAddBlockRetry { } }).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(), Mockito.any(), Mockito.>any(), - Mockito.anyLong()); + 
Mockito.anyLong(), Mockito.>any()); // create file nn.create(src, FsPermission.getFileDefault(), @@ -129,7 +130,7 @@ public class TestAddBlockRetry { // start first addBlock() LOG.info("Starting first addBlock for " + src); - nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID); + nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null); // check locations LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java new file mode 100644 index 00000000000..015c021b066 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java @@ -0,0 +1,220 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Random; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.junit.Test; +import org.junit.AfterClass; +import org.junit.BeforeClass; + + +public class TestFavoredNodesEndToEnd { + private static MiniDFSCluster cluster; + private static Configuration conf; + private final static int NUM_DATA_NODES = 10; + private final static int NUM_FILES = 10; + private final static byte[] SOME_BYTES = new String("foo").getBytes(); + private static DistributedFileSystem dfs; + private static ArrayList datanodes; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) + .build(); + cluster.waitClusterUp(); + dfs = cluster.getFileSystem(); + datanodes = cluster.getDataNodes(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testFavoredNodesEndToEnd() throws Exception { + //create 10 files with random preferred nodes + for (int i = 0; i < NUM_FILES; i++) { + Random rand = new 
Random(System.currentTimeMillis() + i); + //pass a new created rand so as to get a uniform distribution each time + //without too much collisions (look at the do-while loop in getDatanodes) + InetSocketAddress datanode[] = getDatanodes(rand); + Path p = new Path("/filename"+i); + FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, + 4096, (short)3, (long)4096, null, datanode); + out.write(SOME_BYTES); + out.close(); + BlockLocation[] locations = + dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, + Long.MAX_VALUE); + //make sure we have exactly one block location, and three hosts + assertTrue(locations.length == 1 && locations[0].getHosts().length == 3); + //verify the files got created in the right nodes + for (BlockLocation loc : locations) { + String[] hosts = loc.getNames(); + String[] hosts1 = getStringForInetSocketAddrs(datanode); + assertTrue(compareNodes(hosts, hosts1)); + } + } + } + + @Test + public void testWhenFavoredNodesNotPresent() throws Exception { + //when we ask for favored nodes but the nodes are not there, we should + //get some other nodes. In other words, the write to hdfs should not fail + //and if we do getBlockLocations on the file, we should see one blklocation + //and three hosts for that + Random rand = new Random(System.currentTimeMillis()); + InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3]; + for (int i = 0; i < 3; i++) { + arbitraryAddrs[i] = getArbitraryLocalHostAddr(); + } + Path p = new Path("/filename-foo-bar"); + FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, + 4096, (short)3, (long)4096, null, arbitraryAddrs); + out.write(SOME_BYTES); + out.close(); + BlockLocation[] locations = + dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, + Long.MAX_VALUE); + assertTrue(locations.length == 1 && locations[0].getHosts().length == 3); + } + + @Test + public void testWhenSomeNodesAreNotGood() throws Exception { + //make some datanode not "good" so that even if the client prefers it, + //the namenode would not give it as a replica to write to + DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager() + .getDatanodeManager().getDatanodeByXferAddr( + datanodes.get(0).getXferAddress().getAddress().getHostAddress(), + datanodes.get(0).getXferAddress().getPort()); + //set the decommission status to true so that + //BlockPlacementPolicyDefault.isGoodTarget returns false for this dn + d.setDecommissioned(); + InetSocketAddress addrs[] = new InetSocketAddress[3]; + for (int i = 0; i < 3; i++) { + addrs[i] = datanodes.get(i).getXferAddress(); + } + Path p = new Path("/filename-foo-bar-baz"); + FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, + 4096, (short)3, (long)4096, null, addrs); + out.write(SOME_BYTES); + out.close(); + BlockLocation[] locations = + dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, + Long.MAX_VALUE); + //reset the state + d.stopDecommission(); + assertTrue(locations.length == 1 && locations[0].getHosts().length == 3); + //also make sure that the datanode[0] is not in the list of hosts + String datanode0 = + datanodes.get(0).getXferAddress().getAddress().getHostAddress() + + ":" + datanodes.get(0).getXferAddress().getPort(); + for (int i = 0; i < 3; i++) { + if (locations[0].getNames()[i].equals(datanode0)) { + fail(datanode0 + " not supposed to be a replica for the block"); + } + } + } + + private String[] getStringForInetSocketAddrs(InetSocketAddress[] datanode) { + String strs[] = new String[datanode.length]; + for (int i = 0; i < 
datanode.length; i++) { + strs[i] = datanode[i].getAddress().getHostAddress() + ":" + + datanode[i].getPort(); + } + return strs; + } + + private boolean compareNodes(String[] dnList1, String[] dnList2) { + for (int i = 0; i < dnList1.length; i++) { + boolean matched = false; + for (int j = 0; j < dnList2.length; j++) { + if (dnList1[i].equals(dnList2[j])) { + matched = true; + break; + } + } + if (matched == false) { + fail(dnList1[i] + " not a favored node"); + } + } + return true; + } + + private InetSocketAddress[] getDatanodes(Random rand) { + //Get some unique random indexes + int idx1 = rand.nextInt(NUM_DATA_NODES); + int idx2; + + do { + idx2 = rand.nextInt(NUM_DATA_NODES); + } while (idx1 == idx2); + + int idx3; + do { + idx3 = rand.nextInt(NUM_DATA_NODES); + } while (idx2 == idx3 || idx1 == idx3); + + InetSocketAddress[] addrs = new InetSocketAddress[3]; + addrs[0] = datanodes.get(idx1).getXferAddress(); + addrs[1] = datanodes.get(idx2).getXferAddress(); + addrs[2] = datanodes.get(idx3).getXferAddress(); + return addrs; + } + + private InetSocketAddress getArbitraryLocalHostAddr() + throws UnknownHostException{ + Random rand = new Random(System.currentTimeMillis()); + int port = rand.nextInt(65535); + while (true) { + boolean conflict = false; + for (DataNode d : datanodes) { + if (d.getXferAddress().getPort() == port) { + port = rand.nextInt(65535); + conflict = true; + } + } + if (conflict == false) { + break; + } + } + return new InetSocketAddress(InetAddress.getLocalHost(), port); + } +}
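[Editor's note, not part of the patch] Taken together, the patch lets a client pass block placement hints end to end through DistributedFileSystem#create, DFSClient, the addBlock RPC, and BlockManager#chooseTarget. Below is a minimal usage sketch of the new DistributedFileSystem#create overload; the default-Configuration filesystem, the path, and the datanode host:port values are illustrative assumptions. If the namenode cannot honor the hint (nodes unknown, stale, or otherwise not good targets), BlockPlacementPolicyDefault falls back to regular placement, so the write still succeeds.

// Usage sketch (assumed client code, not in the patch): write a file while
// hinting which datanodes should hold its block replicas.
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class FavoredNodesWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if (!(fs instanceof DistributedFileSystem)) {
      throw new IllegalStateException("favored nodes require HDFS");
    }
    DistributedFileSystem dfs = (DistributedFileSystem) fs;

    // Hint: ask the namenode to place replicas on these datanodes, identified
    // by their transfer addresses. The hint is best effort and not persisted.
    InetSocketAddress[] favoredNodes = new InetSocketAddress[] {
        new InetSocketAddress("dn1.example.com", 50010),
        new InetSocketAddress("dn2.example.com", 50010),
        new InetSocketAddress("dn3.example.com", 50010)
    };

    FSDataOutputStream out = dfs.create(new Path("/tmp/favored-file"),
        FsPermission.getFileDefault(), true /* overwrite */, 4096,
        (short) 3, 64 * 1024 * 1024L, null /* progress */, favoredNodes);
    try {
      out.write("hello".getBytes("UTF-8"));
    } finally {
      out.close();
    }
  }
}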