HDFS-8314. Move HdfsServerConstants#IO_FILE_BUFFER_SIZE and SMALL_BUFFER_SIZE to the users. Contributed by Li Lu.

This commit is contained in:
Haohui Mai 2015-05-05 15:41:22 -07:00
parent 9809a16d3c
commit 4da8490b51
16 changed files with 95 additions and 54 deletions

View File

@ -513,6 +513,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote
NameNode (Charles Lamb via Colin P. McCabe) NameNode (Charles Lamb via Colin P. McCabe)
HDFS-8314. Move HdfsServerConstants#IO_FILE_BUFFER_SIZE and
SMALL_BUFFER_SIZE to the users. (Li Lu via wheat9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -238,6 +238,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
new DFSHedgedReadMetrics(); new DFSHedgedReadMetrics();
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
private final Sampler<?> traceSampler; private final Sampler<?> traceSampler;
private final int smallBufferSize;
public DfsClientConf getConf() { public DfsClientConf getConf() {
return dfsClientConf; return dfsClientConf;
@ -309,6 +310,7 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
this.stats = stats; this.stats = stats;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
this.ugi = UserGroupInformation.getCurrentUser(); this.ugi = UserGroupInformation.getCurrentUser();
@ -1902,7 +1904,7 @@ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
//connect to a datanode //connect to a datanode
IOStreamPair pair = connectToDN(datanodes[j], timeout, lb); IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
out = new DataOutputStream(new BufferedOutputStream(pair.out, out = new DataOutputStream(new BufferedOutputStream(pair.out,
HdfsServerConstants.SMALL_BUFFER_SIZE)); smallBufferSize));
in = new DataInputStream(pair.in); in = new DataInputStream(pair.in);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -2067,7 +2069,7 @@ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
try { try {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
HdfsServerConstants.SMALL_BUFFER_SIZE)); smallBufferSize));
DataInputStream in = new DataInputStream(pair.in); DataInputStream in = new DataInputStream(pair.in);
new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,

View File

@ -70,6 +70,7 @@
import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@ -1514,4 +1515,14 @@ public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
.createKeyProviderCryptoExtension(keyProvider); .createKeyProviderCryptoExtension(keyProvider);
return cryptoProvider; return cryptoProvider;
} }
public static int getIoFileBufferSize(Configuration conf) {
return conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
}
public static int getSmallBufferSize(Configuration conf) {
return Math.min(getIoFileBufferSize(conf) / 2, 512);
}
} }

View File

@ -71,7 +71,6 @@
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.hdfs.util.ByteArrayManager;
@ -92,7 +91,6 @@
import org.apache.htrace.TraceInfo; import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
@ -123,6 +121,7 @@
@InterfaceAudience.Private @InterfaceAudience.Private
class DataStreamer extends Daemon { class DataStreamer extends Daemon {
static final Log LOG = LogFactory.getLog(DataStreamer.class); static final Log LOG = LogFactory.getLog(DataStreamer.class);
/** /**
* Create a socket for a write pipeline * Create a socket for a write pipeline
* *
@ -1145,7 +1144,7 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
unbufOut = saslStreams.out; unbufOut = saslStreams.out;
unbufIn = saslStreams.in; unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut, out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsServerConstants.SMALL_BUFFER_SIZE)); DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
in = new DataInputStream(unbufIn); in = new DataInputStream(unbufIn);
//send the TRANSFER_BLOCK request //send the TRANSFER_BLOCK request
@ -1425,7 +1424,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
unbufOut = saslStreams.out; unbufOut = saslStreams.out;
unbufIn = saslStreams.in; unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut, out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsServerConstants.SMALL_BUFFER_SIZE)); DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
blockReplyStream = new DataInputStream(unbufIn); blockReplyStream = new DataInputStream(unbufIn);
// //

View File

@ -118,6 +118,8 @@ public class Dispatcher {
/** The maximum number of concurrent blocks moves at a datanode */ /** The maximum number of concurrent blocks moves at a datanode */
private final int maxConcurrentMovesPerNode; private final int maxConcurrentMovesPerNode;
private final int ioFileBufferSize;
private static class GlobalBlockMap { private static class GlobalBlockMap {
private final Map<Block, DBlock> map = new HashMap<Block, DBlock>(); private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
@ -308,9 +310,9 @@ private void dispatch() {
unbufOut = saslStreams.out; unbufOut = saslStreams.out;
unbufIn = saslStreams.in; unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut, out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsServerConstants.IO_FILE_BUFFER_SIZE)); ioFileBufferSize));
in = new DataInputStream(new BufferedInputStream(unbufIn, in = new DataInputStream(new BufferedInputStream(unbufIn,
HdfsServerConstants.IO_FILE_BUFFER_SIZE)); ioFileBufferSize));
sendRequest(out, eb, accessToken); sendRequest(out, eb, accessToken);
receiveResponse(in); receiveResponse(in);
@ -801,6 +803,7 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
this.saslClient = new SaslDataTransferClient(conf, this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf), DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth); TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf);
} }
public DistributedFileSystem getDistributedFileSystem() { public DistributedFileSystem getDistributedFileSystem() {

View File

@ -24,9 +24,7 @@
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion; import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory; import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@ -56,12 +54,6 @@ public interface HdfsServerConstants {
// to 1k. // to 1k.
int MAX_PATH_LENGTH = 8000; int MAX_PATH_LENGTH = 8000;
int MAX_PATH_DEPTH = 1000; int MAX_PATH_DEPTH = 1000;
int IO_FILE_BUFFER_SIZE = new HdfsConfiguration().getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
// Used for writing header etc.
int SMALL_BUFFER_SIZE = Math.min(IO_FILE_BUFFER_SIZE / 2,
512);
// An invalid transaction ID that will never be seen in a real namesystem. // An invalid transaction ID that will never be seen in a real namesystem.
long INVALID_TXID = -12345; long INVALID_TXID = -12345;
// Number of generation stamps reserved for legacy blocks. // Number of generation stamps reserved for legacy blocks.

View File

@ -33,7 +33,8 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -61,6 +62,8 @@ public class BlockMetadataHeader {
private final short version; private final short version;
private DataChecksum checksum = null; private DataChecksum checksum = null;
private static final HdfsConfiguration conf = new HdfsConfiguration();
@VisibleForTesting @VisibleForTesting
public BlockMetadataHeader(short version, DataChecksum checksum) { public BlockMetadataHeader(short version, DataChecksum checksum) {
this.checksum = checksum; this.checksum = checksum;
@ -85,7 +88,7 @@ public static DataChecksum readDataChecksum(File metaFile) throws IOException {
DataInputStream in = null; DataInputStream in = null;
try { try {
in = new DataInputStream(new BufferedInputStream( in = new DataInputStream(new BufferedInputStream(
new FileInputStream(metaFile), HdfsServerConstants.IO_FILE_BUFFER_SIZE)); new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf)));
return readDataChecksum(in, metaFile); return readDataChecksum(in, metaFile);
} finally { } finally {
IOUtils.closeStream(in); IOUtils.closeStream(in);

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer; import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
@ -47,7 +48,6 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -246,7 +246,8 @@ class BlockReceiver implements Closeable {
out.getClass()); out.getClass());
} }
this.checksumOut = new DataOutputStream(new BufferedOutputStream( this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.getChecksumOut(), HdfsServerConstants.SMALL_BUFFER_SIZE)); streams.getChecksumOut(), DFSUtil.getSmallBufferSize(
datanode.getConf())));
// write data chunk header if creating a new replica // write data chunk header if creating a new replica
if (isCreate) { if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, diskChecksum); BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);

View File

@ -34,9 +34,10 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@ -104,8 +105,13 @@ class BlockSender implements java.io.Closeable {
* not sure if there will be much more improvement. * not sure if there will be much more improvement.
*/ */
private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024; private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
private static final int IO_FILE_BUFFER_SIZE;
static {
HdfsConfiguration conf = new HdfsConfiguration();
IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf);
}
private static final int TRANSFERTO_BUFFER_SIZE = Math.max( private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
HdfsServerConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO); IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
/** the block to read from */ /** the block to read from */
private final ExtendedBlock block; private final ExtendedBlock block;
@ -298,7 +304,7 @@ class BlockSender implements java.io.Closeable {
// storage and computes the checksum. // storage and computes the checksum.
if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) { if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
checksumIn = new DataInputStream(new BufferedInputStream( checksumIn = new DataInputStream(new BufferedInputStream(
metaIn, HdfsServerConstants.IO_FILE_BUFFER_SIZE)); metaIn, IO_FILE_BUFFER_SIZE));
csum = BlockMetadataHeader.readDataChecksum(checksumIn, block); csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
keepMetaInOpen = true; keepMetaInOpen = true;
@ -747,7 +753,7 @@ private long doSendBlock(DataOutputStream out, OutputStream baseStream,
pktBufSize += checksumSize * maxChunksPerPacket; pktBufSize += checksumSize * maxChunksPerPacket;
} else { } else {
maxChunksPerPacket = Math.max(1, maxChunksPerPacket = Math.max(1,
numberOfChunks(HdfsServerConstants.IO_FILE_BUFFER_SIZE)); numberOfChunks(IO_FILE_BUFFER_SIZE));
// Packet size includes both checksum and data // Packet size includes both checksum and data
pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket; pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
} }

View File

@ -2156,7 +2156,7 @@ public void run() {
unbufIn = saslStreams.in; unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut, out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsServerConstants.SMALL_BUFFER_SIZE)); DFSUtil.getSmallBufferSize(conf)));
in = new DataInputStream(unbufIn); in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(), blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, true, DataNode.this, null, cachingStrategy); false, false, true, DataNode.this, null, cachingStrategy);

View File

@ -48,7 +48,9 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -109,6 +111,8 @@ class DataXceiver extends Receiver implements Runnable {
private final InputStream socketIn; private final InputStream socketIn;
private OutputStream socketOut; private OutputStream socketOut;
private BlockReceiver blockReceiver = null; private BlockReceiver blockReceiver = null;
private final int ioFileBufferSize;
private final int smallBufferSize;
/** /**
* Client Name used in previous operation. Not available on first request * Client Name used in previous operation. Not available on first request
@ -131,6 +135,8 @@ private DataXceiver(Peer peer, DataNode datanode,
this.datanode = datanode; this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer; this.dataXceiverServer = dataXceiverServer;
this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname; this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(datanode.getConf());
this.smallBufferSize = DFSUtil.getSmallBufferSize(datanode.getConf());
remoteAddress = peer.getRemoteAddressString(); remoteAddress = peer.getRemoteAddressString();
final int colonIdx = remoteAddress.indexOf(':'); final int colonIdx = remoteAddress.indexOf(':');
remoteAddressWithoutPort = remoteAddressWithoutPort =
@ -191,7 +197,7 @@ public void run() {
socketIn, datanode.getXferAddress().getPort(), socketIn, datanode.getXferAddress().getPort(),
datanode.getDatanodeId()); datanode.getDatanodeId());
input = new BufferedInputStream(saslStreams.in, input = new BufferedInputStream(saslStreams.in,
HdfsServerConstants.SMALL_BUFFER_SIZE); smallBufferSize);
socketOut = saslStreams.out; socketOut = saslStreams.out;
} catch (InvalidMagicNumberException imne) { } catch (InvalidMagicNumberException imne) {
if (imne.isHandshake4Encryption()) { if (imne.isHandshake4Encryption()) {
@ -514,7 +520,7 @@ public void readBlock(final ExtendedBlock block,
long read = 0; long read = 0;
OutputStream baseStream = getOutputStream(); OutputStream baseStream = getOutputStream();
DataOutputStream out = new DataOutputStream(new BufferedOutputStream( DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
baseStream, HdfsServerConstants.SMALL_BUFFER_SIZE)); baseStream, smallBufferSize));
checkAccess(out, true, block, blockToken, checkAccess(out, true, block, blockToken,
Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ); Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
@ -658,7 +664,7 @@ public void writeBlock(final ExtendedBlock block,
final DataOutputStream replyOut = new DataOutputStream( final DataOutputStream replyOut = new DataOutputStream(
new BufferedOutputStream( new BufferedOutputStream(
getOutputStream(), getOutputStream(),
HdfsServerConstants.SMALL_BUFFER_SIZE)); smallBufferSize));
checkAccess(replyOut, isClient, block, blockToken, checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE); Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
@ -717,7 +723,7 @@ public void writeBlock(final ExtendedBlock block,
unbufMirrorOut = saslStreams.out; unbufMirrorOut = saslStreams.out;
unbufMirrorIn = saslStreams.in; unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
HdfsServerConstants.SMALL_BUFFER_SIZE)); smallBufferSize));
mirrorIn = new DataInputStream(unbufMirrorIn); mirrorIn = new DataInputStream(unbufMirrorIn);
// Do not propagate allowLazyPersist to downstream DataNodes. // Do not propagate allowLazyPersist to downstream DataNodes.
@ -932,7 +938,7 @@ public void blockChecksum(final ExtendedBlock block,
.getMetaDataInputStream(block); .getMetaDataInputStream(block);
final DataInputStream checksumIn = new DataInputStream( final DataInputStream checksumIn = new DataInputStream(
new BufferedInputStream(metadataIn, HdfsServerConstants.IO_FILE_BUFFER_SIZE)); new BufferedInputStream(metadataIn, ioFileBufferSize));
updateCurrentThreadName("Getting checksum for block " + block); updateCurrentThreadName("Getting checksum for block " + block);
try { try {
//read metadata file //read metadata file
@ -1024,7 +1030,7 @@ public void copyBlock(final ExtendedBlock block,
// set up response stream // set up response stream
OutputStream baseStream = getOutputStream(); OutputStream baseStream = getOutputStream();
reply = new DataOutputStream(new BufferedOutputStream( reply = new DataOutputStream(new BufferedOutputStream(
baseStream, HdfsServerConstants.SMALL_BUFFER_SIZE)); baseStream, smallBufferSize));
// send status first // send status first
writeSuccessWithChecksumInfo(blockSender, reply); writeSuccessWithChecksumInfo(blockSender, reply);
@ -1132,9 +1138,9 @@ public void replaceBlock(final ExtendedBlock block,
unbufProxyIn = saslStreams.in; unbufProxyIn = saslStreams.in;
proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
HdfsServerConstants.SMALL_BUFFER_SIZE)); smallBufferSize));
proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn, proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
HdfsServerConstants.IO_FILE_BUFFER_SIZE)); ioFileBufferSize));
/* send request to the proxy */ /* send request to the proxy */
IoeDuringCopyBlockOperation = true; IoeDuringCopyBlockOperation = true;

View File

@ -38,10 +38,10 @@
import org.apache.hadoop.fs.DU; import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@ -76,6 +76,7 @@ class BlockPoolSlice {
private final File lazypersistDir; private final File lazypersistDir;
private final File rbwDir; // directory store RBW replica private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica private final File tmpDir; // directory store Temporary replica
private final int ioFileBufferSize;
private static final String DU_CACHE_FILE = "dfsUsed"; private static final String DU_CACHE_FILE = "dfsUsed";
private volatile boolean dfsUsedSaved = false; private volatile boolean dfsUsedSaved = false;
private static final int SHUTDOWN_HOOK_PRIORITY = 30; private static final int SHUTDOWN_HOOK_PRIORITY = 30;
@ -108,6 +109,8 @@ class BlockPoolSlice {
} }
} }
this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf);
this.deleteDuplicateReplicas = conf.getBoolean( this.deleteDuplicateReplicas = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT); DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
@ -612,7 +615,7 @@ private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
} }
checksumIn = new DataInputStream( checksumIn = new DataInputStream(
new BufferedInputStream(new FileInputStream(metaFile), new BufferedInputStream(new FileInputStream(metaFile),
HdfsServerConstants.IO_FILE_BUFFER_SIZE)); ioFileBufferSize));
// read and handle the common header here. For now just a version // read and handle the common header here. For now just a version
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(

View File

@ -58,7 +58,9 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -66,7 +68,6 @@
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
@ -247,6 +248,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3; private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
private final int smallBufferSize;
// Used for synchronizing access to usage stats // Used for synchronizing access to usage stats
private final Object statsLock = new Object(); private final Object statsLock = new Object();
@ -264,6 +266,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
this.datanode = datanode; this.datanode = datanode;
this.dataStorage = storage; this.dataStorage = storage;
this.conf = conf; this.conf = conf;
this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
// The number of volumes required for operation is the total number // The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate. // of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated = final int volFailuresTolerated =
@ -837,19 +840,21 @@ static File moveBlockFiles(Block b, File srcfile, File destdir)
* @throws IOException * @throws IOException
*/ */
static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta, static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
File srcFile, File destRoot, boolean calculateChecksum) File srcFile, File destRoot, boolean calculateChecksum,
throws IOException { int smallBufferSize) throws IOException {
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId); final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
final File dstFile = new File(destDir, srcFile.getName()); final File dstFile = new File(destDir, srcFile.getName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp); final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum); return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum,
smallBufferSize);
} }
static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta, static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta,
File dstFile, boolean calculateChecksum) File dstFile, boolean calculateChecksum,
int smallBufferSize)
throws IOException { throws IOException {
if (calculateChecksum) { if (calculateChecksum) {
computeChecksum(srcMeta, dstMeta, srcFile); computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize);
} else { } else {
try { try {
Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true); Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
@ -913,7 +918,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
File[] blockFiles = copyBlockFiles(block.getBlockId(), File[] blockFiles = copyBlockFiles(block.getBlockId(),
block.getGenerationStamp(), oldMetaFile, oldBlockFile, block.getGenerationStamp(), oldMetaFile, oldBlockFile,
targetVolume.getTmpDir(block.getBlockPoolId()), targetVolume.getTmpDir(block.getBlockPoolId()),
replicaInfo.isOnTransientStorage()); replicaInfo.isOnTransientStorage(), smallBufferSize);
ReplicaInfo newReplicaInfo = new ReplicaInPipeline( ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(), replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
@ -941,7 +946,8 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
* @param blockFile block file for which the checksum will be computed * @param blockFile block file for which the checksum will be computed
* @throws IOException * @throws IOException
*/ */
private static void computeChecksum(File srcMeta, File dstMeta, File blockFile) private static void computeChecksum(File srcMeta, File dstMeta,
File blockFile, int smallBufferSize)
throws IOException { throws IOException {
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta); final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
final byte[] data = new byte[1 << 16]; final byte[] data = new byte[1 << 16];
@ -957,7 +963,7 @@ private static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
} }
} }
metaOut = new DataOutputStream(new BufferedOutputStream( metaOut = new DataOutputStream(new BufferedOutputStream(
new FileOutputStream(dstMeta), HdfsServerConstants.SMALL_BUFFER_SIZE)); new FileOutputStream(dstMeta), smallBufferSize));
BlockMetadataHeader.writeHeader(metaOut, checksum); BlockMetadataHeader.writeHeader(metaOut, checksum);
int offset = 0; int offset = 0;
@ -2480,7 +2486,7 @@ private File[] copyReplicaWithNewBlockIdAndGS(
final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS); final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
return copyBlockFiles(replicaInfo.getMetaFile(), return copyBlockFiles(replicaInfo.getMetaFile(),
replicaInfo.getBlockFile(), replicaInfo.getBlockFile(),
dstMetaFile, dstBlockFile, true); dstMetaFile, dstBlockFile, true, smallBufferSize);
} }
} }

View File

@ -20,10 +20,11 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import javax.ws.rs.HEAD;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -55,6 +56,7 @@ class RamDiskAsyncLazyPersistService {
private final ThreadGroup threadGroup; private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>(); = new HashMap<File, ThreadPoolExecutor>();
private final static HdfsConfiguration EMPTY_HDFS_CONF = new HdfsConfiguration();
/** /**
* Create a RamDiskAsyncLazyPersistService with a set of volumes (specified by their * Create a RamDiskAsyncLazyPersistService with a set of volumes (specified by their
@ -234,9 +236,11 @@ public void run() {
boolean succeeded = false; boolean succeeded = false;
final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset(); final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset();
try (FsVolumeReference ref = this.targetVolume) { try (FsVolumeReference ref = this.targetVolume) {
int smallBufferSize = DFSUtil.getSmallBufferSize(EMPTY_HDFS_CONF);
// No FsDatasetImpl lock for the file copy // No FsDatasetImpl lock for the file copy
File targetFiles[] = FsDatasetImpl.copyBlockFiles( File targetFiles[] = FsDatasetImpl.copyBlockFiles(
blockId, genStamp, metaFile, blockFile, lazyPersistDir, true); blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
smallBufferSize);
// Lock FsDataSetImpl during onCompleteLazyPersist callback // Lock FsDataSetImpl during onCompleteLazyPersist callback
dataset.onCompleteLazyPersist(bpId, blockId, dataset.onCompleteLazyPersist(bpId, blockId,

View File

@ -43,8 +43,8 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter; import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
@ -77,6 +77,7 @@ public class TransferFsImage {
private final static String CONTENT_TYPE = "Content-Type"; private final static String CONTENT_TYPE = "Content-Type";
private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding"; private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
private final static int IO_FILE_BUFFER_SIZE;
@VisibleForTesting @VisibleForTesting
static int timeout = 0; static int timeout = 0;
@ -88,6 +89,7 @@ public class TransferFsImage {
connectionFactory = URLConnectionFactory connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf); .newDefaultURLConnectionFactory(conf);
isSpnegoEnabled = UserGroupInformation.isSecurityEnabled(); isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf);
} }
private static final Log LOG = LogFactory.getLog(TransferFsImage.class); private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
@ -336,7 +338,7 @@ public static void copyFileToStream(OutputStream out, File localfile,
private static void copyFileToStream(OutputStream out, File localfile, private static void copyFileToStream(OutputStream out, File localfile,
FileInputStream infile, DataTransferThrottler throttler, FileInputStream infile, DataTransferThrottler throttler,
Canceler canceler) throws IOException { Canceler canceler) throws IOException {
byte buf[] = new byte[HdfsServerConstants.IO_FILE_BUFFER_SIZE]; byte buf[] = new byte[IO_FILE_BUFFER_SIZE];
try { try {
CheckpointFaultInjector.getInstance() CheckpointFaultInjector.getInstance()
.aboutToSendFile(localfile); .aboutToSendFile(localfile);
@ -345,7 +347,7 @@ private static void copyFileToStream(OutputStream out, File localfile,
shouldSendShortFile(localfile)) { shouldSendShortFile(localfile)) {
// Test sending image shorter than localfile // Test sending image shorter than localfile
long len = localfile.length(); long len = localfile.length();
buf = new byte[(int)Math.min(len/2, HdfsServerConstants.IO_FILE_BUFFER_SIZE)]; buf = new byte[(int)Math.min(len/2, IO_FILE_BUFFER_SIZE)];
// This will read at most half of the image // This will read at most half of the image
// and the rest of the image will be sent over the wire // and the rest of the image will be sent over the wire
infile.read(buf); infile.read(buf);
@ -510,7 +512,7 @@ private static MD5Hash receiveFile(String url, List<File> localPaths,
} }
int num = 1; int num = 1;
byte[] buf = new byte[HdfsServerConstants.IO_FILE_BUFFER_SIZE]; byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
while (num > 0) { while (num > 0) {
num = stream.read(buf); num = stream.read(buf);
if (num > 0) { if (num > 0) {

View File

@ -956,7 +956,7 @@ public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(s, writeTimeout), NetUtils.getOutputStream(s, writeTimeout),
HdfsServerConstants.SMALL_BUFFER_SIZE)); DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
// send the request // send the request