HDFS-9805. Add server-side configuration for enabling TCP_NODELAY for DataTransferProtocol and default it to true (Gary Helmling via cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2016-06-29 12:34:45 -07:00
parent abe7fc22c1
commit e4a2545620
8 changed files with 503 additions and 3 deletions

View File

@ -82,6 +82,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
@ -735,6 +737,7 @@ public class DFSUtilClient {
String dnAddr = dn.getXferAddr(connectToDnViaHostname);
LOG.debug("Connecting to datanode {}", dnAddr);
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
sock.setTcpNoDelay(getClientDataTransferTcpNoDelay(conf));
sock.setSoTimeout(timeout);
OutputStream unbufOut = NetUtils.getOutputStream(sock);
@ -756,4 +759,10 @@ public class DFSUtilClient {
}
}
}
private static boolean getClientDataTransferTcpNoDelay(Configuration conf) {
return conf.getBoolean(
DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
}
}

View File

@ -930,6 +930,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT =
HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
public static final String
DFS_DATA_TRANSFER_SERVER_TCPNODELAY =
"dfs.data.transfer.server.tcpnodelay";
public static final boolean
DFS_DATA_TRANSFER_SERVER_TCPNODELAY_DEFAULT = true;
// Disk Balancer Keys
public static final String DFS_DISK_BALANCER_ENABLED =
"dfs.disk.balancer.enabled";

View File

@ -76,6 +76,7 @@ public class DNConf {
final int socketKeepaliveTimeout;
private final int transferSocketSendBufferSize;
private final int transferSocketRecvBufferSize;
private final boolean tcpNoDelay;
final boolean transferToAllowed;
final boolean dropCacheBehindWrites;
@ -132,6 +133,9 @@ public class DNConf {
this.transferSocketRecvBufferSize = conf.getInt(
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT);
this.tcpNoDelay = conf.getBoolean(
DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY,
DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY_DEFAULT);
/* Based on results on different platforms, we might need set the default
* to false on some of them. */
@ -361,6 +365,10 @@ public class DNConf {
return transferSocketSendBufferSize;
}
public boolean getDataTransferServerTcpNoDelay() {
return tcpNoDelay;
}
public long getBpReadyTimeout() {
return bpReadyTimeout;
}

View File

@ -71,7 +71,6 @@ import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@ -96,6 +95,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -390,6 +390,8 @@ public class DataNode extends ReconfigurableBase
private DiskBalancer diskBalancer;
private final SocketFactory socketFactory;
private static Tracer createTracer(Configuration conf) {
return new Tracer.Builder("DataNode").
conf(TraceUtils.wrapHadoopConf(DATANODE_HTRACE_PREFIX , conf)).
@ -419,6 +421,7 @@ public class DataNode extends ReconfigurableBase
this.pipelineSupportECN = false;
this.checkDiskErrorInterval =
ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
initOOBTimeout();
}
@ -477,6 +480,8 @@ public class DataNode extends ReconfigurableBase
LOG.debug(this.fileDescriptorPassingDisabledReason);
}
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
try {
hostName = getHostName(conf);
LOG.info("Configured hostname is " + hostName);
@ -1676,8 +1681,7 @@ public class DataNode extends ReconfigurableBase
* Creates either NIO or regular depending on socketWriteTimeout.
*/
public Socket newSocket() throws IOException {
return (dnConf.socketWriteTimeout > 0) ?
SocketChannel.open().socket() : new Socket();
return socketFactory.createSocket();
}
/**
@ -2327,6 +2331,7 @@ public class DataNode extends ReconfigurableBase
}
sock = newSocket();
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
//

View File

@ -747,6 +747,7 @@ class DataXceiver extends Receiver implements Runnable {
int writeTimeout = dnConf.socketWriteTimeout +
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setKeepAlive(true);
if (dnConf.getTransferSocketSendBufferSize() > 0) {
@ -1118,6 +1119,7 @@ class DataXceiver extends Receiver implements Runnable {
InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
proxySock = datanode.newSocket();
NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
proxySock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
proxySock.setSoTimeout(dnConf.socketTimeout);
proxySock.setKeepAlive(true);

View File

@ -102,6 +102,8 @@ class StripedBlockWriter {
socket = datanode.newSocket();
NetUtils.connect(socket, targetAddr,
datanode.getDnConf().getSocketTimeout());
socket.setTcpNoDelay(
datanode.getDnConf().getDataTransferServerTcpNoDelay());
socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());
Token<BlockTokenIdentifier> blockToken =

View File

@ -3430,6 +3430,14 @@
</description>
</property>
<property>
<name>dfs.data.transfer.server.tcpnodelay</name>
<value>true</value>
<description>
If true, set TCP_NODELAY to sockets for transferring data between Datanodes.
</description>
</property>
<property>
<name>dfs.datanode.balance.max.concurrent.moves</name>
<value>50</value>

View File

@ -0,0 +1,460 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StandardSocketFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.net.SocketFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
/**
* Checks that used sockets have TCP_NODELAY set when configured.
*/
public class TestDataNodeTcpNoDelay {
private static final Log LOG =
LogFactory.getLog(TestDataNodeTcpNoDelay.class);
private static Configuration baseConf;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
baseConf = new HdfsConfiguration();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
@Test
public void testTcpNoDelayEnabled() throws Exception {
Configuration testConf = new Configuration(baseConf);
// here we do not have to config TCP_NDELAY settings, since they should be
// active by default
testConf.set(HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
SocketFactoryWrapper.class.getName());
SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(testConf);
LOG.info("Socket factory is " + defaultFactory.getClass().getName());
MiniDFSCluster dfsCluster =
new MiniDFSCluster.Builder(testConf).numDataNodes(3).build();
dfsCluster.waitActive();
DistributedFileSystem dfs = dfsCluster.getFileSystem();
try {
createData(dfs);
transferBlock(dfs);
// check that TCP_NODELAY has been set on all sockets
assertTrue(SocketFactoryWrapper.wasTcpNoDelayActive());
} finally {
SocketFactoryWrapper.reset();
dfsCluster.shutdown();
}
}
@Test
public void testTcpNoDelayDisabled() throws Exception {
Configuration testConf = new Configuration(baseConf);
// disable TCP_NODELAY in settings
setTcpNoDelay(testConf, false);
testConf.set(HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
SocketFactoryWrapper.class.getName());
SocketFactory defaultFactory = NetUtils.getDefaultSocketFactory(testConf);
LOG.info("Socket factory is " + defaultFactory.getClass().getName());
MiniDFSCluster dfsCluster =
new MiniDFSCluster.Builder(testConf).numDataNodes(3).build();
dfsCluster.waitActive();
DistributedFileSystem dfs = dfsCluster.getFileSystem();
try {
createData(dfs);
transferBlock(dfs);
// we can only check that TCP_NODELAY was disabled on some sockets,
// since part of the client write path always enables TCP_NODELAY
// by necessity
assertFalse(SocketFactoryWrapper.wasTcpNoDelayActive());
} finally {
SocketFactoryWrapper.reset();
dfsCluster.shutdown();
}
}
private void createData(DistributedFileSystem dfs) throws Exception {
Path dir = new Path("test-dir");
for (int i = 0; i < 3; i++) {
Path f = new Path(dir, "file" + i);
DFSTestUtil.createFile(dfs, f, 10240, (short) 3, 0);
}
}
/**
* Tests the {@code DataNode#transferBlocks()} path by re-replicating an
* existing block.
*/
private void transferBlock(DistributedFileSystem dfs) throws Exception {
Path dir = new Path("test-block-transfer");
Path f = new Path(dir, "testfile");
DFSTestUtil.createFile(dfs, f, 10240, (short) 1, 0);
// force a block transfer to another DN
dfs.setReplication(f, (short) 2);
DFSTestUtil.waitForReplication(dfs, f, (short) 2, 20000);
}
/**
* Sets known TCP_NODELAY configs to the given value.
*/
private void setTcpNoDelay(Configuration conf, boolean value) {
conf.setBoolean(
HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY, value);
conf.setBoolean(
DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY, value);
conf.setBoolean(
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, value);
conf.setBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, value);
}
public static class SocketFactoryWrapper extends StandardSocketFactory {
private static List<SocketWrapper> sockets = new ArrayList<SocketWrapper>();
public static boolean wasTcpNoDelayActive() {
LOG.info("Checking " + sockets.size() + " sockets for TCP_NODELAY");
for (SocketWrapper sw : sockets) {
if (!sw.getLastTcpNoDelay()) {
return false;
}
}
return true;
}
public static void reset() {
sockets = new ArrayList<>();
}
@Override
public Socket createSocket() throws IOException {
LOG.info("Creating new socket");
SocketWrapper wrapper = new SocketWrapper(super.createSocket());
sockets.add(wrapper);
return wrapper;
}
@Override
public Socket createSocket(String host, int port)
throws IOException, UnknownHostException {
LOG.info("Creating socket for " + host);
SocketWrapper wrapper =
new SocketWrapper(super.createSocket(host, port));
sockets.add(wrapper);
return wrapper;
}
@Override
public Socket createSocket(String host, int port,
InetAddress localHostAddr, int localPort)
throws IOException, UnknownHostException {
LOG.info("Creating socket for " + host);
SocketWrapper wrapper = new SocketWrapper(
super.createSocket(host, port, localHostAddr, localPort));
sockets.add(wrapper);
return wrapper;
}
@Override
public Socket createSocket(InetAddress addr, int port) throws IOException {
LOG.info("Creating socket for " + addr);
SocketWrapper wrapper =
new SocketWrapper(super.createSocket(addr, port));
sockets.add(wrapper);
return wrapper;
}
@Override
public Socket createSocket(InetAddress addr, int port,
InetAddress localHostAddr, int localPort)
throws IOException {
LOG.info("Creating socket for " + addr);
SocketWrapper wrapper = new SocketWrapper(
super.createSocket(addr, port, localHostAddr, localPort));
sockets.add(wrapper);
return wrapper;
}
}
public static class SocketWrapper extends Socket {
private final Socket wrapped;
private boolean tcpNoDelay;
public SocketWrapper(Socket socket) {
this.wrapped = socket;
}
// Override methods, check whether tcpnodelay has been set for each socket
// created. This isn't perfect, as we could still send before tcpnodelay
// is set, but should at least trigger when tcpnodelay is never set at all.
@Override
public void connect(SocketAddress endpoint) throws IOException {
wrapped.connect(endpoint);
}
@Override
public void connect(SocketAddress endpoint, int timeout)
throws IOException {
wrapped.connect(endpoint, timeout);
}
@Override
public void bind(SocketAddress bindpoint) throws IOException {
wrapped.bind(bindpoint);
}
@Override
public InetAddress getInetAddress() {
return wrapped.getInetAddress();
}
@Override
public InetAddress getLocalAddress() {
return wrapped.getLocalAddress();
}
@Override
public int getPort() {
return wrapped.getPort();
}
@Override
public int getLocalPort() {
return wrapped.getLocalPort();
}
@Override
public SocketAddress getRemoteSocketAddress() {
return wrapped.getRemoteSocketAddress();
}
@Override
public SocketAddress getLocalSocketAddress() {
return wrapped.getLocalSocketAddress();
}
@Override
public SocketChannel getChannel() {
return wrapped.getChannel();
}
@Override
public InputStream getInputStream() throws IOException {
return wrapped.getInputStream();
}
@Override
public OutputStream getOutputStream() throws IOException {
return wrapped.getOutputStream();
}
@Override
public void setTcpNoDelay(boolean on) throws SocketException {
wrapped.setTcpNoDelay(on);
this.tcpNoDelay = on;
}
@Override
public boolean getTcpNoDelay() throws SocketException {
return wrapped.getTcpNoDelay();
}
@Override
public void setSoLinger(boolean on, int linger) throws SocketException {
wrapped.setSoLinger(on, linger);
}
@Override
public int getSoLinger() throws SocketException {
return wrapped.getSoLinger();
}
@Override
public void sendUrgentData(int data) throws IOException {
wrapped.sendUrgentData(data);
}
@Override
public void setOOBInline(boolean on) throws SocketException {
wrapped.setOOBInline(on);
}
@Override
public boolean getOOBInline() throws SocketException {
return wrapped.getOOBInline();
}
@Override
public synchronized void setSoTimeout(int timeout) throws SocketException {
wrapped.setSoTimeout(timeout);
}
@Override
public synchronized int getSoTimeout() throws SocketException {
return wrapped.getSoTimeout();
}
@Override
public synchronized void setSendBufferSize(int size)
throws SocketException {
wrapped.setSendBufferSize(size);
}
@Override
public synchronized int getSendBufferSize() throws SocketException {
return wrapped.getSendBufferSize();
}
@Override
public synchronized void setReceiveBufferSize(int size)
throws SocketException {
wrapped.setReceiveBufferSize(size);
}
@Override
public synchronized int getReceiveBufferSize() throws SocketException {
return wrapped.getReceiveBufferSize();
}
@Override
public void setKeepAlive(boolean on) throws SocketException {
wrapped.setKeepAlive(on);
}
@Override
public boolean getKeepAlive() throws SocketException {
return wrapped.getKeepAlive();
}
@Override
public void setTrafficClass(int tc) throws SocketException {
wrapped.setTrafficClass(tc);
}
@Override
public int getTrafficClass() throws SocketException {
return wrapped.getTrafficClass();
}
@Override
public void setReuseAddress(boolean on) throws SocketException {
wrapped.setReuseAddress(on);
}
@Override
public boolean getReuseAddress() throws SocketException {
return wrapped.getReuseAddress();
}
@Override
public synchronized void close() throws IOException {
wrapped.close();
}
@Override
public void shutdownInput() throws IOException {
wrapped.shutdownInput();
}
@Override
public void shutdownOutput() throws IOException {
wrapped.shutdownOutput();
}
@Override
public String toString() {
return wrapped.toString();
}
@Override
public boolean isConnected() {
return wrapped.isConnected();
}
@Override
public boolean isBound() {
return wrapped.isBound();
}
@Override
public boolean isClosed() {
return wrapped.isClosed();
}
@Override
public boolean isInputShutdown() {
return wrapped.isInputShutdown();
}
@Override
public boolean isOutputShutdown() {
return wrapped.isOutputShutdown();
}
@Override
public void setPerformancePreferences(int connectionTime, int latency,
int bandwidth) {
wrapped.setPerformancePreferences(connectionTime, latency, bandwidth);
}
public boolean getLastTcpNoDelay() {
return tcpNoDelay;
}
}
}