diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1b21c4ddcd4..270f30b2253 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -928,6 +928,10 @@ Release 2.8.0 - UNRELEASED HDFS-8929. Add a metric to expose the timestamp of the last journal (surendra singh lilhore via vinayakumarb) + HDFS-8829. Make SO_RCVBUF and SO_SNDBUF size configurable for + DataTransferProtocol sockets and allow configuring auto-tuning (He Tianyi + via Colin P. McCabe) + BUG FIXES HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 62abc35dfab..0498450e21a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; import org.apache.hadoop.http.HttpConfig; @@ -769,9 +770,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT = false; + public static final String + DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY = + "dfs.datanode.transfer.socket.send.buffer.size"; + public static final int + DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT = + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE; + + public static final String + 
DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY = + "dfs.datanode.transfer.socket.recv.buffer.size"; + public static final int + DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT = + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE; - - // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java index 95a138894d9..5425bd5af5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java @@ -49,6 +49,11 @@ public void setReceiveBufferSize(int size) throws IOException { sock.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, size); } + @Override + public int getReceiveBufferSize() throws IOException { + return sock.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); + } + @Override public Peer accept() throws IOException, SocketTimeoutException { DomainSocket connSock = sock.accept(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java index c7b6b14df49..72974e2cdac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java @@ -32,7 +32,14 @@ public interface PeerServer extends Closeable { public void setReceiveBufferSize(int size) throws IOException; /** - * Listens for a connection to be made to this server and accepts + * Get the receive buffer size of the PeerServer. + * + * @return The receive buffer size. 
+ */ + int getReceiveBufferSize() throws IOException; + + /** + * Listens for a connection to be made to this server and accepts * it. The method blocks until a connection is made. * * @exception IOException if an I/O error occurs when waiting for a diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java index e31e46a42c9..8858de80fae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java @@ -73,6 +73,11 @@ public void setReceiveBufferSize(int size) throws IOException { this.serverSocket.setReceiveBufferSize(size); } + @Override + public int getReceiveBufferSize() throws IOException { + return this.serverSocket.getReceiveBufferSize(); + } + @Override public Peer accept() throws IOException, SocketTimeoutException { Peer peer = DFSUtilClient.peerFromSocket(serverSocket.accept()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 9c25f5eb350..bd4943d026f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -71,7 +71,9 @@ public class DNConf { final int socketTimeout; final int socketWriteTimeout; final int socketKeepaliveTimeout; - + private final int transferSocketSendBufferSize; + private final int transferSocketRecvBufferSize; + final boolean transferToAllowed; final boolean dropCacheBehindWrites; final boolean syncBehindWrites; @@ -114,8 +116,14 @@ public DNConf(Configuration conf) { socketKeepaliveTimeout = conf.getInt( 
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); - - /* Based on results on different platforms, we might need set the default + this.transferSocketSendBufferSize = conf.getInt( + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT); + this.transferSocketRecvBufferSize = conf.getInt( + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT); + + /* Based on results on different platforms, we might need set the default * to false on some of them. */ transferToAllowed = conf.getBoolean( DFS_DATANODE_TRANSFERTO_ALLOWED_KEY, @@ -279,4 +287,12 @@ public boolean getIgnoreSecurePortsForTesting() { public boolean getAllowNonLocalLazyPersist() { return allowNonLocalLazyPersist; } + + public int getTransferSocketRecvBufferSize() { + return transferSocketRecvBufferSize; + } + + public int getTransferSocketSendBufferSize() { + return transferSocketSendBufferSize; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 0b0a0e8db4b..d51d0a57088 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -910,7 +910,10 @@ private void initDataXceiver(Configuration conf) throws IOException { tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout, DataNode.getStreamingAddr(conf)); } - tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + if (dnConf.getTransferSocketRecvBufferSize() > 0) { + tcpPeerServer.setReceiveBufferSize( + dnConf.getTransferSocketRecvBufferSize()); + } streamingAddr = 
tcpPeerServer.getStreamingAddr(); LOG.info("Opened streaming server at " + streamingAddr); this.threadGroup = new ThreadGroup("dataXceiverServer"); @@ -958,8 +961,12 @@ private static DomainPeerServer getDomainPeerServer(Configuration conf, } DomainPeerServer domainPeerServer = new DomainPeerServer(domainSocketPath, port); - domainPeerServer.setReceiveBufferSize( - HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + int recvBufferSize = conf.getInt( + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT); + if (recvBufferSize > 0) { + domainPeerServer.setReceiveBufferSize(recvBufferSize); + } return domainPeerServer; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index efd2217dd68..4f6dc9652ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -709,8 +709,11 @@ public void writeBlock(final ExtendedBlock block, (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length); NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue); mirrorSock.setSoTimeout(timeoutValue); - mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - + if (dnConf.getTransferSocketSendBufferSize() > 0) { + mirrorSock.setSendBufferSize( + dnConf.getTransferSocketSendBufferSize()); + } + OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, writeTimeout); InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index 
caf6eaaa920..8d312a89520 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -278,7 +278,12 @@ synchronized int getNumPeers() { synchronized int getNumPeersXceiver() { return peersXceiver.size(); } - + + @VisibleForTesting + PeerServer getPeerServer() { + return peerServer; + } + synchronized void releasePeer(Peer peer) { peers.remove(peer); peersXceiver.remove(peer); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 62665fc1e9a..e9b62c712ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2424,4 +2424,26 @@ + + dfs.datanode.transfer.socket.send.buffer.size + 131072 + + Socket send buffer size for DataXceiver (mirroring packets to downstream + in pipeline). This may affect TCP connection throughput. + If it is set to a zero or negative value, no buffer size will be set + explicitly, thus enabling TCP auto-tuning on some systems. + + + + + dfs.datanode.transfer.socket.recv.buffer.size + 131072 + + Socket receive buffer size for DataXceiver (receiving packets from client + during block writing). This may affect TCP connection throughput. + If it is set to a zero or negative value, no buffer size will be set + explicitly, thus enabling TCP auto-tuning on some systems.
+ + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTransferSocketSize.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTransferSocketSize.java new file mode 100644 index 00000000000..0e98b86a454 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTransferSocketSize.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.Test; + +public class TestDataNodeTransferSocketSize { + + @Test + public void testSpecifiedDataSocketSize() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, 4 * 1024); + SimulatedFSDataset.setFactory(conf); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + List datanodes = cluster.getDataNodes(); + DataNode datanode = datanodes.get(0); + assertEquals("Receive buffer size should be 4K", + 4 * 1024, datanode.getXferServer().getPeerServer().getReceiveBufferSize()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testAutoTuningDataSocketSize() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, 0); + SimulatedFSDataset.setFactory(conf); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + List datanodes = cluster.getDataNodes(); + DataNode datanode = datanodes.get(0); + assertTrue( + "Receive buffer size should be a default value (determined by kernel)", + datanode.getXferServer().getPeerServer().getReceiveBufferSize() > 0); + } finally { + if (cluster != null) { + 
cluster.shutdown(); + } + } + } +}