HDFS-12823. Backport HDFS-9259 "Make SO_SNDBUF size configurable at DFSClient" to branch-2.7. (Erik Krogen via zhz)

This commit is contained in:
Zhe Zhang 2017-11-17 16:44:21 -08:00
parent 6f876f419d
commit 0da13b90f7
6 changed files with 128 additions and 1 deletion
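
For reference, a minimal sketch (not part of the commit) of how a client could opt into the new key once this change is in; the class name, file path, and payload below are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SendBufferSizeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Key added by this change; the shipped default is 131072 (128 KB).
    // A value of zero or less skips the explicit setSendBufferSize call
    // so the kernel can auto-tune SO_SNDBUF.
    conf.setInt("dfs.client.socket.send.buffer.size", 256 * 1024);
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out =
             fs.create(new Path("/tmp/send-buffer-example.txt"))) {
      // Write pipeline sockets created by the client use the configured size.
      out.writeBytes("hello hdfs");
    }
  }
}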

View File

@@ -20,6 +20,10 @@ Release 2.7.5 - UNRELEASED
HDFS-10984. Expose nntop output as metrics. (Siddharth Wagle via xyao, zhz)
HDFS-9259. Make SO_SNDBUF size configurable at DFSClient side for hdfs write
scenario. (original patch Mingliang Liu via Ming Ma, branch-2.7 backport done
under HDFS-12823, Erik Krogen via zhz).
OPTIMIZATIONS
HDFS-10711. Optimize FSPermissionChecker group membership check.

View File

@@ -51,6 +51,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@@ -299,6 +301,7 @@ public static class Conf {
final int writeMaxPackets;
final ByteArrayManager.Conf writeByteArrayManagerConf;
final int socketTimeout;
private final int socketSendBufferSize;
final int socketCacheCapacity;
final long socketCacheExpiry;
final long excludedNodesCacheExpiry;
@@ -369,6 +372,8 @@ public Conf(Configuration conf) {
defaultChecksumOpt = getChecksumOptFromConf(conf);
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
socketSendBufferSize = conf.getInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY,
DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
/** dfs.write.packet.size is an internal config variable */
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
@@ -511,6 +516,10 @@ public Conf(Configuration conf) {
DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
}
public int getSocketSendBufferSize() {
return socketSendBufferSize;
}
public boolean isUseLegacyBlockReaderLocal() {
return useLegacyBlockReaderLocal;
}

View File

@@ -426,6 +426,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
public static final String DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY =
"dfs.client.socket.send.buffer.size";
public static final int DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT =
HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY = "dfs.namenode.checkpoint.dir";
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
public static final String DFS_HOSTS = "dfs.hosts";

View File

@@ -1701,7 +1701,9 @@ static Socket createSocketForPipeline(final DatanodeInfo first,
final int timeout = client.getDatanodeReadTimeout(length);
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout);
sock.setSoTimeout(timeout);
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
if (client.getConf().getSocketSendBufferSize() > 0) {
sock.setSendBufferSize(client.getConf().getSocketSendBufferSize());
}
if(DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize());
}

View File

@@ -2215,6 +2215,18 @@
</description>
</property>
<property>
<name>dfs.client.socket.send.buffer.size</name>
<value>131072</value>
<description>
Socket send buffer size for a write pipeline on the DFSClient side.
This may affect TCP connection throughput.
If it is set to zero or a negative value,
no buffer size will be set explicitly,
thus enabling TCP auto-tuning on systems where it is supported.
</description>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value></value>
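
Taken together with the createSocketForPipeline change above, the property description states the full contract: a positive value is applied via setSendBufferSize, while zero or a negative value skips the call so kernel TCP auto-tuning stays in effect. A minimal standalone sketch of that pattern (the class and method names here are illustrative, not from this patch):

import java.net.Socket;
import java.net.SocketException;

class SendBufferPolicy {
  // Mirrors the conditional added to createSocketForPipeline: SO_SNDBUF is
  // forced only when the configured size is positive; zero or a negative
  // value leaves the OS default / TCP auto-tuned size in place.
  static void applySendBufferSize(Socket sock, int configuredSize)
      throws SocketException {
    if (configuredSize > 0) {
      sock.setSendBufferSize(configuredSize);
    }
  }
}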

View File

@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.Socket;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestDFSClientSocketSize {
private static final Logger LOG = LoggerFactory.getLogger(
TestDFSClientSocketSize.class);
static {
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
}
private final Configuration conf = new Configuration();
private MiniDFSCluster cluster;
private Socket socket;
@Test
public void testDefaultSendBufferSize() throws IOException {
socket = createSocket();
assertEquals("Send buffer size should be the default value.",
DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT,
socket.getSendBufferSize());
}
@Test
public void testSpecifiedSendBufferSize() throws IOException {
final int mySendBufferSize = 64 * 1024; // 64 KB
conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, mySendBufferSize);
socket = createSocket();
assertEquals("Send buffer size should be the customized value.",
mySendBufferSize, socket.getSendBufferSize());
}
@Test
public void testAutoTuningSendBufferSize() throws IOException {
conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 0);
socket = createSocket();
LOG.info("The auto tuned send buffer size is: {}",
socket.getSendBufferSize());
assertTrue("Send buffer size should be non-negative value which is " +
"determined by system (kernel).", socket.getSendBufferSize() > 0);
}
@After
public void tearDown() throws Exception {
if (socket != null) {
LOG.info("Closing the DFSClient socket.");
}
if (cluster != null) {
LOG.info("Shutting down MiniDFSCluster.");
cluster.shutdown();
}
}
private Socket createSocket() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
LOG.info("MiniDFSCluster started.");
return DFSOutputStream.createSocketForPipeline(
new DatanodeInfo(cluster.dataNodes.get(0).datanode.getDatanodeId()),
1, cluster.getFileSystem().getClient());
}
}