diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 5bb783713d3..9b140beb360 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -139,7 +139,9 @@ class DataStreamer extends Daemon {
     NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(),
         conf.getSocketTimeout());
     sock.setSoTimeout(timeout);
-    sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+    if (conf.getSocketSendBufferSize() > 0) {
+      sock.setSendBufferSize(conf.getSocketSendBufferSize());
+    }
     LOG.debug("Send buf size {}", sock.getSendBufferSize());
     return sock;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 2e72769c12a..992cf3a4a42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.client;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
 import java.util.concurrent.TimeUnit;
 
@@ -62,6 +63,10 @@ public interface HdfsClientConfigKeys {
   String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
   String  DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
+  String  DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY =
+      "dfs.client.socket.send.buffer.size";
+  int     DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT =
+      HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
   String  DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY =
       "dfs.client.socketcache.capacity";
   int     DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 15387bb1cb0..7f3ae048876 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -55,6 +55,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCK
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@@ -104,6 +106,7 @@ public class DfsClientConf {
   private final int writeMaxPackets;
   private final ByteArrayManager.Conf writeByteArrayManagerConf;
   private final int socketTimeout;
+  private final int socketSendBufferSize;
   private final long excludedNodesCacheExpiry;
   /** Wait time window (in msec) if BlockMissingException is caught. */
   private final int timeWindow;
@@ -171,6 +174,8 @@ public class DfsClientConf {
     defaultChecksumOpt = getChecksumOptFromConf(conf);
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
         HdfsConstants.READ_TIMEOUT);
+    socketSendBufferSize = conf.getInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY,
+        DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
     /** dfs.write.packet.size is an internal config variable */
     writePacketSize = conf.getInt(
         DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
@@ -409,6 +414,13 @@ public class DfsClientConf {
     return socketTimeout;
   }
 
+  /**
+   * @return the socketSendBufferSize
+   */
+  public int getSocketSendBufferSize() {
+    return socketSendBufferSize;
+  }
+
   /**
    * @return the excludedNodesCacheExpiry
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 104e3c52314..d19ba632181 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -748,6 +748,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9292. Make TestFileConcorruption independent to underlying FsDataset
     Implementation. (lei)
 
+    HDFS-9259. Make SO_SNDBUF size configurable at DFSClient side for hdfs
+    write scenario. (Mingliang Liu via mingma)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d81acc149be..1389bc92bfd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2153,6 +2153,18 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.socket.send.buffer.size</name>
+  <value>131072</value>
+  <description>
+    Socket send buffer size for the write pipeline on the DFSClient side.
+    This may affect TCP connection throughput.
+    If it is set to zero or a negative value,
+    no buffer size will be set explicitly,
+    thus enabling TCP auto-tuning on systems where it is supported.
+  </description>
+</property>
+
 <property>
   <name>dfs.domain.socket.path</name>
   <value></value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientSocketSize.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientSocketSize.java
new file mode 100644
index 00000000000..aaeaa0181b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientSocketSize.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestDFSClientSocketSize {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestDFSClientSocketSize.class);
+  static {
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+  }
+
+  private final Configuration conf = new Configuration();
+  private MiniDFSCluster cluster;
+  private Socket socket;
+
+  @Test
+  public void testDefaultSendBufferSize() throws IOException {
+    socket = createSocket();
+    assertEquals("Send buffer size should be the default value.",
+        DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT,
+        socket.getSendBufferSize());
+  }
+
+  @Test
+  public void testSpecifiedSendBufferSize() throws IOException {
+    final int mySendBufferSize = 64 * 1024;  // 64 KB
+    conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, mySendBufferSize);
+    socket = createSocket();
+    assertEquals("Send buffer size should be the customized value.",
+        mySendBufferSize, socket.getSendBufferSize());
+  }
+
+  @Test
+  public void testAutoTuningSendBufferSize() throws IOException {
+    conf.setInt(DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY, 0);
+    socket = createSocket();
+    LOG.info("The auto-tuned send buffer size is: {}",
+        socket.getSendBufferSize());
+    assertTrue("Send buffer size should be a positive value, as determined " +
+        "by the system (kernel).", socket.getSendBufferSize() > 0);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (socket != null) {
+      LOG.info("Closing the DFSClient socket.");
+      socket.close();
+    }
+    if (cluster != null) {
+      LOG.info("Shutting down MiniDFSCluster.");
+      cluster.shutdown();
+    }
+  }
+
+  private Socket createSocket() throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    LOG.info("MiniDFSCluster started.");
+    return DataStreamer.createSocketForPipeline(
+        new DatanodeInfo(cluster.dataNodes.get(0).datanode.getDatanodeId()),
+        1, cluster.getFileSystem().getClient());
+  }
+}
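
Usage note (illustration only, not part of the patch): a minimal sketch of how an
application could set the new dfs.client.socket.send.buffer.size key before
writing through the pipeline. The class name, filesystem URI, and output path
below are placeholders, not anything introduced by this change.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SendBufferSizeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Request a 256 KB socket send buffer for the client's write pipeline;
    // the default remains 131072 (HdfsConstants.DEFAULT_DATA_SOCKET_SIZE).
    conf.setInt("dfs.client.socket.send.buffer.size", 256 * 1024);

    // Alternatively, pass 0 (or a negative value) so DataStreamer skips the
    // explicit setSendBufferSize() call and the kernel auto-tunes the buffer:
    // conf.setInt("dfs.client.socket.send.buffer.size", 0);

    // "hdfs://localhost:9000" and "/tmp/sndbuf-demo" are placeholders.
    try (FileSystem fs =
             FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/sndbuf-demo"))) {
      out.writeBytes("hello, hdfs");  // data flows through the write pipeline
    }
  }
}

Note that the setting only affects the sockets DataStreamer.createSocketForPipeline
opens to datanodes for writes; read-path and IPC sockets are not touched by this
patch.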