HDFS-3150. Add option for clients to contact DNs via hostname. Contributed by Eli Collins

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1373143 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-08-14 22:15:57 +00:00
parent 2964cc0f59
commit 3cdd6a9a2d
24 changed files with 291 additions and 92 deletions

View File

@ -26,6 +26,8 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm) HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm)
HDFS-3150. Add option for clients to contact DNs via hostname. (eli)
IMPROVEMENTS IMPROVEMENTS
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG

View File

@ -86,11 +86,11 @@ protected boolean removeEldestEntry(
} }
private synchronized ClientDatanodeProtocol getDatanodeProxy( private synchronized ClientDatanodeProtocol getDatanodeProxy(
DatanodeInfo node, Configuration conf, int socketTimeout) DatanodeInfo node, Configuration conf, int socketTimeout,
throws IOException { boolean connectToDnViaHostname) throws IOException {
if (proxy == null) { if (proxy == null) {
proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf, proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
socketTimeout); socketTimeout, connectToDnViaHostname);
} }
return proxy; return proxy;
} }
@ -156,14 +156,16 @@ private void removeBlockLocalPathInfo(ExtendedBlock b) {
*/ */
static BlockReaderLocal newBlockReader(Configuration conf, String file, static BlockReaderLocal newBlockReader(Configuration conf, String file,
ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node, ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
int socketTimeout, long startOffset, long length) throws IOException { int socketTimeout, long startOffset, long length,
boolean connectToDnViaHostname) throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort()); .getIpcPort());
// check the cache first // check the cache first
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
if (pathinfo == null) { if (pathinfo == null) {
pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token); pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
connectToDnViaHostname);
} }
// check to see if the file exists. It may so happen that the // check to see if the file exists. It may so happen that the
@ -241,11 +243,12 @@ private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk, private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
DatanodeInfo node, Configuration conf, int timeout, DatanodeInfo node, Configuration conf, int timeout,
Token<BlockTokenIdentifier> token) throws IOException { Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort()); LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
BlockLocalPathInfo pathinfo = null; BlockLocalPathInfo pathinfo = null;
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node, ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
conf, timeout); conf, timeout, connectToDnViaHostname);
try { try {
// make RPC to local datanode to find local pathnames of blocks // make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token); pathinfo = proxy.getBlockLocalPathInfo(blk, token);

View File

@ -49,6 +49,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -216,6 +218,7 @@ static class Conf {
final String taskId; final String taskId;
final FsPermission uMask; final FsPermission uMask;
final boolean useLegacyBlockReader; final boolean useLegacyBlockReader;
final boolean connectToDnViaHostname;
Conf(Configuration conf) { Conf(Configuration conf) {
maxFailoverAttempts = conf.getInt( maxFailoverAttempts = conf.getInt(
@ -266,6 +269,8 @@ static class Conf {
useLegacyBlockReader = conf.getBoolean( useLegacyBlockReader = conf.getBoolean(
DFS_CLIENT_USE_LEGACY_BLOCKREADER, DFS_CLIENT_USE_LEGACY_BLOCKREADER,
DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
} }
private int getChecksumType(Configuration conf) { private int getChecksumType(Configuration conf) {
@ -476,6 +481,14 @@ String getClientName() {
return clientName; return clientName;
} }
/**
* @return whether the client should use hostnames instead of IPs
* when connecting to DataNodes
*/
boolean connectToDnViaHostname() {
return dfsClientConf.connectToDnViaHostname;
}
void checkOpen() throws IOException { void checkOpen() throws IOException {
if (!clientRunning) { if (!clientRunning) {
IOException result = new IOException("Filesystem closed"); IOException result = new IOException("Filesystem closed");
@ -732,12 +745,12 @@ public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
*/ */
static BlockReader getLocalBlockReader(Configuration conf, static BlockReader getLocalBlockReader(Configuration conf,
String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken, String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock) DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
throws InvalidToken, IOException { boolean connectToDnViaHostname) throws InvalidToken, IOException {
try { try {
return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken, return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes() chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
- offsetIntoBlock); - offsetIntoBlock, connectToDnViaHostname);
} catch (RemoteException re) { } catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class, throw re.unwrapRemoteException(InvalidToken.class,
AccessControlException.class); AccessControlException.class);
@ -1464,7 +1477,8 @@ public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException { public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen(); checkOpen();
return getFileChecksum(src, namenode, socketFactory, return getFileChecksum(src, namenode, socketFactory,
dfsClientConf.socketTimeout, getDataEncryptionKey()); dfsClientConf.socketTimeout, getDataEncryptionKey(),
dfsClientConf.connectToDnViaHostname);
} }
@InterfaceAudience.Private @InterfaceAudience.Private
@ -1510,7 +1524,8 @@ public DataEncryptionKey getDataEncryptionKey()
*/ */
public static MD5MD5CRC32FileChecksum getFileChecksum(String src, public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout, ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
DataEncryptionKey encryptionKey) throws IOException { DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
throws IOException {
//get all block locations //get all block locations
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE); LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
if (null == blockLocations) { if (null == blockLocations) {
@ -1548,9 +1563,11 @@ public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
try { try {
//connect to a datanode //connect to a datanode
sock = socketFactory.createSocket(); sock = socketFactory.createSocket();
NetUtils.connect(sock, String dnAddr = datanodes[j].getXferAddr(connectToDnViaHostname);
NetUtils.createSocketAddr(datanodes[j].getXferAddr()), if (LOG.isDebugEnabled()) {
timeout); LOG.debug("Connecting to datanode " + dnAddr);
}
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
sock.setSoTimeout(timeout); sock.setSoTimeout(timeout);
OutputStream unbufOut = NetUtils.getOutputStream(sock); OutputStream unbufOut = NetUtils.getOutputStream(sock);

View File

@ -52,6 +52,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT"; public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity"; public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16; public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
// HA related configuration // HA related configuration
public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider"; public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
@ -81,6 +83,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false; public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads"; public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false; public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
public static final String DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false;
public static final String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port"; public static final String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070; public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;

View File

@ -199,7 +199,8 @@ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
try { try {
cdp = DFSUtil.createClientDatanodeProtocolProxy( cdp = DFSUtil.createClientDatanodeProtocolProxy(
datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock); datanode, dfsClient.conf, dfsClient.getConf().socketTimeout,
dfsClient.getConf().connectToDnViaHostname, locatedblock);
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@ -716,8 +717,12 @@ private DNAddrPair chooseDataNode(LocatedBlock block)
DatanodeInfo[] nodes = block.getLocations(); DatanodeInfo[] nodes = block.getLocations();
try { try {
DatanodeInfo chosenNode = bestNode(nodes, deadNodes); DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
InetSocketAddress targetAddr = final String dnAddr =
NetUtils.createSocketAddr(chosenNode.getXferAddr()); chosenNode.getXferAddr(dfsClient.connectToDnViaHostname());
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr); return new DNAddrPair(chosenNode, targetAddr);
} catch (IOException ie) { } catch (IOException ie) {
String blockInfo = block.getBlock() + " file=" + src; String blockInfo = block.getBlock() + " file=" + src;
@ -875,7 +880,8 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
if (dfsClient.shouldTryShortCircuitRead(dnAddr)) { if (dfsClient.shouldTryShortCircuitRead(dnAddr)) {
return DFSClient.getLocalBlockReader(dfsClient.conf, src, block, return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset); blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
dfsClient.connectToDnViaHostname());
} }
IOException err = null; IOException err = null;
@ -1183,7 +1189,7 @@ static DatanodeInfo bestNode(DatanodeInfo nodes[],
throw new IOException("No live nodes contain current block"); throw new IOException("No live nodes contain current block");
} }
/** Utility class to encapsulate data node info and its ip address. */ /** Utility class to encapsulate data node info and its address. */
static class DNAddrPair { static class DNAddrPair {
DatanodeInfo info; DatanodeInfo info;
InetSocketAddress addr; InetSocketAddress addr;

View File

@ -1100,7 +1100,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, " DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + "encryption key was invalid when connecting to "
+ nodes[0].getXferAddr() + " : " + ie); + nodes[0] + " : " + ie);
// The encryption key used is invalid. // The encryption key used is invalid.
refetchEncryptionKey--; refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey(); dfsClient.clearDataEncryptionKey();
@ -1112,7 +1112,8 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
// find the datanode that matches // find the datanode that matches
if (firstBadLink.length() != 0) { if (firstBadLink.length() != 0) {
for (int i = 0; i < nodes.length; i++) { for (int i = 0; i < nodes.length; i++) {
if (nodes[i].getXferAddr().equals(firstBadLink)) { // NB: Unconditionally using the xfer addr w/o hostname
if (firstBadLink.equals(nodes[i].getXferAddr())) {
errorIndex = i; errorIndex = i;
break; break;
} }
@ -1216,11 +1217,11 @@ private void setLastException(IOException e) {
*/ */
static Socket createSocketForPipeline(final DatanodeInfo first, static Socket createSocketForPipeline(final DatanodeInfo first,
final int length, final DFSClient client) throws IOException { final int length, final DFSClient client) throws IOException {
if(DFSClient.LOG.isDebugEnabled()) { final String dnAddr = first.getXferAddr(client.connectToDnViaHostname());
DFSClient.LOG.debug("Connecting to datanode " + first); if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
} }
final InetSocketAddress isa = final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
NetUtils.createSocketAddr(first.getXferAddr());
final Socket sock = client.socketFactory.createSocket(); final Socket sock = client.socketFactory.createSocket();
final int timeout = client.getDatanodeReadTimeout(length); final int timeout = client.getDatanodeReadTimeout(length);
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout); NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout);

View File

@ -847,17 +847,17 @@ public static int roundBytesToGB(long bytes) {
/** Create a {@link ClientDatanodeProtocol} proxy */ /** Create a {@link ClientDatanodeProtocol} proxy */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout, DatanodeID datanodeid, Configuration conf, int socketTimeout,
LocatedBlock locatedBlock) throws IOException { boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout, return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
locatedBlock); connectToDnViaHostname, locatedBlock);
} }
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */ /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
static ClientDatanodeProtocol createClientDatanodeProtocolProxy( static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout) DatanodeID datanodeid, Configuration conf, int socketTimeout,
throws IOException { boolean connectToDnViaHostname) throws IOException {
return new ClientDatanodeProtocolTranslatorPB( return new ClientDatanodeProtocolTranslatorPB(
datanodeid, conf, socketTimeout); datanodeid, conf, socketTimeout, connectToDnViaHostname);
} }
/** Create a {@link ClientDatanodeProtocol} proxy */ /** Create a {@link ClientDatanodeProtocol} proxy */

View File

@ -104,7 +104,7 @@ public String getXferAddr() {
/** /**
* @return IP:ipcPort string * @return IP:ipcPort string
*/ */
public String getIpcAddr() { private String getIpcAddr() {
return ipAddr + ":" + ipcPort; return ipAddr + ":" + ipcPort;
} }
@ -122,6 +122,29 @@ public String getXferAddrWithHostname() {
return hostName + ":" + xferPort; return hostName + ":" + xferPort;
} }
/**
* @return hostname:ipcPort
*/
private String getIpcAddrWithHostname() {
return hostName + ":" + ipcPort;
}
/**
* @param useHostname true to use the DN hostname, use the IP otherwise
* @return name:xferPort
*/
public String getXferAddr(boolean useHostname) {
return useHostname ? getXferAddrWithHostname() : getXferAddr();
}
/**
* @param useHostname true to use the DN hostname, use the IP otherwise
* @return name:ipcPort
*/
public String getIpcAddr(boolean useHostname) {
return useHostname ? getIpcAddrWithHostname() : getIpcAddr();
}
/** /**
* @return data storage ID. * @return data storage ID.
*/ */

View File

@ -73,10 +73,10 @@ public class ClientDatanodeProtocolTranslatorPB implements
RefreshNamenodesRequestProto.newBuilder().build(); RefreshNamenodesRequestProto.newBuilder().build();
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
Configuration conf, int socketTimeout, LocatedBlock locatedBlock) Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
throws IOException { LocatedBlock locatedBlock) throws IOException {
rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf, rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf,
socketTimeout, locatedBlock); socketTimeout, connectToDnViaHostname, locatedBlock);
} }
public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr, public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
@ -90,11 +90,17 @@ public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
* @param datanodeid Datanode to connect to. * @param datanodeid Datanode to connect to.
* @param conf Configuration. * @param conf Configuration.
* @param socketTimeout Socket timeout to use. * @param socketTimeout Socket timeout to use.
* @param connectToDnViaHostname connect to the Datanode using its hostname
* @throws IOException * @throws IOException
*/ */
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
Configuration conf, int socketTimeout) throws IOException { Configuration conf, int socketTimeout, boolean connectToDnViaHostname)
InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr()); throws IOException {
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
}
rpcProxy = createClientDatanodeProtocolProxy(addr, rpcProxy = createClientDatanodeProtocolProxy(addr,
UserGroupInformation.getCurrentUser(), conf, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), socketTimeout); NetUtils.getDefaultSocketFactory(conf), socketTimeout);
@ -102,10 +108,11 @@ public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy( static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout, DatanodeID datanodeid, Configuration conf, int socketTimeout,
LocatedBlock locatedBlock) throws IOException { boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr()); final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("ClientDatanodeProtocol addr=" + addr); LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
} }
// Since we're creating a new UserGroupInformation here, we know that no // Since we're creating a new UserGroupInformation here, we know that no

View File

@ -55,7 +55,7 @@ class DNConf {
final boolean dropCacheBehindReads; final boolean dropCacheBehindReads;
final boolean syncOnClose; final boolean syncOnClose;
final boolean encryptDataTransfer; final boolean encryptDataTransfer;
final boolean connectToDnViaHostname;
final long readaheadLength; final long readaheadLength;
final long heartBeatInterval; final long heartBeatInterval;
@ -97,7 +97,9 @@ public DNConf(Configuration conf) {
dropCacheBehindReads = conf.getBoolean( dropCacheBehindReads = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
connectToDnViaHostname = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);

View File

@ -279,6 +279,7 @@ public static InetSocketAddress createSocketAddr(String target) {
private Configuration conf; private Configuration conf;
private final String userWithLocalPathAccess; private final String userWithLocalPathAccess;
private boolean connectToDnViaHostname;
ReadaheadPool readaheadPool; ReadaheadPool readaheadPool;
/** /**
@ -299,8 +300,11 @@ public static InetSocketAddress createSocketAddr(String target) {
final SecureResources resources) throws IOException { final SecureResources resources) throws IOException {
super(conf); super(conf);
this.userWithLocalPathAccess = conf this.userWithLocalPathAccess =
.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY); conf.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
this.connectToDnViaHostname = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
try { try {
hostName = getHostName(conf); hostName = getHostName(conf);
LOG.info("Configured hostname is " + hostName); LOG.info("Configured hostname is " + hostName);
@ -882,7 +886,7 @@ public String getDisplayName() {
/** /**
* NB: The datanode can perform data transfer on the streaming * NB: The datanode can perform data transfer on the streaming
* address however clients are given the IPC IP address for data * address however clients are given the IPC IP address for data
* transfer, and that may be be a different address. * transfer, and that may be a different address.
* *
* @return socket address for data transfer * @return socket address for data transfer
*/ */
@ -929,12 +933,12 @@ DatanodeProtocolClientSideTranslatorPB connectToNN(
} }
public static InterDatanodeProtocol createInterDataNodeProtocolProxy( public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
DatanodeID datanodeid, final Configuration conf, final int socketTimeout) DatanodeID datanodeid, final Configuration conf, final int socketTimeout,
throws IOException { final boolean connectToDnViaHostname) throws IOException {
final InetSocketAddress addr = final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
NetUtils.createSocketAddr(datanodeid.getIpcAddr()); final InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
if (InterDatanodeProtocol.LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr); LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
} }
final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser(); final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
try { try {
@ -1390,8 +1394,11 @@ public void run() {
final boolean isClient = clientname.length() > 0; final boolean isClient = clientname.length() > 0;
try { try {
InetSocketAddress curTarget = final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname);
NetUtils.createSocketAddr(targets[0].getXferAddr()); InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr);
}
sock = newSocket(); sock = newSocket();
NetUtils.connect(sock, curTarget, dnConf.socketTimeout); NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setSoTimeout(targets.length * dnConf.socketTimeout); sock.setSoTimeout(targets.length * dnConf.socketTimeout);
@ -1847,7 +1854,7 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException {
DatanodeRegistration bpReg = bpos.bpRegistration; DatanodeRegistration bpReg = bpos.bpRegistration;
InterDatanodeProtocol datanode = bpReg.equals(id)? InterDatanodeProtocol datanode = bpReg.equals(id)?
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
dnConf.socketTimeout); dnConf.socketTimeout, dnConf.connectToDnViaHostname);
ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock); ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
if (info != null && if (info != null &&
info.getGenerationStamp() >= block.getGenerationStamp() && info.getGenerationStamp() >= block.getGenerationStamp() &&

View File

@ -86,7 +86,7 @@ class DataXceiver extends Receiver implements Runnable {
private final DataNode datanode; private final DataNode datanode;
private final DNConf dnConf; private final DNConf dnConf;
private final DataXceiverServer dataXceiverServer; private final DataXceiverServer dataXceiverServer;
private final boolean connectToDnViaHostname;
private long opStartTime; //the start time of receiving an Op private long opStartTime; //the start time of receiving an Op
private final SocketInputWrapper socketIn; private final SocketInputWrapper socketIn;
private OutputStream socketOut; private OutputStream socketOut;
@ -113,6 +113,7 @@ private DataXceiver(Socket s,
this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode; this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer; this.dataXceiverServer = dataXceiverServer;
this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
remoteAddress = s.getRemoteSocketAddress().toString(); remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString(); localAddress = s.getLocalSocketAddress().toString();
@ -404,7 +405,10 @@ public void writeBlock(final ExtendedBlock block,
if (targets.length > 0) { if (targets.length > 0) {
InetSocketAddress mirrorTarget = null; InetSocketAddress mirrorTarget = null;
// Connect to backup machine // Connect to backup machine
mirrorNode = targets[0].getXferAddr(); mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + mirrorNode);
}
mirrorTarget = NetUtils.createSocketAddr(mirrorNode); mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket(); mirrorSock = datanode.newSocket();
try { try {
@ -457,7 +461,8 @@ public void writeBlock(final ExtendedBlock block,
if (isClient) { if (isClient) {
BlockOpResponseProto.newBuilder() BlockOpResponseProto.newBuilder()
.setStatus(ERROR) .setStatus(ERROR)
.setFirstBadLink(mirrorNode) // NB: Unconditionally using the xfer addr w/o hostname
.setFirstBadLink(targets[0].getXferAddr())
.build() .build()
.writeDelimitedTo(replyOut); .writeDelimitedTo(replyOut);
replyOut.flush(); replyOut.flush();
@ -729,8 +734,11 @@ public void replaceBlock(final ExtendedBlock block,
try { try {
// get the output stream to the proxy // get the output stream to the proxy
InetSocketAddress proxyAddr = final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
NetUtils.createSocketAddr(proxySource.getXferAddr()); if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
proxySock = datanode.newSocket(); proxySock = datanode.newSocket();
NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout); NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
proxySock.setSoTimeout(dnConf.socketTimeout); proxySock.setSoTimeout(dnConf.socketTimeout);
@ -891,6 +899,7 @@ private void checkAccess(DataOutputStream out, final boolean reply,
if (mode == BlockTokenSecretManager.AccessMode.WRITE) { if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
DatanodeRegistration dnR = DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(blk.getBlockPoolId()); datanode.getDNRegistrationForBP(blk.getBlockPoolId());
// NB: Unconditionally using the xfer addr w/o hostname
resp.setFirstBadLink(dnR.getXferAddr()); resp.setFirstBadLink(dnR.getXferAddr());
} }
resp.build().writeDelimitedTo(out); resp.build().writeDelimitedTo(out);

View File

@ -127,7 +127,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response
datanode, conf, getUGI(request, conf)); datanode, conf, getUGI(request, conf));
final ClientProtocol nnproxy = dfs.getNamenode(); final ClientProtocol nnproxy = dfs.getNamenode();
final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum( final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey()); path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey(), false);
MD5MD5CRC32FileChecksum.write(xml, checksum); MD5MD5CRC32FileChecksum.write(xml, checksum);
} catch(IOException ioe) { } catch(IOException ioe) {
writeXml(ioe, path, xml); writeXml(ioe, path, xml);

View File

@ -53,7 +53,7 @@
<name>dfs.datanode.address</name> <name>dfs.datanode.address</name>
<value>0.0.0.0:50010</value> <value>0.0.0.0:50010</value>
<description> <description>
The address where the datanode server will listen to. The datanode server address and port for data transfer.
If the port is 0 then the server will start on a free port. If the port is 0 then the server will start on a free port.
</description> </description>
</property> </property>
@ -920,6 +920,22 @@
</description> </description>
</property> </property>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>false</value>
<description>Whether clients should use datanode hostnames when
connecting to datanodes.
</description>
</property>
<property>
<name>dfs.datanode.use.datanode.hostname</name>
<value>false</value>
<description>Whether datanodes should use datanode hostnames when
connecting to other datanodes for data transfer.
</description>
</property>
<property> <property>
<name>dfs.client.local.interfaces</name> <name>dfs.client.local.interfaces</name>
<value></value> <value></value>

View File

@ -144,6 +144,7 @@ public static class Builder {
private boolean setupHostsFile = false; private boolean setupHostsFile = false;
private MiniDFSNNTopology nnTopology = null; private MiniDFSNNTopology nnTopology = null;
private boolean checkExitOnShutdown = true; private boolean checkExitOnShutdown = true;
private boolean checkDataNodeHostConfig = false;
public Builder(Configuration conf) { public Builder(Configuration conf) {
this.conf = conf; this.conf = conf;
@ -253,6 +254,14 @@ public Builder checkExitOnShutdown(boolean val) {
return this; return this;
} }
/**
* Default: false
*/
public Builder checkDataNodeHostConfig(boolean val) {
this.checkDataNodeHostConfig = val;
return this;
}
/** /**
* Default: null * Default: null
*/ */
@ -316,7 +325,8 @@ private MiniDFSCluster(Builder builder) throws IOException {
builder.waitSafeMode, builder.waitSafeMode,
builder.setupHostsFile, builder.setupHostsFile,
builder.nnTopology, builder.nnTopology,
builder.checkExitOnShutdown); builder.checkExitOnShutdown,
builder.checkDataNodeHostConfig);
} }
public class DataNodeProperties { public class DataNodeProperties {
@ -550,7 +560,7 @@ public MiniDFSCluster(int nameNodePort,
initMiniDFSCluster(conf, numDataNodes, format, initMiniDFSCluster(conf, numDataNodes, format,
manageNameDfsDirs, true, manageDataDfsDirs, operation, racks, hosts, manageNameDfsDirs, true, manageDataDfsDirs, operation, racks, hosts,
simulatedCapacities, null, true, false, simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true); MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false);
} }
private void initMiniDFSCluster( private void initMiniDFSCluster(
@ -560,7 +570,8 @@ private void initMiniDFSCluster(
StartupOption operation, String[] racks, StartupOption operation, String[] racks,
String[] hosts, long[] simulatedCapacities, String clusterId, String[] hosts, long[] simulatedCapacities, String clusterId,
boolean waitSafeMode, boolean setupHostsFile, boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown) MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
boolean checkDataNodeHostConfig)
throws IOException { throws IOException {
ExitUtil.disableSystemExit(); ExitUtil.disableSystemExit();
@ -616,7 +627,7 @@ private void initMiniDFSCluster(
// Start the DataNodes // Start the DataNodes
startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks, startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
hosts, simulatedCapacities, setupHostsFile); hosts, simulatedCapacities, setupHostsFile, false, checkDataNodeHostConfig);
waitClusterUp(); waitClusterUp();
//make sure ProxyUsers uses the latest conf //make sure ProxyUsers uses the latest conf
ProxyUsers.refreshSuperUserGroupsConfiguration(conf); ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@ -957,7 +968,21 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile) throws IOException { boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts, startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, false); simulatedCapacities, setupHostsFile, false, false);
}
/**
* @see MiniDFSCluster#startDataNodes(Configuration, int, boolean, StartupOption,
* String[], String[], long[], boolean, boolean, boolean)
*/
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts,
long[] simulatedCapacities,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false);
} }
/** /**
@ -983,19 +1008,25 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
* @param simulatedCapacities array of capacities of the simulated data nodes * @param simulatedCapacities array of capacities of the simulated data nodes
* @param setupHostsFile add new nodes to dfs hosts files * @param setupHostsFile add new nodes to dfs hosts files
* @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config * @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
* @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config
* *
* @throws IllegalStateException if NameNode has been shutdown * @throws IllegalStateException if NameNode has been shutdown
*/ */
public synchronized void startDataNodes(Configuration conf, int numDataNodes, public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation, boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts, String[] racks, String[] hosts,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException { boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException {
if (operation == StartupOption.RECOVER) { if (operation == StartupOption.RECOVER) {
return; return;
} }
conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1"); if (checkDataNodeHostConfig) {
conf.setIfUnset(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
} else {
conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
}
int curDatanodesNum = dataNodes.size(); int curDatanodesNum = dataNodes.size();
// for mincluster's the default initialDelay for BRs is 0 // for mincluster's the default initialDelay for BRs is 0

View File

@ -763,7 +763,7 @@ public void testClientDNProtocolTimeout() throws IOException {
try { try {
proxy = DFSUtil.createClientDatanodeProtocolProxy( proxy = DFSUtil.createClientDatanodeProtocolProxy(
fakeDnId, conf, 500, fakeBlock); fakeDnId, conf, 500, false, fakeBlock);
proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1)); proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1));
fail ("Did not get expected exception: SocketTimeoutException"); fail ("Did not get expected exception: SocketTimeoutException");

View File

@ -417,7 +417,6 @@ public void testFileChecksum() throws Exception {
final Configuration conf = getTestConfiguration(); final Configuration conf = getTestConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
final FileSystem hdfs = cluster.getFileSystem(); final FileSystem hdfs = cluster.getFileSystem();

View File

@ -171,7 +171,14 @@ public void testServerDefaults() throws IOException {
@Test @Test
public void testFileCreation() throws IOException { public void testFileCreation() throws IOException {
checkFileCreation(null); checkFileCreation(null, false);
}
/** Same test but the client should use DN hostnames */
@Test
public void testFileCreationUsingHostname() throws IOException {
assumeTrue(System.getProperty("os.name").startsWith("Linux"));
checkFileCreation(null, true);
} }
/** Same test but the client should bind to a local interface */ /** Same test but the client should bind to a local interface */
@ -180,10 +187,10 @@ public void testFileCreationSetLocalInterface() throws IOException {
assumeTrue(System.getProperty("os.name").startsWith("Linux")); assumeTrue(System.getProperty("os.name").startsWith("Linux"));
// The mini cluster listens on the loopback so we can use it here // The mini cluster listens on the loopback so we can use it here
checkFileCreation("lo"); checkFileCreation("lo", false);
try { try {
checkFileCreation("bogus-interface"); checkFileCreation("bogus-interface", false);
fail("Able to specify a bogus interface"); fail("Able to specify a bogus interface");
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
assertEquals("No such interface bogus-interface", e.getMessage()); assertEquals("No such interface bogus-interface", e.getMessage());
@ -193,16 +200,28 @@ public void testFileCreationSetLocalInterface() throws IOException {
/** /**
* Test if file creation and disk space consumption works right * Test if file creation and disk space consumption works right
* @param netIf the local interface, if any, clients should use to access DNs * @param netIf the local interface, if any, clients should use to access DNs
* @param useDnHostname whether the client should contact DNs by hostname
*/ */
public void checkFileCreation(String netIf) throws IOException { public void checkFileCreation(String netIf, boolean useDnHostname)
throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
if (netIf != null) { if (netIf != null) {
conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, netIf); conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, netIf);
} }
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, useDnHostname);
if (useDnHostname) {
// Since the mini cluster only listens on the loopback we have to
// ensure the hostname used to access DNs maps to the loopback. We
// do this by telling the DN to advertise localhost as its hostname
// instead of the default hostname.
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
}
if (simulatedStorage) { if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.checkDataNodeHostConfig(true)
.build();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
try { try {

View File

@ -92,7 +92,6 @@ public static void setUp() throws IOException {
RAN.setSeed(seed); RAN.setSeed(seed);
config = new Configuration(); config = new Configuration();
config.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build(); cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
hdfs = cluster.getFileSystem(); hdfs = cluster.getFileSystem();
blockPoolId = cluster.getNamesystem().getBlockPoolId(); blockPoolId = cluster.getNamesystem().getBlockPoolId();

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.io.File; import java.io.File;
@ -41,6 +42,7 @@ public class TestMiniDFSCluster {
private static final String CLUSTER_2 = "cluster2"; private static final String CLUSTER_2 = "cluster2";
private static final String CLUSTER_3 = "cluster3"; private static final String CLUSTER_3 = "cluster3";
private static final String CLUSTER_4 = "cluster4"; private static final String CLUSTER_4 = "cluster4";
private static final String CLUSTER_5 = "cluster5";
protected String testDataPath; protected String testDataPath;
protected File testDataDir; protected File testDataDir;
@Before @Before
@ -125,4 +127,25 @@ public void testIsClusterUpAfterShutdown() throws Throwable {
} }
} }
} }
/** MiniDFSCluster should not clobber dfs.datanode.hostname if requested */
@Test(timeout=100000)
public void testClusterSetDatanodeHostname() throws Throwable {
assumeTrue(System.getProperty("os.name").startsWith("Linux"));
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "MYHOST");
File testDataCluster5 = new File(testDataPath, CLUSTER_5);
String c5Path = testDataCluster5.getAbsolutePath();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, c5Path);
MiniDFSCluster cluster5 = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.checkDataNodeHostConfig(true)
.build();
try {
Assert.assertEquals("DataNode hostname config not respected", "MYHOST",
cluster5.getDataNodes().get(0).getDatanodeId().getHostName());
} finally {
MiniDFSCluster.shutdownCluster(cluster5);
}
}
} }

View File

@ -246,7 +246,7 @@ public void testGetBlockLocalPathInfo() throws IOException, InterruptedException
@Override @Override
public ClientDatanodeProtocol run() throws Exception { public ClientDatanodeProtocol run() throws Exception {
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
60000); 60000, false);
} }
}); });
@ -264,7 +264,7 @@ public ClientDatanodeProtocol run() throws Exception {
@Override @Override
public ClientDatanodeProtocol run() throws Exception { public ClientDatanodeProtocol run() throws Exception {
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
60000); 60000, false);
} }
}); });
try { try {

View File

@ -304,7 +304,7 @@ public void testBlockTokenRpcLeak() throws Exception {
long endTime = Time.now() + 3000; long endTime = Time.now() + 3000;
while (Time.now() < endTime) { while (Time.now() < endTime) {
proxy = DFSUtil.createClientDatanodeProtocolProxy(fakeDnId, conf, 1000, proxy = DFSUtil.createClientDatanodeProtocolProxy(fakeDnId, conf, 1000,
fakeBlock); false, fakeBlock);
assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3)); assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
if (proxy != null) { if (proxy != null) {
RPC.stopProxy(proxy); RPC.stopProxy(proxy);

View File

@ -105,10 +105,13 @@ public static DatanodeProtocolClientSideTranslatorPB spyOnBposToNN(
} }
public static InterDatanodeProtocol createInterDatanodeProtocolProxy( public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
DataNode dn, DatanodeID datanodeid, final Configuration conf DataNode dn, DatanodeID datanodeid, final Configuration conf,
) throws IOException { boolean connectToDnViaHostname) throws IOException {
if (connectToDnViaHostname != dn.getDnConf().connectToDnViaHostname) {
throw new AssertionError("Unexpected DN hostname configuration");
}
return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf, return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf,
dn.getDnConf().socketTimeout); dn.getDnConf().socketTimeout, dn.getDnConf().connectToDnViaHostname);
} }
public static void shutdownBlockScanner(DataNode dn) { public static void shutdownBlockScanner(DataNode dn) {

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClientAdapter; import org.apache.hadoop.hdfs.DFSClientAdapter;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -59,6 +60,8 @@
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assume.assumeTrue;
/** /**
* This tests InterDataNodeProtocol for block handling. * This tests InterDataNodeProtocol for block handling.
*/ */
@ -125,17 +128,42 @@ public static LocatedBlock getLastLocatedBlock(
return blocks.get(blocks.size() - 1); return blocks.get(blocks.size() - 1);
} }
/** Test block MD access via a DN */
@Test
public void testBlockMetaDataInfo() throws Exception {
checkBlockMetaDataInfo(false);
}
/** The same as above, but use hostnames for DN<->DN communication */
@Test
public void testBlockMetaDataInfoWithHostname() throws Exception {
assumeTrue(System.getProperty("os.name").startsWith("Linux"));
checkBlockMetaDataInfo(true);
}
/** /**
* The following test first creates a file. * The following test first creates a file.
* It verifies the block information from a datanode. * It verifies the block information from a datanode.
* Then, it updates the block with new information and verifies again. * Then, it updates the block with new information and verifies again.
* @param useDnHostname whether DNs should connect to other DNs by hostname
*/ */
@Test private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
public void testBlockMetaDataInfo() throws Exception {
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, useDnHostname);
if (useDnHostname) {
// Since the mini cluster only listens on the loopback we have to
// ensure the hostname used to access DNs maps to the loopback. We
// do this by telling the DN to advertise localhost as its hostname
// instead of the default hostname.
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
}
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.checkDataNodeHostConfig(true)
.build();
cluster.waitActive(); cluster.waitActive();
//create a file //create a file
@ -154,7 +182,7 @@ public void testBlockMetaDataInfo() throws Exception {
//connect to a data node //connect to a data node
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy( InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(
datanode, datanodeinfo[0], conf); datanode, datanodeinfo[0], conf, useDnHostname);
//stop block scanner, so we could compare lastScanTime //stop block scanner, so we could compare lastScanTime
DataNodeTestUtils.shutdownBlockScanner(datanode); DataNodeTestUtils.shutdownBlockScanner(datanode);
@ -364,7 +392,7 @@ public void testInterDNProtocolTimeout() throws Throwable {
try { try {
proxy = DataNode.createInterDataNodeProtocolProxy( proxy = DataNode.createInterDataNodeProtocolProxy(
dInfo, conf, 500); dInfo, conf, 500, false);
proxy.initReplicaRecovery(new RecoveringBlock( proxy.initReplicaRecovery(new RecoveringBlock(
new ExtendedBlock("bpid", 1), null, 100)); new ExtendedBlock("bpid", 1), null, 100));
fail ("Expected SocketTimeoutException exception, but did not get."); fail ("Expected SocketTimeoutException exception, but did not get.");