HDFS-14844. Make buffer of BlockReaderRemote#newBlockReader#BufferedOutputStream configurable. Contributed by Lisheng Sun.
This commit is contained in:
parent
b3173e1f58
commit
3f223bebfa
|
@ -148,6 +148,9 @@ public interface HdfsClientConfigKeys {
|
||||||
"dfs.client.key.provider.cache.expiry";
|
"dfs.client.key.provider.cache.expiry";
|
||||||
long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
|
long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
|
||||||
TimeUnit.DAYS.toMillis(10); // 10 days
|
TimeUnit.DAYS.toMillis(10); // 10 days
|
||||||
|
String DFS_CLIENT_BLOCK_READER_REMOTE_BUFFER_SIZE_KEY =
|
||||||
|
"dfs.client.block.reader.remote.buffer.size";
|
||||||
|
int DFS_CLIENT_BLOCK_READER_REMOTE_BUFFER_SIZE_DEFAULT = 8192;
|
||||||
|
|
||||||
String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
|
String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
|
||||||
"dfs.datanode.kerberos.principal";
|
"dfs.datanode.kerberos.principal";
|
||||||
|
|
|
@ -855,7 +855,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
fileName, block, token, startOffset, length,
|
fileName, block, token, startOffset, length,
|
||||||
verifyChecksum, clientName, peer, datanode,
|
verifyChecksum, clientName, peer, datanode,
|
||||||
clientContext.getPeerCache(), cachingStrategy,
|
clientContext.getPeerCache(), cachingStrategy,
|
||||||
networkDistance);
|
networkDistance, configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.util.EnumSet;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.hdfs.BlockReader;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.PeerCache;
|
import org.apache.hadoop.hdfs.PeerCache;
|
||||||
|
@ -55,6 +56,9 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_BLOCK_READER_REMOTE_BUFFER_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_BLOCK_READER_REMOTE_BUFFER_SIZE_KEY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a wrapper around connection to datanode
|
* This is a wrapper around connection to datanode
|
||||||
* and understands checksum, offset etc.
|
* and understands checksum, offset etc.
|
||||||
|
@ -391,10 +395,13 @@ public class BlockReaderRemote implements BlockReader {
|
||||||
Peer peer, DatanodeID datanodeID,
|
Peer peer, DatanodeID datanodeID,
|
||||||
PeerCache peerCache,
|
PeerCache peerCache,
|
||||||
CachingStrategy cachingStrategy,
|
CachingStrategy cachingStrategy,
|
||||||
int networkDistance) throws IOException {
|
int networkDistance, Configuration configuration) throws IOException {
|
||||||
// in and out will be closed when sock is closed (by the caller)
|
// in and out will be closed when sock is closed (by the caller)
|
||||||
|
int bufferSize = configuration.getInt(
|
||||||
|
DFS_CLIENT_BLOCK_READER_REMOTE_BUFFER_SIZE_KEY,
|
||||||
|
DFS_CLIENT_BLOCK_READER_REMOTE_BUFFER_SIZE_DEFAULT);
|
||||||
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||||
peer.getOutputStream()));
|
peer.getOutputStream(), bufferSize));
|
||||||
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
|
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
|
||||||
verifyChecksum, cachingStrategy);
|
verifyChecksum, cachingStrategy);
|
||||||
|
|
||||||
|
|
|
@ -129,7 +129,7 @@ class StripedBlockReader {
|
||||||
return BlockReaderRemote.newBlockReader(
|
return BlockReaderRemote.newBlockReader(
|
||||||
"dummy", block, blockToken, offsetInBlock,
|
"dummy", block, blockToken, offsetInBlock,
|
||||||
block.getNumBytes() - offsetInBlock, true, "", peer, source,
|
block.getNumBytes() - offsetInBlock, true, "", peer, source,
|
||||||
null, stripedReader.getCachingStrategy(), -1);
|
null, stripedReader.getCachingStrategy(), -1, conf);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.info("Exception while creating remote block reader, datanode {}",
|
LOG.info("Exception while creating remote block reader, datanode {}",
|
||||||
source, e);
|
source, e);
|
||||||
|
|
|
@ -4098,6 +4098,18 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.client.block.reader.remote.buffer.size</name>
|
||||||
|
<value>8192</value>
|
||||||
|
<description>
|
||||||
|
The output stream buffer size of a DFSClient remote read. The buffer default value is 8KB. The buffer includes
|
||||||
|
only some request parameters that are: block, blockToken, clientName, startOffset, len, verifyChecksum,
|
||||||
|
cachingStrategy.
|
||||||
|
It is recommended to adjust the value according to the workload, which can reduce unnecessary memory
|
||||||
|
usage and the frequency of the garbage collection. A value of 512 might be reasonable.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.content-summary.limit</name>
|
<name>dfs.content-summary.limit</name>
|
||||||
<value>5000</value>
|
<value>5000</value>
|
||||||
|
|
Loading…
Reference in New Issue