HDFS-9669. TcpPeerServer should respect ipc.server.listen.queue.size (Elliot Clark via cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2016-02-02 10:59:10 -08:00
parent 9d494f0c0e
commit 2da03b48eb
4 changed files with 18 additions and 4 deletions

View File

@ -2699,6 +2699,9 @@ Release 2.7.3 - UNRELEASED
HDFS-9654. Code refactoring for HDFS-8578. (szetszwo) HDFS-9654. Code refactoring for HDFS-8578. (szetszwo)
HDFS-9669. TcpPeerServer should respect ipc.server.listen.queue.size
(Elliot Clark via cmccabe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -41,13 +41,15 @@ public class TcpPeerServer implements PeerServer {
* *
* @param socketWriteTimeout The Socket write timeout in ms. * @param socketWriteTimeout The Socket write timeout in ms.
* @param bindAddr The address to bind to. * @param bindAddr The address to bind to.
* @param backlogLength The length of the tcp accept backlog
* @throws IOException * @throws IOException
*/ */
public TcpPeerServer(int socketWriteTimeout, public TcpPeerServer(int socketWriteTimeout,
InetSocketAddress bindAddr) throws IOException { InetSocketAddress bindAddr,
int backlogLength) throws IOException {
this.serverSocket = (socketWriteTimeout > 0) ? this.serverSocket = (socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket(); ServerSocketChannel.open().socket() : new ServerSocket();
Server.bind(serverSocket, bindAddr, 0); Server.bind(serverSocket, bindAddr, backlogLength);
} }
/** /**

View File

@ -49,6 +49,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ExitUtil.terminate;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
@ -1016,8 +1018,11 @@ public class DataNode extends ReconfigurableBase
if (secureResources != null) { if (secureResources != null) {
tcpPeerServer = new TcpPeerServer(secureResources); tcpPeerServer = new TcpPeerServer(secureResources);
} else { } else {
int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout, tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
DataNode.getStreamingAddr(conf)); DataNode.getStreamingAddr(conf), backlogLength);
} }
if (dnConf.getTransferSocketRecvBufferSize() > 0) { if (dnConf.getTransferSocketRecvBufferSize() > 0) {
tcpPeerServer.setReceiveBufferSize( tcpPeerServer.setReceiveBufferSize(

View File

@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.daemon.Daemon; import org.apache.commons.daemon.Daemon;
import org.apache.commons.daemon.DaemonContext; import org.apache.commons.daemon.DaemonContext;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -95,10 +96,13 @@ public class SecureDataNodeStarter implements Daemon {
int socketWriteTimeout = conf.getInt( int socketWriteTimeout = conf.getInt(
DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsConstants.WRITE_TIMEOUT); HdfsConstants.WRITE_TIMEOUT);
int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
ServerSocket ss = (socketWriteTimeout > 0) ? ServerSocket ss = (socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket(); ServerSocketChannel.open().socket() : new ServerSocket();
ss.bind(streamingAddr, 0); ss.bind(streamingAddr, backlogLength);
// Check that we got the port we need // Check that we got the port we need
if (ss.getLocalPort() != streamingAddr.getPort()) { if (ss.getLocalPort() != streamingAddr.getPort()) {