HDFS-3148. svn merge -c 1308617 from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1308620 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
44b0307763
commit
53227f7c36
|
@ -70,7 +70,8 @@ Release 2.0.0 - UNRELEASED
|
|||
|
||||
HDFS-3167. CLI-based driver for MiniDFSCluster. (Henry Robinson via atm)
|
||||
|
||||
HDFS-3120. Enable hsync and hflush by default. (eli)
|
||||
HDFS-3148. The client should be able to use multiple local interfaces
|
||||
for data transfer. (eli)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
|
@ -198,6 +199,8 @@ Release 2.0.0 - UNRELEASED
|
|||
|
||||
HDFS-3130. Move fsdataset implementation to a package. (szetszwo)
|
||||
|
||||
HDFS-3120. Enable hsync and hflush by default. (eli)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-2477. Optimize computing the diff between a block report and the
|
||||
|
|
|
@ -59,12 +59,16 @@ import java.net.InetSocketAddress;
|
|||
import java.net.NetworkInterface;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
|
@ -127,6 +131,7 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.DNS;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -136,7 +141,9 @@ import org.apache.hadoop.security.token.TokenRenewer;
|
|||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.net.InetAddresses;
|
||||
|
||||
/********************************************************
|
||||
* DFSClient can connect to a Hadoop Filesystem and
|
||||
|
@ -172,6 +179,8 @@ public class DFSClient implements java.io.Closeable {
|
|||
final LeaseRenewer leaserenewer;
|
||||
final SocketCache socketCache;
|
||||
final Conf dfsClientConf;
|
||||
private Random r = new Random();
|
||||
private SocketAddress[] localInterfaceAddrs;
|
||||
|
||||
/**
|
||||
* DFSClient configuration
|
||||
|
@ -365,6 +374,68 @@ public class DFSClient implements java.io.Closeable {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Short circuit read is " + shortCircuitLocalReads);
|
||||
}
|
||||
String localInterfaces[] =
|
||||
conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
|
||||
localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
|
||||
if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
|
||||
LOG.debug("Using local interfaces [" +
|
||||
Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
|
||||
Joiner.on(',').join(localInterfaceAddrs) + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the socket addresses to use with each configured
|
||||
* local interface. Local interfaces may be specified by IP
|
||||
* address, IP address range using CIDR notation, interface
|
||||
* name (e.g. eth0) or sub-interface name (e.g. eth0:0).
|
||||
* The socket addresses consist of the IPs for the interfaces
|
||||
* and the ephemeral port (port 0). If an IP, IP range, or
|
||||
* interface name matches an interface with sub-interfaces
|
||||
* only the IP of the interface is used. Sub-interfaces can
|
||||
* be used by specifying them explicitly (by IP or name).
|
||||
*
|
||||
* @return SocketAddresses for the configured local interfaces,
|
||||
* or an empty array if none are configured
|
||||
* @throws UnknownHostException if a given interface name is invalid
|
||||
*/
|
||||
private static SocketAddress[] getLocalInterfaceAddrs(
|
||||
String interfaceNames[]) throws UnknownHostException {
|
||||
List<SocketAddress> localAddrs = new ArrayList<SocketAddress>();
|
||||
for (String interfaceName : interfaceNames) {
|
||||
if (InetAddresses.isInetAddress(interfaceName)) {
|
||||
localAddrs.add(new InetSocketAddress(interfaceName, 0));
|
||||
} else if (NetUtils.isValidSubnet(interfaceName)) {
|
||||
for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
|
||||
localAddrs.add(new InetSocketAddress(addr, 0));
|
||||
}
|
||||
} else {
|
||||
for (String ip : DNS.getIPs(interfaceName, false)) {
|
||||
localAddrs.add(new InetSocketAddress(ip, 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Select one of the configured local interfaces at random. We use a random
|
||||
* interface because other policies like round-robin are less effective
|
||||
* given that we cache connections to datanodes.
|
||||
*
|
||||
* @return one of the local interface addresses at random, or null if no
|
||||
* local interfaces are configured
|
||||
*/
|
||||
SocketAddress getRandomLocalInterfaceAddr() {
|
||||
if (localInterfaceAddrs.length == 0) {
|
||||
return null;
|
||||
}
|
||||
final int idx = r.nextInt(localInterfaceAddrs.length);
|
||||
final SocketAddress addr = localInterfaceAddrs[idx];
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Using local interface " + addr);
|
||||
}
|
||||
return addr;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -197,6 +197,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
|
||||
public static final String DFS_HOSTS = "dfs.hosts";
|
||||
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
|
||||
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
|
||||
|
||||
// Much code in hdfs is not yet updated to use these keys.
|
||||
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
|
||||
|
|
|
@ -780,7 +780,9 @@ public class DFSInputStream extends FSInputStream {
|
|||
// disaster.
|
||||
sock.setTcpNoDelay(true);
|
||||
|
||||
NetUtils.connect(sock, dnAddr, dfsClient.getConf().socketTimeout);
|
||||
NetUtils.connect(sock, dnAddr,
|
||||
dfsClient.getRandomLocalInterfaceAddr(),
|
||||
dfsClient.getConf().socketTimeout);
|
||||
sock.setSoTimeout(dfsClient.getConf().socketTimeout);
|
||||
}
|
||||
|
||||
|
|
|
@ -1171,7 +1171,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|||
NetUtils.createSocketAddr(first.getXferAddr());
|
||||
final Socket sock = client.socketFactory.createSocket();
|
||||
final int timeout = client.getDatanodeReadTimeout(length);
|
||||
NetUtils.connect(sock, isa, timeout);
|
||||
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout);
|
||||
sock.setSoTimeout(timeout);
|
||||
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
|
||||
if(DFSClient.LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -844,4 +844,18 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.client.local.interfaces</name>
|
||||
<value></value>
|
||||
<description>A comma separated list of network interface names to use
|
||||
for data transfer between the client and datanodes. When creating
|
||||
a connection to read from or write to a datanode, the client
|
||||
chooses one of the specified interfaces at random and binds its
|
||||
socket to the IP of that interface. Individual names may be
|
||||
specified as either an interface name (eg "eth0"), a subinterface
|
||||
name (eg "eth0:0"), or an IP address (which may be specified using
|
||||
CIDR notation to match a range of IPs).
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.io.FileNotFoundException;
|
|||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -67,6 +68,7 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
|||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.log4j.Level;
|
||||
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
/**
|
||||
* This class tests various cases during file creation.
|
||||
|
@ -140,11 +142,34 @@ public class TestFileCreation extends junit.framework.TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testFileCreation() throws IOException {
|
||||
checkFileCreation(null);
|
||||
}
|
||||
|
||||
/** Same test but the client should bind to a local interface */
|
||||
public void testFileCreationSetLocalInterface() throws IOException {
|
||||
assumeTrue(System.getProperty("os.name").startsWith("Linux"));
|
||||
|
||||
// The mini cluster listens on the loopback so we can use it here
|
||||
checkFileCreation("lo");
|
||||
|
||||
try {
|
||||
checkFileCreation("bogus-interface");
|
||||
fail("Able to specify a bogus interface");
|
||||
} catch (UnknownHostException e) {
|
||||
assertEquals("No such interface bogus-interface", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if file creation and disk space consumption works right
|
||||
* @param netIf the local interface, if any, clients should use to access DNs
|
||||
*/
|
||||
public void testFileCreation() throws IOException {
|
||||
public void checkFileCreation(String netIf) throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
if (netIf != null) {
|
||||
conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, netIf);
|
||||
}
|
||||
if (simulatedStorage) {
|
||||
SimulatedFSDataset.setFactory(conf);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue