HDFS-8829. Make SO_RCVBUF and SO_SNDBUF size configurable for DataTransferProtocol sockets and allow configuring auto-tuning (He Tianyi via Colin P. McCabe)

Change-Id: I77dc71aaf9e14ef743f2a2cbebeec04a4f628c78
This commit is contained in:
Colin Patrick Mccabe 2015-09-14 15:56:04 -07:00
parent e2a0270217
commit 7b5cf5352e
11 changed files with 169 additions and 12 deletions

View File

@ -928,6 +928,10 @@ Release 2.8.0 - UNRELEASED
HDFS-8929. Add a metric to expose the timestamp of the last journal
(surendra singh lilhore via vinayakumarb)
HDFS-8829. Make SO_RCVBUF and SO_SNDBUF size configurable for
DataTransferProtocol sockets and allow configuring auto-tuning (He Tianyi
via Colin P. McCabe)
BUG FIXES
HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.

View File

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.http.HttpConfig;
@ -769,9 +770,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT =
false;
public static final String
DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY =
"dfs.datanode.transfer.socket.send.buffer.size";
public static final int
DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT =
HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
public static final String
DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY =
"dfs.datanode.transfer.socket.recv.buffer.size";
public static final int
DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT =
HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

View File

@ -49,6 +49,11 @@ public class DomainPeerServer implements PeerServer {
sock.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, size);
}
@Override
public int getReceiveBufferSize() throws IOException {
return sock.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
}
@Override
public Peer accept() throws IOException, SocketTimeoutException {
DomainSocket connSock = sock.accept();

View File

@ -32,7 +32,14 @@ public interface PeerServer extends Closeable {
public void setReceiveBufferSize(int size) throws IOException;
/**
* Listens for a connection to be made to this server and accepts
* Get the receive buffer size of the PeerServer.
*
* @return The receive buffer size.
*/
int getReceiveBufferSize() throws IOException;
/**
* Listens for a connection to be made to this server and accepts
* it. The method blocks until a connection is made.
*
* @exception IOException if an I/O error occurs when waiting for a

View File

@ -73,6 +73,11 @@ public class TcpPeerServer implements PeerServer {
this.serverSocket.setReceiveBufferSize(size);
}
@Override
public int getReceiveBufferSize() throws IOException {
return this.serverSocket.getReceiveBufferSize();
}
@Override
public Peer accept() throws IOException, SocketTimeoutException {
Peer peer = DFSUtilClient.peerFromSocket(serverSocket.accept());

View File

@ -71,7 +71,9 @@ public class DNConf {
final int socketTimeout;
final int socketWriteTimeout;
final int socketKeepaliveTimeout;
private final int transferSocketSendBufferSize;
private final int transferSocketRecvBufferSize;
final boolean transferToAllowed;
final boolean dropCacheBehindWrites;
final boolean syncBehindWrites;
@ -114,8 +116,14 @@ public class DNConf {
socketKeepaliveTimeout = conf.getInt(
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
/* Based on results on different platforms, we might need set the default
this.transferSocketSendBufferSize = conf.getInt(
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
this.transferSocketRecvBufferSize = conf.getInt(
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT);
/* Based on results on different platforms, we might need set the default
* to false on some of them. */
transferToAllowed = conf.getBoolean(
DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
@ -279,4 +287,12 @@ public class DNConf {
public boolean getAllowNonLocalLazyPersist() {
return allowNonLocalLazyPersist;
}
public int getTransferSocketRecvBufferSize() {
return transferSocketRecvBufferSize;
}
public int getTransferSocketSendBufferSize() {
return transferSocketSendBufferSize;
}
}

View File

@ -910,7 +910,10 @@ public class DataNode extends ReconfigurableBase
tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
DataNode.getStreamingAddr(conf));
}
tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
if (dnConf.getTransferSocketRecvBufferSize() > 0) {
tcpPeerServer.setReceiveBufferSize(
dnConf.getTransferSocketRecvBufferSize());
}
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
@ -958,8 +961,12 @@ public class DataNode extends ReconfigurableBase
}
DomainPeerServer domainPeerServer =
new DomainPeerServer(domainSocketPath, port);
domainPeerServer.setReceiveBufferSize(
HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
int recvBufferSize = conf.getInt(
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT);
if (recvBufferSize > 0) {
domainPeerServer.setReceiveBufferSize(recvBufferSize);
}
return domainPeerServer;
}

View File

@ -709,8 +709,11 @@ class DataXceiver extends Receiver implements Runnable {
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
if (dnConf.getTransferSocketSendBufferSize() > 0) {
mirrorSock.setSendBufferSize(
dnConf.getTransferSocketSendBufferSize());
}
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout);
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);

View File

@ -278,7 +278,12 @@ class DataXceiverServer implements Runnable {
synchronized int getNumPeersXceiver() {
return peersXceiver.size();
}
@VisibleForTesting
PeerServer getPeerServer() {
return peerServer;
}
synchronized void releasePeer(Peer peer) {
peers.remove(peer);
peersXceiver.remove(peer);

View File

@ -2424,4 +2424,26 @@
</description>
</property>
<property>
<name>dfs.datanode.transfer.socket.send.buffer.size</name>
<value>131072</value>
<description>
Socket send buffer size for DataXceiver (mirroring packets to downstream
in pipeline). This may affect TCP connection throughput.
If it is set to zero or negative value, no buffer size will be set
explicitly, thus enable tcp auto-tuning on some system.
</description>
</property>
<property>
<name>dfs.datanode.transfer.socket.recv.buffer.size</name>
<value>131072</value>
<description>
Socket receive buffer size for DataXceiver (receiving packets from client
during block writing). This may affect TCP connection throughput.
If it is set to zero or negative value, no buffer size will be set
explicitly, thus enable tcp auto-tuning on some system.
</description>
</property>
</configuration>

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;
public class TestDataNodeTransferSocketSize {
@Test
public void testSpecifiedDataSocketSize() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, 4 * 1024);
SimulatedFSDataset.setFactory(conf);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
List<DataNode> datanodes = cluster.getDataNodes();
DataNode datanode = datanodes.get(0);
assertEquals("Receive buffer size should be 4K",
4 * 1024, datanode.getXferServer().getPeerServer().getReceiveBufferSize());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testAutoTuningDataSocketSize() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, 0);
SimulatedFSDataset.setFactory(conf);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
List<DataNode> datanodes = cluster.getDataNodes();
DataNode datanode = datanodes.get(0);
assertTrue(
"Receive buffer size should be a default value (determined by kernel)",
datanode.getXferServer().getPeerServer().getReceiveBufferSize() > 0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}