HDFS-9259. Make SO_SNDBUF size configurable at DFSClient side for hdfs write scenario. (Mingliang Liu via mingma)

(cherry picked from commit aa09880ab8)
This commit is contained in:
Ming Ma 2015-10-27 09:28:40 -07:00
parent c5bf1cb7af
commit 2c335a8434
6 changed files with 131 additions and 1 deletions

View File

@ -139,7 +139,9 @@ class DataStreamer extends Daemon {
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(),
conf.getSocketTimeout()); conf.getSocketTimeout());
sock.setSoTimeout(timeout); sock.setSoTimeout(timeout);
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); if (conf.getSocketSendBufferSize() > 0) {
sock.setSendBufferSize(conf.getSocketSendBufferSize());
}
LOG.debug("Send buf size {}", sock.getSendBufferSize()); LOG.debug("Send buf size {}", sock.getSendBufferSize());
return sock; return sock;
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.client; package org.apache.hadoop.hdfs.client;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -62,6 +63,10 @@ public interface HdfsClientConfigKeys {
String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout"; String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
String DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY =
"dfs.client.socket.send.buffer.size";
int DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT =
HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY =
"dfs.client.socketcache.capacity"; "dfs.client.socketcache.capacity";
int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16; int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;

View File

@ -55,6 +55,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCK
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@ -104,6 +106,7 @@ public class DfsClientConf {
private final int writeMaxPackets; private final int writeMaxPackets;
private final ByteArrayManager.Conf writeByteArrayManagerConf; private final ByteArrayManager.Conf writeByteArrayManagerConf;
private final int socketTimeout; private final int socketTimeout;
private final int socketSendBufferSize;
private final long excludedNodesCacheExpiry; private final long excludedNodesCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught. */ /** Wait time window (in msec) if BlockMissingException is caught. */
private final int timeWindow; private final int timeWindow;
@ -171,6 +174,8 @@ public class DfsClientConf {
defaultChecksumOpt = getChecksumOptFromConf(conf); defaultChecksumOpt = getChecksumOptFromConf(conf);
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsConstants.READ_TIMEOUT); HdfsConstants.READ_TIMEOUT);
socketSendBufferSize = conf.getInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY,
DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
/** dfs.write.packet.size is an internal config variable */ /** dfs.write.packet.size is an internal config variable */
writePacketSize = conf.getInt( writePacketSize = conf.getInt(
DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
@ -409,6 +414,13 @@ public class DfsClientConf {
return socketTimeout; return socketTimeout;
} }
/**
* @return the socketSendBufferSize
*/
public int getSocketSendBufferSize() {
return socketSendBufferSize;
}
/** /**
* @return the excludedNodesCacheExpiry * @return the excludedNodesCacheExpiry
*/ */

View File

@ -748,6 +748,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9292. Make TestFileConcorruption independent to underlying FsDataset HDFS-9292. Make TestFileConcorruption independent to underlying FsDataset
Implementation. (lei) Implementation. (lei)
HDFS-9259. Make SO_SNDBUF size configurable at DFSClient side for hdfs
write scenario. (Mingliang Liu via mingma)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -2153,6 +2153,18 @@
</description> </description>
</property> </property>
<property>
<name>dfs.client.socket.send.buffer.size</name>
<value>131072</value>
<description>
Socket send buffer size for a write pipeline in DFSClient side.
This may affect TCP connection throughput.
If it is set to zero or negative value,
no buffer size will be set explicitly,
thus enable tcp auto-tuning on some system.
</description>
</property>
<property> <property>
<name>dfs.domain.socket.path</name> <name>dfs.domain.socket.path</name>
<value></value> <value></value>

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.Socket;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestDFSClientSocketSize {
private static final Logger LOG = LoggerFactory.getLogger(
TestDFSClientSocketSize.class);
static {
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
}
private final Configuration conf = new Configuration();
private MiniDFSCluster cluster;
private Socket socket;
@Test
public void testDefaultSendBufferSize() throws IOException {
socket = createSocket();
assertEquals("Send buffer size should be the default value.",
DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT,
socket.getSendBufferSize());
}
@Test
public void testSpecifiedSendBufferSize() throws IOException {
final int mySendBufferSize = 64 * 1024; // 64 KB
conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, mySendBufferSize);
socket = createSocket();
assertEquals("Send buffer size should be the customized value.",
mySendBufferSize, socket.getSendBufferSize());
}
@Test
public void testAutoTuningSendBufferSize() throws IOException {
conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 0);
socket = createSocket();
LOG.info("The auto tuned send buffer size is: {}",
socket.getSendBufferSize());
assertTrue("Send buffer size should be non-negative value which is " +
"determined by system (kernel).", socket.getSendBufferSize() > 0);
}
@After
public void tearDown() throws Exception {
if (socket != null) {
LOG.info("Closing the DFSClient socket.");
}
if (cluster != null) {
LOG.info("Shutting down MiniDFSCluster.");
cluster.shutdown();
}
}
private Socket createSocket() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
LOG.info("MiniDFSCluster started.");
return DataStreamer.createSocketForPipeline(
new DatanodeInfo(cluster.dataNodes.get(0).datanode.getDatanodeId()),
1, cluster.getFileSystem().getClient());
}
}