HDFS-2576. Enhances the DistributedFileSystem's create API so that clients can specify favored datanodes for a file's blocks. Contributed by Devaraj Das and Pritam Damania.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1476395 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
95fa3e82e4
commit
5d2ffde68e
|
@ -446,6 +446,28 @@ public class NetworkTopology {
|
|||
return getNode(node.getNetworkLocation());
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a string representation of a rack, return its children
|
||||
* @param loc a path-like string representation of a rack
|
||||
* @return a newly allocated list with all the node's children
|
||||
*/
|
||||
public List<Node> getDatanodesInRack(String loc) {
|
||||
netlock.readLock().lock();
|
||||
try {
|
||||
loc = NodeBase.normalize(loc);
|
||||
if (!NodeBase.ROOT.equals(loc)) {
|
||||
loc = loc.substring(1);
|
||||
}
|
||||
InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
|
||||
if (rack == null) {
|
||||
return null;
|
||||
}
|
||||
return new ArrayList<Node>(rack.getChildren());
|
||||
} finally {
|
||||
netlock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/** Remove a node
|
||||
* Update node counter and rack counter if necessary
|
||||
* @param node node to be removed; can be null
|
||||
|
|
|
@ -22,6 +22,9 @@ Trunk (Unreleased)
|
|||
Azure environments. (See breakdown of tasks below for subtasks and
|
||||
contributors)
|
||||
|
||||
HDFS-2576. Enhances the DistributedFileSystem's create API so that clients
|
||||
can specify favored datanodes for a file's blocks. (ddas)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
|
||||
|
|
|
@ -1207,7 +1207,7 @@ public class DFSClient implements java.io.Closeable {
|
|||
ChecksumOpt checksumOpt)
|
||||
throws IOException {
|
||||
return create(src, permission, flag, true,
|
||||
replication, blockSize, progress, buffersize, checksumOpt);
|
||||
replication, blockSize, progress, buffersize, checksumOpt, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1241,6 +1241,29 @@ public class DFSClient implements java.io.Closeable {
|
|||
Progressable progress,
|
||||
int buffersize,
|
||||
ChecksumOpt checksumOpt) throws IOException {
|
||||
return create(src, permission, flag, createParent, replication, blockSize,
|
||||
progress, buffersize, checksumOpt, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
|
||||
* Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
|
||||
* a hint to where the namenode should place the file blocks.
|
||||
* The favored nodes hint is not persisted in HDFS. Hence it may be honored
|
||||
* at the creation time only. HDFS could move the blocks during balancing or
|
||||
* replication, to move the blocks from favored nodes. A value of null means
|
||||
* no favored nodes for this create
|
||||
*/
|
||||
public DFSOutputStream create(String src,
|
||||
FsPermission permission,
|
||||
EnumSet<CreateFlag> flag,
|
||||
boolean createParent,
|
||||
short replication,
|
||||
long blockSize,
|
||||
Progressable progress,
|
||||
int buffersize,
|
||||
ChecksumOpt checksumOpt,
|
||||
InetSocketAddress[] favoredNodes) throws IOException {
|
||||
checkOpen();
|
||||
if (permission == null) {
|
||||
permission = FsPermission.getFileDefault();
|
||||
|
@ -1249,9 +1272,18 @@ public class DFSClient implements java.io.Closeable {
|
|||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug(src + ": masked=" + masked);
|
||||
}
|
||||
String[] favoredNodeStrs = null;
|
||||
if (favoredNodes != null) {
|
||||
favoredNodeStrs = new String[favoredNodes.length];
|
||||
for (int i = 0; i < favoredNodes.length; i++) {
|
||||
favoredNodeStrs[i] =
|
||||
favoredNodes[i].getAddress().getHostAddress() + ":"
|
||||
+ favoredNodes[i].getPort();
|
||||
}
|
||||
}
|
||||
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
|
||||
src, masked, flag, createParent, replication, blockSize, progress,
|
||||
buffersize, dfsClientConf.createChecksum(checksumOpt));
|
||||
buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
|
||||
beginFileLease(src, result);
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -314,6 +314,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
return key;
|
||||
}
|
||||
});
|
||||
private String[] favoredNodes;
|
||||
volatile boolean hasError = false;
|
||||
volatile int errorIndex = -1;
|
||||
private BlockConstructionStage stage; // block construction stage
|
||||
|
@ -391,6 +392,10 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
}
|
||||
}
|
||||
|
||||
private void setFavoredNodes(String[] favoredNodes) {
|
||||
this.favoredNodes = favoredNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize for data streaming
|
||||
*/
|
||||
|
@ -1176,7 +1181,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
while (true) {
|
||||
try {
|
||||
return dfsClient.namenode.addBlock(src, dfsClient.clientName,
|
||||
block, excludedNodes, fileId);
|
||||
block, excludedNodes, fileId, favoredNodes);
|
||||
} catch (RemoteException e) {
|
||||
IOException ue =
|
||||
e.unwrapRemoteException(FileNotFoundException.class,
|
||||
|
@ -1317,7 +1322,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
/** Construct a new output stream for creating a file. */
|
||||
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
||||
EnumSet<CreateFlag> flag, Progressable progress,
|
||||
DataChecksum checksum) throws IOException {
|
||||
DataChecksum checksum, String[] favoredNodes) throws IOException {
|
||||
this(dfsClient, src, progress, stat, checksum);
|
||||
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
|
||||
|
||||
|
@ -1325,12 +1330,15 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
checksum.getBytesPerChecksum());
|
||||
|
||||
streamer = new DataStreamer();
|
||||
if (favoredNodes != null && favoredNodes.length != 0) {
|
||||
streamer.setFavoredNodes(favoredNodes);
|
||||
}
|
||||
}
|
||||
|
||||
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
|
||||
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
|
||||
short replication, long blockSize, Progressable progress, int buffersize,
|
||||
DataChecksum checksum) throws IOException {
|
||||
DataChecksum checksum, String[] favoredNodes) throws IOException {
|
||||
final HdfsFileStatus stat;
|
||||
try {
|
||||
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
|
||||
|
@ -1347,11 +1355,19 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
UnresolvedPathException.class);
|
||||
}
|
||||
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
|
||||
flag, progress, checksum);
|
||||
flag, progress, checksum, favoredNodes);
|
||||
out.start();
|
||||
return out;
|
||||
}
|
||||
|
||||
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
|
||||
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
|
||||
short replication, long blockSize, Progressable progress, int buffersize,
|
||||
DataChecksum checksum) throws IOException {
|
||||
return newStreamForCreate(dfsClient, src, masked, flag, createParent, replication,
|
||||
blockSize, progress, buffersize, checksum, null);
|
||||
}
|
||||
|
||||
/** Construct a new output stream for append. */
|
||||
private DFSOutputStream(DFSClient dfsClient, String src,
|
||||
Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,
|
||||
|
|
|
@ -266,6 +266,27 @@ public class DistributedFileSystem extends FileSystem {
|
|||
blockSize, progress, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as
|
||||
* {@link #create(Path, FsPermission, boolean, int, short, long,
|
||||
* Progressable)} with the addition of favoredNodes that is a hint to
|
||||
* where the namenode should place the file blocks.
|
||||
* The favored nodes hint is not persisted in HDFS. Hence it may be honored
|
||||
* at the creation time only. HDFS could move the blocks during balancing or
|
||||
* replication, to move the blocks from favored nodes. A value of null means
|
||||
* no favored nodes for this create
|
||||
*/
|
||||
public HdfsDataOutputStream create(Path f, FsPermission permission,
|
||||
boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||
Progressable progress, InetSocketAddress[] favoredNodes) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
final DFSOutputStream out = dfs.create(getPathName(f), permission,
|
||||
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
|
||||
: EnumSet.of(CreateFlag.CREATE),
|
||||
true, replication, blockSize, progress, bufferSize, null, favoredNodes);
|
||||
return new HdfsDataOutputStream(out, statistics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HdfsDataOutputStream create(Path f, FsPermission permission,
|
||||
EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
|
||||
|
|
|
@ -300,6 +300,8 @@ public interface ClientProtocol {
|
|||
* @param excludeNodes a list of nodes that should not be
|
||||
* allocated for the current block
|
||||
* @param fileId the id uniquely identifying a file
|
||||
* @param favoredNodes the list of nodes where the client wants the blocks.
|
||||
* Nodes are identified by either host name or address.
|
||||
*
|
||||
* @return LocatedBlock allocated block information.
|
||||
*
|
||||
|
@ -314,7 +316,8 @@ public interface ClientProtocol {
|
|||
*/
|
||||
@Idempotent
|
||||
public LocatedBlock addBlock(String src, String clientName,
|
||||
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
|
||||
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
|
||||
String[] favoredNodes)
|
||||
throws AccessControlException, FileNotFoundException,
|
||||
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
|
||||
IOException;
|
||||
|
|
|
@ -355,12 +355,15 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
|||
|
||||
try {
|
||||
List<DatanodeInfoProto> excl = req.getExcludeNodesList();
|
||||
List<String> favor = req.getFavoredNodesList();
|
||||
LocatedBlock result = server.addBlock(
|
||||
req.getSrc(),
|
||||
req.getClientName(),
|
||||
req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
|
||||
(excl == null || excl.size() == 0) ? null : PBHelper.convert(excl
|
||||
.toArray(new DatanodeInfoProto[excl.size()])), req.getFileId());
|
||||
.toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(),
|
||||
(favor == null || favor.size() == 0) ? null : favor
|
||||
.toArray(new String[favor.size()]));
|
||||
return AddBlockResponseProto.newBuilder()
|
||||
.setBlock(PBHelper.convert(result)).build();
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -302,7 +302,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
|
||||
@Override
|
||||
public LocatedBlock addBlock(String src, String clientName,
|
||||
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
|
||||
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
|
||||
String[] favoredNodes)
|
||||
throws AccessControlException, FileNotFoundException,
|
||||
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
|
||||
IOException {
|
||||
|
@ -312,6 +313,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
req.setPrevious(PBHelper.convert(previous));
|
||||
if (excludeNodes != null)
|
||||
req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
|
||||
if (favoredNodes != null) {
|
||||
req.addAllFavoredNodes(Arrays.asList(favoredNodes));
|
||||
}
|
||||
try {
|
||||
return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
|
||||
} catch (ServiceException e) {
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||
|
@ -72,6 +73,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
|||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
|
@ -1333,11 +1335,12 @@ public class BlockManager {
|
|||
public DatanodeDescriptor[] chooseTarget(final String src,
|
||||
final int numOfReplicas, final DatanodeDescriptor client,
|
||||
final HashMap<Node, Node> excludedNodes,
|
||||
final long blocksize) throws IOException {
|
||||
// choose targets for the new block to be allocated.
|
||||
final long blocksize, List<String> favoredNodes) throws IOException {
|
||||
List<DatanodeDescriptor> favoredDatanodeDescriptors =
|
||||
getDatanodeDescriptors(favoredNodes);
|
||||
final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
|
||||
numOfReplicas, client, new ArrayList<DatanodeDescriptor>(), false,
|
||||
excludedNodes, blocksize);
|
||||
numOfReplicas, client, excludedNodes, blocksize,
|
||||
favoredDatanodeDescriptors);
|
||||
if (targets.length < minReplication) {
|
||||
throw new IOException("File " + src + " could only be replicated to "
|
||||
+ targets.length + " nodes instead of minReplication (="
|
||||
|
@ -1350,6 +1353,24 @@ public class BlockManager {
|
|||
return targets;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get list of datanode descriptors for given list of nodes. Nodes are
|
||||
* hostaddress:port or just hostaddress.
|
||||
*/
|
||||
List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
|
||||
List<DatanodeDescriptor> datanodeDescriptors = null;
|
||||
if (nodes != null) {
|
||||
datanodeDescriptors = new ArrayList<DatanodeDescriptor>(nodes.size());
|
||||
for (int i = 0; i < nodes.size(); i++) {
|
||||
DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
|
||||
if (node != null) {
|
||||
datanodeDescriptors.add(node);
|
||||
}
|
||||
}
|
||||
}
|
||||
return datanodeDescriptors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the data-nodes the block belongs to and choose one,
|
||||
* which will be the replication source.
|
||||
|
|
|
@ -119,6 +119,25 @@ public abstract class BlockPlacementPolicy {
|
|||
chosenNodes, false, excludedNodes, blocksize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean,
|
||||
* HashMap, long)} with added parameter {@code favoredDatanodes}
|
||||
* @param favoredNodes datanodes that should be favored as targets. This
|
||||
* is only a hint and due to cluster state, namenode may not be
|
||||
* able to place the blocks on these datanodes.
|
||||
*/
|
||||
DatanodeDescriptor[] chooseTarget(String src,
|
||||
int numOfReplicas, DatanodeDescriptor writer,
|
||||
HashMap<Node, Node> excludedNodes,
|
||||
long blocksize, List<DatanodeDescriptor> favoredNodes) {
|
||||
// This class does not provide the functionality of placing
|
||||
// a block in favored datanodes. The implementations of this class
|
||||
// are expected to provide this functionality
|
||||
return chooseTarget(src, numOfReplicas, writer,
|
||||
new ArrayList<DatanodeDescriptor>(numOfReplicas), false, excludedNodes,
|
||||
blocksize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the block is replicated on at least minRacks different racks
|
||||
* if there is more than minRacks rack in the system.
|
||||
|
|
|
@ -125,6 +125,60 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
excludedNodes, blocksize);
|
||||
}
|
||||
|
||||
@Override
|
||||
DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas,
|
||||
DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
|
||||
long blocksize, List<DatanodeDescriptor> favoredNodes) {
|
||||
try {
|
||||
if (favoredNodes == null || favoredNodes.size() == 0) {
|
||||
// Favored nodes not specified, fall back to regular block placement.
|
||||
return chooseTarget(src, numOfReplicas, writer,
|
||||
new ArrayList<DatanodeDescriptor>(numOfReplicas), false,
|
||||
excludedNodes, blocksize);
|
||||
}
|
||||
|
||||
HashMap<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
|
||||
new HashMap<Node, Node>() : new HashMap<Node, Node>(excludedNodes);
|
||||
|
||||
// Choose favored nodes
|
||||
List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
|
||||
boolean avoidStaleNodes = stats != null
|
||||
&& stats.isAvoidingStaleDataNodesForWrite();
|
||||
for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
|
||||
DatanodeDescriptor favoredNode = favoredNodes.get(i);
|
||||
// Choose a single node which is local to favoredNode.
|
||||
// 'results' is updated within chooseLocalNode
|
||||
DatanodeDescriptor target = chooseLocalNode(favoredNode,
|
||||
favoriteAndExcludedNodes, blocksize,
|
||||
getMaxNodesPerRack(results,
|
||||
numOfReplicas)[1], results, avoidStaleNodes);
|
||||
if (target == null) {
|
||||
LOG.warn("Could not find a target for file " + src
|
||||
+ " with favored node " + favoredNode);
|
||||
continue;
|
||||
}
|
||||
favoriteAndExcludedNodes.put(target, target);
|
||||
}
|
||||
|
||||
if (results.size() < numOfReplicas) {
|
||||
// Not enough favored nodes, choose other nodes.
|
||||
numOfReplicas -= results.size();
|
||||
DatanodeDescriptor[] remainingTargets =
|
||||
chooseTarget(src, numOfReplicas, writer, results,
|
||||
false, favoriteAndExcludedNodes, blocksize);
|
||||
for (int i = 0; i < remainingTargets.length; i++) {
|
||||
results.add(remainingTargets[i]);
|
||||
}
|
||||
}
|
||||
return results.toArray(new DatanodeDescriptor[results.size()]);
|
||||
} catch (NotEnoughReplicasException nr) {
|
||||
// Fall back to regular block placement disregarding favored nodes hint
|
||||
return chooseTarget(src, numOfReplicas, writer,
|
||||
new ArrayList<DatanodeDescriptor>(numOfReplicas), false,
|
||||
excludedNodes, blocksize);
|
||||
}
|
||||
}
|
||||
|
||||
/** This is the implementation. */
|
||||
DatanodeDescriptor[] chooseTarget(int numOfReplicas,
|
||||
DatanodeDescriptor writer,
|
||||
|
@ -140,15 +194,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
excludedNodes = new HashMap<Node, Node>();
|
||||
}
|
||||
|
||||
int clusterSize = clusterMap.getNumOfLeaves();
|
||||
int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
|
||||
if (totalNumOfReplicas > clusterSize) {
|
||||
numOfReplicas -= (totalNumOfReplicas-clusterSize);
|
||||
totalNumOfReplicas = clusterSize;
|
||||
}
|
||||
|
||||
int maxNodesPerRack =
|
||||
(totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
|
||||
int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
|
||||
numOfReplicas = result[0];
|
||||
int maxNodesPerRack = result[1];
|
||||
|
||||
List<DatanodeDescriptor> results =
|
||||
new ArrayList<DatanodeDescriptor>(chosenNodes);
|
||||
|
@ -175,6 +223,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
results.toArray(new DatanodeDescriptor[results.size()]));
|
||||
}
|
||||
|
||||
private int[] getMaxNodesPerRack(List<DatanodeDescriptor> chosenNodes,
|
||||
int numOfReplicas) {
|
||||
int clusterSize = clusterMap.getNumOfLeaves();
|
||||
int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
|
||||
if (totalNumOfReplicas > clusterSize) {
|
||||
numOfReplicas -= (totalNumOfReplicas-clusterSize);
|
||||
totalNumOfReplicas = clusterSize;
|
||||
}
|
||||
int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
|
||||
return new int[] {numOfReplicas, maxNodesPerRack};
|
||||
}
|
||||
|
||||
/* choose <i>numOfReplicas</i> from all data nodes */
|
||||
private DatanodeDescriptor chooseTarget(int numOfReplicas,
|
||||
DatanodeDescriptor writer,
|
||||
|
|
|
@ -326,6 +326,68 @@ public class DatanodeManager {
|
|||
return host2DatanodeMap.getDatanodeByHost(host);
|
||||
}
|
||||
|
||||
/** @return the datanode descriptor for the host. */
|
||||
public DatanodeDescriptor getDatanodeByXferAddr(String host, int xferPort) {
|
||||
return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given datanode address or host name, returns the DatanodeDescriptor for the
|
||||
* same, or if it doesn't find the datanode, it looks for a machine local and
|
||||
* then rack local datanode, if a rack local datanode is not possible either,
|
||||
* it returns the DatanodeDescriptor of any random node in the cluster.
|
||||
*
|
||||
* @param address hostaddress:transfer address
|
||||
* @return the best match for the given datanode
|
||||
* @throws IOException when no datanode is found for given address
|
||||
*/
|
||||
DatanodeDescriptor getDatanodeDescriptor(String address) {
|
||||
DatanodeDescriptor node = null;
|
||||
int colon = address.indexOf(":");
|
||||
int xferPort;
|
||||
String host = address;
|
||||
if (colon > 0) {
|
||||
host = address.substring(0, colon);
|
||||
xferPort = Integer.parseInt(address.substring(colon+1));
|
||||
node = getDatanodeByXferAddr(host, xferPort);
|
||||
}
|
||||
if (node == null) {
|
||||
node = getDatanodeByHost(host);
|
||||
}
|
||||
if (node == null) {
|
||||
String networkLocation = resolveNetworkLocation(host);
|
||||
|
||||
// If the current cluster doesn't contain the node, fallback to
|
||||
// something machine local and then rack local.
|
||||
List<Node> rackNodes = getNetworkTopology()
|
||||
.getDatanodesInRack(networkLocation);
|
||||
if (rackNodes != null) {
|
||||
// Try something machine local.
|
||||
for (Node rackNode : rackNodes) {
|
||||
if (((DatanodeDescriptor) rackNode).getIpAddr().equals(host)) {
|
||||
node = (DatanodeDescriptor) rackNode;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Try something rack local.
|
||||
if (node == null && !rackNodes.isEmpty()) {
|
||||
node = (DatanodeDescriptor) (rackNodes
|
||||
.get(DFSUtil.getRandom().nextInt(rackNodes.size())));
|
||||
}
|
||||
}
|
||||
|
||||
// If we can't even choose rack local, just choose any node in the
|
||||
// cluster.
|
||||
if (node == null) {
|
||||
node = (DatanodeDescriptor)getNetworkTopology()
|
||||
.chooseRandom(NodeBase.ROOT);
|
||||
}
|
||||
}
|
||||
return node;
|
||||
}
|
||||
|
||||
|
||||
/** Get a datanode descriptor given corresponding storageID */
|
||||
DatanodeDescriptor getDatanode(final String storageID) {
|
||||
return datanodeMap.get(storageID);
|
||||
|
@ -455,8 +517,13 @@ public class DatanodeManager {
|
|||
}
|
||||
}
|
||||
|
||||
public String resolveNetworkLocation(String host) {
|
||||
DatanodeID d = parseDNFromHostsEntry(host);
|
||||
return resolveNetworkLocation(d);
|
||||
}
|
||||
|
||||
/* Resolve a node's network location */
|
||||
private void resolveNetworkLocation (DatanodeDescriptor node) {
|
||||
private String resolveNetworkLocation (DatanodeID node) {
|
||||
List<String> names = new ArrayList<String>(1);
|
||||
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
||||
names.add(node.getIpAddr());
|
||||
|
@ -474,7 +541,7 @@ public class DatanodeManager {
|
|||
} else {
|
||||
networkLocation = rName.get(0);
|
||||
}
|
||||
node.setNetworkLocation(networkLocation);
|
||||
return networkLocation;
|
||||
}
|
||||
|
||||
private boolean inHostsList(DatanodeID node) {
|
||||
|
@ -707,7 +774,7 @@ public class DatanodeManager {
|
|||
nodeS.setDisallowed(false); // Node is in the include list
|
||||
|
||||
// resolve network location
|
||||
resolveNetworkLocation(nodeS);
|
||||
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
|
||||
getNetworkTopology().add(nodeS);
|
||||
|
||||
// also treat the registration message as a heartbeat
|
||||
|
@ -739,7 +806,7 @@ public class DatanodeManager {
|
|||
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
|
||||
boolean success = false;
|
||||
try {
|
||||
resolveNetworkLocation(nodeDescr);
|
||||
nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
|
||||
networktopology.add(nodeDescr);
|
||||
|
||||
// register new datanode
|
||||
|
|
|
@ -2213,7 +2213,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
* client to "try again later".
|
||||
*/
|
||||
LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
|
||||
ExtendedBlock previous, HashMap<Node, Node> excludedNodes)
|
||||
ExtendedBlock previous, HashMap<Node, Node> excludedNodes,
|
||||
List<String> favoredNodes)
|
||||
throws LeaseExpiredException, NotReplicatedYetException,
|
||||
QuotaExceededException, SafeModeException, UnresolvedLinkException,
|
||||
IOException {
|
||||
|
@ -2254,7 +2255,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
|
||||
// choose targets for the new block to be allocated.
|
||||
final DatanodeDescriptor targets[] = getBlockManager().chooseTarget(
|
||||
src, replication, clientNode, excludedNodes, blockSize);
|
||||
src, replication, clientNode, excludedNodes, blockSize, favoredNodes);
|
||||
|
||||
// Part II.
|
||||
// Allocate a new block, add it to the INode and the BlocksMap.
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -482,7 +483,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
|
||||
@Override
|
||||
public LocatedBlock addBlock(String src, String clientName,
|
||||
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId)
|
||||
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
|
||||
String[] favoredNodes)
|
||||
throws IOException {
|
||||
if (stateChangeLog.isDebugEnabled()) {
|
||||
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
|
||||
|
@ -495,8 +497,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
excludedNodesSet.put(node, node);
|
||||
}
|
||||
}
|
||||
List<String> favoredNodesList = (favoredNodes == null) ? null
|
||||
: Arrays.asList(favoredNodes);
|
||||
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
|
||||
clientName, previous, excludedNodesSet);
|
||||
clientName, previous, excludedNodesSet, favoredNodesList);
|
||||
if (locatedBlock != null)
|
||||
metrics.incrAddBlockOps();
|
||||
return locatedBlock;
|
||||
|
|
|
@ -121,6 +121,7 @@ message AddBlockRequestProto {
|
|||
optional ExtendedBlockProto previous = 3;
|
||||
repeated DatanodeInfoProto excludeNodes = 4;
|
||||
optional uint64 fileId = 5 [default = 0]; // default as a bogus id
|
||||
repeated string favoredNodes = 6; //the set of datanodes to use for the block
|
||||
}
|
||||
|
||||
message AddBlockResponseProto {
|
||||
|
|
|
@ -241,7 +241,7 @@ public class TestDFSClientRetries {
|
|||
anyString(),
|
||||
any(ExtendedBlock.class),
|
||||
any(DatanodeInfo[].class),
|
||||
anyLong())).thenAnswer(answer);
|
||||
anyLong(), any(String[].class))).thenAnswer(answer);
|
||||
|
||||
Mockito.doReturn(
|
||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||
|
@ -390,7 +390,7 @@ public class TestDFSClientRetries {
|
|||
}
|
||||
}).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
|
||||
Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
|
||||
Mockito.anyLong());
|
||||
Mockito.anyLong(), Mockito.<String[]> any());
|
||||
|
||||
doAnswer(new Answer<Boolean>() {
|
||||
|
||||
|
@ -432,7 +432,7 @@ public class TestDFSClientRetries {
|
|||
Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
|
||||
Mockito.anyString(), Mockito.anyString(),
|
||||
Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
|
||||
Mockito.anyLong());
|
||||
Mockito.anyLong(), Mockito.<String[]> any());
|
||||
Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
|
||||
Mockito.anyString(), Mockito.anyString(),
|
||||
Mockito.<ExtendedBlock>any());
|
||||
|
|
|
@ -519,7 +519,7 @@ public class TestFileCreation {
|
|||
|
||||
// add one block to the file
|
||||
LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
|
||||
client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID);
|
||||
client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID, null);
|
||||
System.out.println("testFileCreationError2: "
|
||||
+ "Added block " + location.getBlock());
|
||||
|
||||
|
@ -570,7 +570,7 @@ public class TestFileCreation {
|
|||
createFile(dfs, f, 3);
|
||||
try {
|
||||
cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName,
|
||||
null, null, INodeId.GRANDFATHER_INODE_ID);
|
||||
null, null, INodeId.GRANDFATHER_INODE_ID, null);
|
||||
fail();
|
||||
} catch(IOException ioe) {
|
||||
FileSystem.LOG.info("GOOD!", ioe);
|
||||
|
|
|
@ -1059,7 +1059,7 @@ public class NNThroughputBenchmark {
|
|||
ExtendedBlock prevBlock = null;
|
||||
for(int jdx = 0; jdx < blocksPerFile; jdx++) {
|
||||
LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
|
||||
prevBlock, null, INodeId.GRANDFATHER_INODE_ID);
|
||||
prevBlock, null, INodeId.GRANDFATHER_INODE_ID, null);
|
||||
prevBlock = loc.getBlock();
|
||||
for(DatanodeInfo dnInfo : loc.getLocations()) {
|
||||
int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getXferAddr());
|
||||
|
|
|
@ -26,6 +26,7 @@ import static org.mockito.Mockito.spy;
|
|||
import java.lang.reflect.Field;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -108,7 +109,7 @@ public class TestAddBlockRetry {
|
|||
if(count == 1) { // run second addBlock()
|
||||
LOG.info("Starting second addBlock for " + src);
|
||||
nn.addBlock(src, "clientName", null, null,
|
||||
INodeId.GRANDFATHER_INODE_ID);
|
||||
INodeId.GRANDFATHER_INODE_ID, null);
|
||||
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
|
||||
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
|
||||
lb2 = lbs.get(0);
|
||||
|
@ -119,7 +120,7 @@ public class TestAddBlockRetry {
|
|||
}
|
||||
}).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(),
|
||||
Mockito.<DatanodeDescriptor>any(), Mockito.<HashMap<Node, Node>>any(),
|
||||
Mockito.anyLong());
|
||||
Mockito.anyLong(), Mockito.<List<String>>any());
|
||||
|
||||
// create file
|
||||
nn.create(src, FsPermission.getFileDefault(),
|
||||
|
@ -129,7 +130,7 @@ public class TestAddBlockRetry {
|
|||
|
||||
// start first addBlock()
|
||||
LOG.info("Starting first addBlock for " + src);
|
||||
nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID);
|
||||
nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null);
|
||||
|
||||
// check locations
|
||||
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
|
||||
|
|
|
@ -0,0 +1,220 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Random;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.junit.Test;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
|
||||
public class TestFavoredNodesEndToEnd {
|
||||
private static MiniDFSCluster cluster;
|
||||
private static Configuration conf;
|
||||
private final static int NUM_DATA_NODES = 10;
|
||||
private final static int NUM_FILES = 10;
|
||||
private final static byte[] SOME_BYTES = new String("foo").getBytes();
|
||||
private static DistributedFileSystem dfs;
|
||||
private static ArrayList<DataNode> datanodes;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
conf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
|
||||
.build();
|
||||
cluster.waitClusterUp();
|
||||
dfs = cluster.getFileSystem();
|
||||
datanodes = cluster.getDataNodes();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFavoredNodesEndToEnd() throws Exception {
|
||||
//create 10 files with random preferred nodes
|
||||
for (int i = 0; i < NUM_FILES; i++) {
|
||||
Random rand = new Random(System.currentTimeMillis() + i);
|
||||
//pass a new created rand so as to get a uniform distribution each time
|
||||
//without too much collisions (look at the do-while loop in getDatanodes)
|
||||
InetSocketAddress datanode[] = getDatanodes(rand);
|
||||
Path p = new Path("/filename"+i);
|
||||
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
|
||||
4096, (short)3, (long)4096, null, datanode);
|
||||
out.write(SOME_BYTES);
|
||||
out.close();
|
||||
BlockLocation[] locations =
|
||||
dfs.getClient().getBlockLocations(p.toUri().getPath(), 0,
|
||||
Long.MAX_VALUE);
|
||||
//make sure we have exactly one block location, and three hosts
|
||||
assertTrue(locations.length == 1 && locations[0].getHosts().length == 3);
|
||||
//verify the files got created in the right nodes
|
||||
for (BlockLocation loc : locations) {
|
||||
String[] hosts = loc.getNames();
|
||||
String[] hosts1 = getStringForInetSocketAddrs(datanode);
|
||||
assertTrue(compareNodes(hosts, hosts1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWhenFavoredNodesNotPresent() throws Exception {
|
||||
//when we ask for favored nodes but the nodes are not there, we should
|
||||
//get some other nodes. In other words, the write to hdfs should not fail
|
||||
//and if we do getBlockLocations on the file, we should see one blklocation
|
||||
//and three hosts for that
|
||||
Random rand = new Random(System.currentTimeMillis());
|
||||
InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3];
|
||||
for (int i = 0; i < 3; i++) {
|
||||
arbitraryAddrs[i] = getArbitraryLocalHostAddr();
|
||||
}
|
||||
Path p = new Path("/filename-foo-bar");
|
||||
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
|
||||
4096, (short)3, (long)4096, null, arbitraryAddrs);
|
||||
out.write(SOME_BYTES);
|
||||
out.close();
|
||||
BlockLocation[] locations =
|
||||
dfs.getClient().getBlockLocations(p.toUri().getPath(), 0,
|
||||
Long.MAX_VALUE);
|
||||
assertTrue(locations.length == 1 && locations[0].getHosts().length == 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWhenSomeNodesAreNotGood() throws Exception {
|
||||
//make some datanode not "good" so that even if the client prefers it,
|
||||
//the namenode would not give it as a replica to write to
|
||||
DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().getDatanodeByXferAddr(
|
||||
datanodes.get(0).getXferAddress().getAddress().getHostAddress(),
|
||||
datanodes.get(0).getXferAddress().getPort());
|
||||
//set the decommission status to true so that
|
||||
//BlockPlacementPolicyDefault.isGoodTarget returns false for this dn
|
||||
d.setDecommissioned();
|
||||
InetSocketAddress addrs[] = new InetSocketAddress[3];
|
||||
for (int i = 0; i < 3; i++) {
|
||||
addrs[i] = datanodes.get(i).getXferAddress();
|
||||
}
|
||||
Path p = new Path("/filename-foo-bar-baz");
|
||||
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
|
||||
4096, (short)3, (long)4096, null, addrs);
|
||||
out.write(SOME_BYTES);
|
||||
out.close();
|
||||
BlockLocation[] locations =
|
||||
dfs.getClient().getBlockLocations(p.toUri().getPath(), 0,
|
||||
Long.MAX_VALUE);
|
||||
//reset the state
|
||||
d.stopDecommission();
|
||||
assertTrue(locations.length == 1 && locations[0].getHosts().length == 3);
|
||||
//also make sure that the datanode[0] is not in the list of hosts
|
||||
String datanode0 =
|
||||
datanodes.get(0).getXferAddress().getAddress().getHostAddress()
|
||||
+ ":" + datanodes.get(0).getXferAddress().getPort();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
if (locations[0].getNames()[i].equals(datanode0)) {
|
||||
fail(datanode0 + " not supposed to be a replica for the block");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String[] getStringForInetSocketAddrs(InetSocketAddress[] datanode) {
|
||||
String strs[] = new String[datanode.length];
|
||||
for (int i = 0; i < datanode.length; i++) {
|
||||
strs[i] = datanode[i].getAddress().getHostAddress() + ":" +
|
||||
datanode[i].getPort();
|
||||
}
|
||||
return strs;
|
||||
}
|
||||
|
||||
private boolean compareNodes(String[] dnList1, String[] dnList2) {
|
||||
for (int i = 0; i < dnList1.length; i++) {
|
||||
boolean matched = false;
|
||||
for (int j = 0; j < dnList2.length; j++) {
|
||||
if (dnList1[i].equals(dnList2[j])) {
|
||||
matched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (matched == false) {
|
||||
fail(dnList1[i] + " not a favored node");
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private InetSocketAddress[] getDatanodes(Random rand) {
|
||||
//Get some unique random indexes
|
||||
int idx1 = rand.nextInt(NUM_DATA_NODES);
|
||||
int idx2;
|
||||
|
||||
do {
|
||||
idx2 = rand.nextInt(NUM_DATA_NODES);
|
||||
} while (idx1 == idx2);
|
||||
|
||||
int idx3;
|
||||
do {
|
||||
idx3 = rand.nextInt(NUM_DATA_NODES);
|
||||
} while (idx2 == idx3 || idx1 == idx3);
|
||||
|
||||
InetSocketAddress[] addrs = new InetSocketAddress[3];
|
||||
addrs[0] = datanodes.get(idx1).getXferAddress();
|
||||
addrs[1] = datanodes.get(idx2).getXferAddress();
|
||||
addrs[2] = datanodes.get(idx3).getXferAddress();
|
||||
return addrs;
|
||||
}
|
||||
|
||||
private InetSocketAddress getArbitraryLocalHostAddr()
|
||||
throws UnknownHostException{
|
||||
Random rand = new Random(System.currentTimeMillis());
|
||||
int port = rand.nextInt(65535);
|
||||
while (true) {
|
||||
boolean conflict = false;
|
||||
for (DataNode d : datanodes) {
|
||||
if (d.getXferAddress().getPort() == port) {
|
||||
port = rand.nextInt(65535);
|
||||
conflict = true;
|
||||
}
|
||||
}
|
||||
if (conflict == false) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return new InetSocketAddress(InetAddress.getLocalHost(), port);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue