HADOOP-8210. Common side of HDFS-3148: The client should be able to use multiple local interfaces for data transfer. Contributed by Eli Collins

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1308457 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-04-02 18:49:06 +00:00
parent bc13dfb142
commit 950273bde4
3 changed files with 150 additions and 11 deletions

View File

@ -140,6 +140,9 @@ Release 2.0.0 - UNRELEASED
HADOOP-8206. Common portion of a ZK-based failover controller (todd) HADOOP-8206. Common portion of a ZK-based failover controller (todd)
HADOOP-8210. Common side of HDFS-3148: The client should be able
to use multiple local interfaces for data transfer. (eli)
IMPROVEMENTS IMPROVEMENTS
HADOOP-7524. Change RPC to allow multiple protocols including multuple HADOOP-7524. Change RPC to allow multiple protocols including multuple

View File

@ -27,7 +27,10 @@ import java.net.InetAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.Vector; import java.util.Vector;
import javax.naming.NamingException; import javax.naming.NamingException;
@ -113,6 +116,31 @@ public class DNS {
return null; return null;
} }
/**
* @param nif network interface to get addresses for
* @return set containing addresses for each subinterface of nif,
* see below for the rationale for using an ordered set
*/
private static LinkedHashSet<InetAddress> getSubinterfaceInetAddrs(
NetworkInterface nif) {
LinkedHashSet<InetAddress> addrs = new LinkedHashSet<InetAddress>();
Enumeration<NetworkInterface> subNifs = nif.getSubInterfaces();
while (subNifs.hasMoreElements()) {
NetworkInterface subNif = subNifs.nextElement();
addrs.addAll(Collections.list(subNif.getInetAddresses()));
}
return addrs;
}
/**
* Like {@link DNS#getIPs(String, boolean), but returns all
* IPs associated with the given interface and its subinterfaces.
*/
public static String[] getIPs(String strInterface)
throws UnknownHostException {
return getIPs(strInterface, true);
}
/** /**
* Returns all the IPs associated with the provided interface, if any, in * Returns all the IPs associated with the provided interface, if any, in
* textual form. * textual form.
@ -120,6 +148,9 @@ public class DNS {
* @param strInterface * @param strInterface
* The name of the network interface or sub-interface to query * The name of the network interface or sub-interface to query
* (eg eth0 or eth0:0) or the string "default" * (eg eth0 or eth0:0) or the string "default"
* @param returnSubinterfaces
* Whether to return IPs associated with subinterfaces of
* the given interface
* @return A string vector of all the IPs associated with the provided * @return A string vector of all the IPs associated with the provided
* interface. The local host IP is returned if the interface * interface. The local host IP is returned if the interface
* name "default" is specified or there is an I/O error looking * name "default" is specified or there is an I/O error looking
@ -128,8 +159,8 @@ public class DNS {
* If the given interface is invalid * If the given interface is invalid
* *
*/ */
public static String[] getIPs(String strInterface) public static String[] getIPs(String strInterface,
throws UnknownHostException { boolean returnSubinterfaces) throws UnknownHostException {
if ("default".equals(strInterface)) { if ("default".equals(strInterface)) {
return new String[] { cachedHostAddress }; return new String[] { cachedHostAddress };
} }
@ -147,12 +178,22 @@ public class DNS {
if (netIf == null) { if (netIf == null) {
throw new UnknownHostException("No such interface " + strInterface); throw new UnknownHostException("No such interface " + strInterface);
} }
Vector<String> ips = new Vector<String>();
Enumeration<InetAddress> addrs = netIf.getInetAddresses(); // NB: Using a LinkedHashSet to preserve the order for callers
while (addrs.hasMoreElements()) { // that depend on a particular element being 1st in the array.
ips.add(addrs.nextElement().getHostAddress()); // For example, getDefaultIP always returns the first element.
LinkedHashSet<InetAddress> allAddrs = new LinkedHashSet<InetAddress>();
allAddrs.addAll(Collections.list(netIf.getInetAddresses()));
if (!returnSubinterfaces) {
allAddrs.removeAll(getSubinterfaceInetAddrs(netIf));
} }
return ips.toArray(new String[] {});
String ips[] = new String[allAddrs.size()];
int i = 0;
for (InetAddress addr : allAddrs) {
ips[i++] = addr.getHostAddress();
}
return ips;
} }

View File

@ -43,6 +43,8 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.util.SubnetUtils;
import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -51,6 +53,8 @@ import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.base.Preconditions;
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class NetUtils { public class NetUtils {
@ -469,11 +473,27 @@ public class NetUtils {
* @see java.net.Socket#connect(java.net.SocketAddress, int) * @see java.net.Socket#connect(java.net.SocketAddress, int)
* *
* @param socket * @param socket
* @param endpoint * @param address the remote address
* @param timeout - timeout in milliseconds * @param timeout timeout in milliseconds
*/
public static void connect(Socket socket,
SocketAddress address,
int timeout) throws IOException {
connect(socket, address, null, timeout);
}
/**
* Like {@link NetUtils#connect(Socket, SocketAddress, int)} but
* also takes a local address and port to bind the socket to.
*
* @param socket
* @param address the remote address
* @param localAddr the local address to bind the socket to
* @param timeout timeout in milliseconds
*/ */
public static void connect(Socket socket, public static void connect(Socket socket,
SocketAddress endpoint, SocketAddress endpoint,
SocketAddress localAddr,
int timeout) throws IOException { int timeout) throws IOException {
if (socket == null || endpoint == null || timeout < 0) { if (socket == null || endpoint == null || timeout < 0) {
throw new IllegalArgumentException("Illegal argument for connect()"); throw new IllegalArgumentException("Illegal argument for connect()");
@ -481,6 +501,15 @@ public class NetUtils {
SocketChannel ch = socket.getChannel(); SocketChannel ch = socket.getChannel();
if (localAddr != null) {
Class localClass = localAddr.getClass();
Class remoteClass = endpoint.getClass();
Preconditions.checkArgument(localClass.equals(remoteClass),
"Local address %s must be of same family as remote address %s.",
localAddr, endpoint);
socket.bind(localAddr);
}
if (ch == null) { if (ch == null) {
// let the default implementation handle it. // let the default implementation handle it.
socket.connect(endpoint, timeout); socket.connect(endpoint, timeout);
@ -769,4 +798,70 @@ public class NetUtils {
("\"" + hostname + "\"") ("\"" + hostname + "\"")
: UNKNOWN_HOST; : UNKNOWN_HOST;
} }
/**
* @return true if the given string is a subnet specified
* using CIDR notation, false otherwise
*/
public static boolean isValidSubnet(String subnet) {
try {
new SubnetUtils(subnet);
return true;
} catch (IllegalArgumentException iae) {
return false;
}
}
/**
* Add all addresses associated with the given nif in the
* given subnet to the given list.
*/
private static void addMatchingAddrs(NetworkInterface nif,
SubnetInfo subnetInfo, List<InetAddress> addrs) {
Enumeration<InetAddress> ifAddrs = nif.getInetAddresses();
while (ifAddrs.hasMoreElements()) {
InetAddress ifAddr = ifAddrs.nextElement();
if (subnetInfo.isInRange(ifAddr.getHostAddress())) {
addrs.add(ifAddr);
}
}
}
/**
* Return an InetAddress for each interface that matches the
* given subnet specified using CIDR notation.
*
* @param subnet subnet specified using CIDR notation
* @param returnSubinterfaces
* whether to return IPs associated with subinterfaces
* @throws IllegalArgumentException if subnet is invalid
*/
public static List<InetAddress> getIPs(String subnet,
boolean returnSubinterfaces) {
List<InetAddress> addrs = new ArrayList<InetAddress>();
SubnetInfo subnetInfo = new SubnetUtils(subnet).getInfo();
Enumeration<NetworkInterface> nifs;
try {
nifs = NetworkInterface.getNetworkInterfaces();
} catch (SocketException e) {
LOG.error("Unable to get host interfaces", e);
return addrs;
}
while (nifs.hasMoreElements()) {
NetworkInterface nif = nifs.nextElement();
// NB: adding addresses even if the nif is not up
addMatchingAddrs(nif, subnetInfo, addrs);
if (!returnSubinterfaces) {
continue;
}
Enumeration<NetworkInterface> subNifs = nif.getSubInterfaces();
while (subNifs.hasMoreElements()) {
addMatchingAddrs(subNifs.nextElement(), subnetInfo, addrs);
}
}
return addrs;
}
} }