HDFS-15650. Make the socket timeout for computing checksum of striped blocks configurable (#2414)
This commit is contained in:
parent
632f64cadb
commit
4bb25c810b
@ -623,7 +623,8 @@ static class StripedFileNonStripedChecksumComputer
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
void checksumBlocks() throws IOException {
|
void checksumBlocks() throws IOException {
|
||||||
int tmpTimeout = 3000 * 1 + getClient().getConf().getSocketTimeout();
|
int tmpTimeout = getClient().getConf().getChecksumEcSocketTimeout() * 1 +
|
||||||
|
getClient().getConf().getSocketTimeout();
|
||||||
setTimeout(tmpTimeout);
|
setTimeout(tmpTimeout);
|
||||||
|
|
||||||
for (bgIdx = 0;
|
for (bgIdx = 0;
|
||||||
|
@ -133,6 +133,8 @@ public interface HdfsClientConfigKeys {
|
|||||||
int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
|
int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
|
||||||
String DFS_CHECKSUM_COMBINE_MODE_KEY = "dfs.checksum.combine.mode";
|
String DFS_CHECKSUM_COMBINE_MODE_KEY = "dfs.checksum.combine.mode";
|
||||||
String DFS_CHECKSUM_COMBINE_MODE_DEFAULT = "MD5MD5CRC";
|
String DFS_CHECKSUM_COMBINE_MODE_DEFAULT = "MD5MD5CRC";
|
||||||
|
String DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY = "dfs.checksum.ec.socket-timeout";
|
||||||
|
int DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT = 3000;
|
||||||
String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY =
|
String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY =
|
||||||
"dfs.datanode.socket.write.timeout";
|
"dfs.datanode.socket.write.timeout";
|
||||||
String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC =
|
String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC =
|
||||||
|
@ -46,6 +46,8 @@
|
|||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_DEFAULT;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
|
||||||
@ -115,6 +117,7 @@ public class DfsClientConf {
|
|||||||
private final int ioBufferSize;
|
private final int ioBufferSize;
|
||||||
private final ChecksumOpt defaultChecksumOpt;
|
private final ChecksumOpt defaultChecksumOpt;
|
||||||
private final ChecksumCombineMode checksumCombineMode;
|
private final ChecksumCombineMode checksumCombineMode;
|
||||||
|
private final int checksumEcSocketTimeout;
|
||||||
private final int writePacketSize;
|
private final int writePacketSize;
|
||||||
private final int writeMaxPackets;
|
private final int writeMaxPackets;
|
||||||
private final ByteArrayManager.Conf writeByteArrayManagerConf;
|
private final ByteArrayManager.Conf writeByteArrayManagerConf;
|
||||||
@ -198,6 +201,8 @@ public DfsClientConf(Configuration conf) {
|
|||||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||||
defaultChecksumOpt = getChecksumOptFromConf(conf);
|
defaultChecksumOpt = getChecksumOptFromConf(conf);
|
||||||
checksumCombineMode = getChecksumCombineModeFromConf(conf);
|
checksumCombineMode = getChecksumCombineModeFromConf(conf);
|
||||||
|
checksumEcSocketTimeout = conf.getInt(DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY,
|
||||||
|
DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT);
|
||||||
dataTransferTcpNoDelay = conf.getBoolean(
|
dataTransferTcpNoDelay = conf.getBoolean(
|
||||||
DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
|
DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
|
||||||
DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
|
DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
|
||||||
@ -478,6 +483,13 @@ public ChecksumCombineMode getChecksumCombineMode() {
|
|||||||
return checksumCombineMode;
|
return checksumCombineMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the checksumEcSocketTimeout
|
||||||
|
*/
|
||||||
|
public int getChecksumEcSocketTimeout() {
|
||||||
|
return checksumEcSocketTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the writePacketSize
|
* @return the writePacketSize
|
||||||
*/
|
*/
|
||||||
|
@ -597,7 +597,7 @@ private ExtendedBlock getInternalBlock(int numDataUnits, int idx) {
|
|||||||
private void checksumBlock(ExtendedBlock block, int blockIdx,
|
private void checksumBlock(ExtendedBlock block, int blockIdx,
|
||||||
Token<BlockTokenIdentifier> blockToken,
|
Token<BlockTokenIdentifier> blockToken,
|
||||||
DatanodeInfo targetDatanode) throws IOException {
|
DatanodeInfo targetDatanode) throws IOException {
|
||||||
int timeout = 3000;
|
int timeout = getDatanode().getDnConf().getEcChecksumSocketTimeout();
|
||||||
try (IOStreamPair pair = getDatanode().connectToDN(targetDatanode,
|
try (IOStreamPair pair = getDatanode().connectToDN(targetDatanode,
|
||||||
timeout, block, blockToken)) {
|
timeout, block, blockToken)) {
|
||||||
|
|
||||||
|
@ -62,6 +62,8 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configurable;
|
import org.apache.hadoop.conf.Configurable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -84,6 +86,7 @@ public class DNConf {
|
|||||||
final int socketTimeout;
|
final int socketTimeout;
|
||||||
final int socketWriteTimeout;
|
final int socketWriteTimeout;
|
||||||
final int socketKeepaliveTimeout;
|
final int socketKeepaliveTimeout;
|
||||||
|
final int ecChecksumSocketTimeout;
|
||||||
private final int transferSocketSendBufferSize;
|
private final int transferSocketSendBufferSize;
|
||||||
private final int transferSocketRecvBufferSize;
|
private final int transferSocketRecvBufferSize;
|
||||||
private final boolean tcpNoDelay;
|
private final boolean tcpNoDelay;
|
||||||
@ -145,6 +148,9 @@ public DNConf(final Configurable dn) {
|
|||||||
socketKeepaliveTimeout = getConf().getInt(
|
socketKeepaliveTimeout = getConf().getInt(
|
||||||
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
|
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
|
||||||
|
ecChecksumSocketTimeout = getConf().getInt(
|
||||||
|
DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY,
|
||||||
|
DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT);
|
||||||
this.transferSocketSendBufferSize = getConf().getInt(
|
this.transferSocketSendBufferSize = getConf().getInt(
|
||||||
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY,
|
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
|
||||||
@ -372,6 +378,15 @@ public int getSocketWriteTimeout() {
|
|||||||
return socketWriteTimeout;
|
return socketWriteTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns socket timeout for computing the checksum of EC blocks
|
||||||
|
*
|
||||||
|
* @return int socket timeout
|
||||||
|
*/
|
||||||
|
public int getEcChecksumSocketTimeout() {
|
||||||
|
return ecChecksumSocketTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the SaslPropertiesResolver configured for use with
|
* Returns the SaslPropertiesResolver configured for use with
|
||||||
* DataTransferProtocol, or null if not configured.
|
* DataTransferProtocol, or null if not configured.
|
||||||
|
@ -4220,6 +4220,16 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.checksum.ec.socket-timeout</name>
|
||||||
|
<value>3000</value>
|
||||||
|
<description>
|
||||||
|
Default timeout value in milliseconds for computing the checksum of striped blocks.
|
||||||
|
Recommended to set the same value between client and DNs in a cluster because mismatching
|
||||||
|
may cause exhausting handler threads.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.client.block.write.locateFollowingBlock.retries</name>
|
<name>dfs.client.block.write.locateFollowingBlock.retries</name>
|
||||||
<value>5</value>
|
<value>5</value>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user