diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0aaba2f8ff4..f253383ccae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -183,6 +183,9 @@ Release 2.8.0 - UNRELEASED HDFS-7758. Retire FsDatasetSpi#getVolumes() and use FsDatasetSpi#getVolumeRefs() instead (Lei (Eddy) Xu via Colin P. McCabe) + HDFS-8314. Move HdfsServerConstants#IO_FILE_BUFFER_SIZE and + SMALL_BUFFER_SIZE to the users. (Li Lu via wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index acfb41bba62..265d094772b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -240,6 +240,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, new DFSHedgedReadMetrics(); private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; private final Sampler traceSampler; + private final int smallBufferSize; public DfsClientConf getConf() { return dfsClientConf; @@ -311,6 +312,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, this.stats = stats; this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); + this.smallBufferSize = DFSUtil.getSmallBufferSize(conf); this.ugi = UserGroupInformation.getCurrentUser(); @@ -1901,7 +1903,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, //connect to a datanode IOStreamPair pair = connectToDN(datanodes[j], timeout, lb); out = new DataOutputStream(new BufferedOutputStream(pair.out, - HdfsServerConstants.SMALL_BUFFER_SIZE)); + smallBufferSize)); in = new DataInputStream(pair.in); if (LOG.isDebugEnabled()) { @@ -2066,7 +2068,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, - HdfsServerConstants.SMALL_BUFFER_SIZE)); + smallBufferSize)); DataInputStream in = new DataInputStream(pair.in); new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 10b0a5893b8..76a189520fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -72,6 +72,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; @@ -1549,4 +1550,14 @@ public class DFSUtil { .createKeyProviderCryptoExtension(keyProvider); return cryptoProvider; } + + public static int getIoFileBufferSize(Configuration conf) { + return conf.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + 
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + } + + public static int getSmallBufferSize(Configuration conf) { + return Math.min(getIoFileBufferSize(conf) / 2, 512); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 43787ab774e..96bf21263ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -71,7 +71,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.util.ByteArrayManager; @@ -92,7 +91,6 @@ import org.apache.htrace.Trace; import org.apache.htrace.TraceInfo; import org.apache.htrace.TraceScope; -import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -123,6 +121,7 @@ import com.google.common.cache.RemovalNotification; @InterfaceAudience.Private class DataStreamer extends Daemon { static final Log LOG = LogFactory.getLog(DataStreamer.class); + /** * Create a socket for a write pipeline * @@ -1145,7 +1144,7 @@ class DataStreamer extends Daemon { unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsServerConstants.SMALL_BUFFER_SIZE)); + DFSUtil.getSmallBufferSize(dfsClient.getConfiguration()))); in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request @@ -1425,7 +1424,7 @@ class DataStreamer extends Daemon { unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsServerConstants.SMALL_BUFFER_SIZE)); + DFSUtil.getSmallBufferSize(dfsClient.getConfiguration()))); blockReplyStream = new DataInputStream(unbufIn); // diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index f7c299debba..05c458aaf2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -120,6 +120,8 @@ public class Dispatcher { /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; + private final int ioFileBufferSize; + private static class GlobalBlockMap { private final Map map = new HashMap(); @@ -310,9 +312,9 @@ public class Dispatcher { unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsServerConstants.IO_FILE_BUFFER_SIZE)); + ioFileBufferSize)); in = new DataInputStream(new BufferedInputStream(unbufIn, - HdfsServerConstants.IO_FILE_BUFFER_SIZE)); + ioFileBufferSize)); sendRequest(out, eb, accessToken); receiveResponse(in); @@ -803,6 +805,7 @@ public 
class Dispatcher { this.saslClient = new SaslDataTransferClient(conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth); + this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf); } public DistributedFileSystem getDistributedFileSystem() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index 6934d84ab9c..63c0ac7b0cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -24,9 +24,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion; import org.apache.hadoop.hdfs.server.namenode.FSDirectory; @@ -56,12 +54,6 @@ public interface HdfsServerConstants { // to 1k. int MAX_PATH_LENGTH = 8000; int MAX_PATH_DEPTH = 1000; - int IO_FILE_BUFFER_SIZE = new HdfsConfiguration().getInt( - CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, - CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); - // Used for writing header etc. - int SMALL_BUFFER_SIZE = Math.min(IO_FILE_BUFFER_SIZE / 2, - 512); // An invalid transaction ID that will never be seen in a real namesystem. long INVALID_TXID = -12345; // Number of generation stamps reserved for legacy blocks. 
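The HdfsServerConstants hunk above is the core of the change: IO_FILE_BUFFER_SIZE and SMALL_BUFFER_SIZE are no longer frozen at class-load time from a throwaway HdfsConfiguration, but are derived on demand from the caller's Configuration through the new DFSUtil helpers. A minimal sketch of the resulting behaviour, assuming the patch is applied (the demo class itself is illustrative and not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    import org.apache.hadoop.hdfs.DFSUtil;

    public class BufferSizeDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // With the default io.file.buffer.size of 4096, the "small" buffer is capped at 512.
        System.out.println(DFSUtil.getIoFileBufferSize(conf)); // 4096
        System.out.println(DFSUtil.getSmallBufferSize(conf));  // 512

        // Unlike the removed static fields, the result now tracks the caller's
        // Configuration instead of whatever was on the classpath at class-load time.
        conf.setInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, 256);
        System.out.println(DFSUtil.getIoFileBufferSize(conf)); // 256
        System.out.println(DFSUtil.getSmallBufferSize(conf));  // 128 (256 / 2, under the 512 cap)
      }
    }
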
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java index 04700b88b99..4977fd7069e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java @@ -33,7 +33,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; @@ -60,6 +61,8 @@ public class BlockMetadataHeader { */ private final short version; private DataChecksum checksum = null; + + private static final HdfsConfiguration conf = new HdfsConfiguration(); @VisibleForTesting public BlockMetadataHeader(short version, DataChecksum checksum) { @@ -85,7 +88,7 @@ public class BlockMetadataHeader { DataInputStream in = null; try { in = new DataInputStream(new BufferedInputStream( - new FileInputStream(metaFile), HdfsServerConstants.IO_FILE_BUFFER_SIZE)); + new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf))); return readDataChecksum(in, metaFile); } finally { IOUtils.closeStream(in); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 10692d4dd45..90e21949b54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -39,6 +39,7 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSOutputSummer; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; @@ -47,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -246,7 +246,8 @@ class BlockReceiver implements Closeable { out.getClass()); } this.checksumOut = new DataOutputStream(new BufferedOutputStream( - streams.getChecksumOut(), HdfsServerConstants.SMALL_BUFFER_SIZE)); + streams.getChecksumOut(), DFSUtil.getSmallBufferSize( + datanode.getConf()))); // write data chunk header if creating a new replica if (isCreate) { BlockMetadataHeader.writeHeader(checksumOut, diskChecksum); diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 6b958a2acd5..79f4dd7aa2e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -34,9 +34,10 @@ import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.util.DataTransferThrottler; @@ -104,8 +105,13 @@ class BlockSender implements java.io.Closeable { * not sure if there will be much more improvement. */ private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024; + private static final int IO_FILE_BUFFER_SIZE; + static { + HdfsConfiguration conf = new HdfsConfiguration(); + IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf); + } private static final int TRANSFERTO_BUFFER_SIZE = Math.max( - HdfsServerConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO); + IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO); /** the block to read from */ private final ExtendedBlock block; @@ -298,7 +304,7 @@ class BlockSender implements java.io.Closeable { // storage and computes the checksum. 
if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) { checksumIn = new DataInputStream(new BufferedInputStream( - metaIn, HdfsServerConstants.IO_FILE_BUFFER_SIZE)); + metaIn, IO_FILE_BUFFER_SIZE)); csum = BlockMetadataHeader.readDataChecksum(checksumIn, block); keepMetaInOpen = true; @@ -747,7 +753,7 @@ class BlockSender implements java.io.Closeable { pktBufSize += checksumSize * maxChunksPerPacket; } else { maxChunksPerPacket = Math.max(1, - numberOfChunks(HdfsServerConstants.IO_FILE_BUFFER_SIZE)); + numberOfChunks(IO_FILE_BUFFER_SIZE)); // Packet size includes both checksum and data pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 4f99f77185d..28c9fcc1d20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2163,7 +2163,7 @@ public class DataNode extends ReconfigurableBase unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsServerConstants.SMALL_BUFFER_SIZE)); + DFSUtil.getSmallBufferSize(conf))); in = new DataInputStream(unbufIn); blockSender = new BlockSender(b, 0, b.getNumBytes(), false, false, true, DataNode.this, null, cachingStrategy); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index de25579b833..26d669cb5fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -48,7 +48,9 @@ import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -109,7 +111,9 @@ class DataXceiver extends Receiver implements Runnable { private final InputStream socketIn; private OutputStream socketOut; private BlockReceiver blockReceiver = null; - + private final int ioFileBufferSize; + private final int smallBufferSize; + /** * Client Name used in previous operation. Not available on first request * on the socket. 
@@ -131,6 +135,8 @@ class DataXceiver extends Receiver implements Runnable { this.datanode = datanode; this.dataXceiverServer = dataXceiverServer; this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname; + this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(datanode.getConf()); + this.smallBufferSize = DFSUtil.getSmallBufferSize(datanode.getConf()); remoteAddress = peer.getRemoteAddressString(); final int colonIdx = remoteAddress.indexOf(':'); remoteAddressWithoutPort = @@ -191,7 +197,7 @@ class DataXceiver extends Receiver implements Runnable { socketIn, datanode.getXferAddress().getPort(), datanode.getDatanodeId()); input = new BufferedInputStream(saslStreams.in, - HdfsServerConstants.SMALL_BUFFER_SIZE); + smallBufferSize); socketOut = saslStreams.out; } catch (InvalidMagicNumberException imne) { if (imne.isHandshake4Encryption()) { @@ -514,7 +520,7 @@ class DataXceiver extends Receiver implements Runnable { long read = 0; OutputStream baseStream = getOutputStream(); DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - baseStream, HdfsServerConstants.SMALL_BUFFER_SIZE)); + baseStream, smallBufferSize)); checkAccess(out, true, block, blockToken, Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ); @@ -658,7 +664,7 @@ class DataXceiver extends Receiver implements Runnable { final DataOutputStream replyOut = new DataOutputStream( new BufferedOutputStream( getOutputStream(), - HdfsServerConstants.SMALL_BUFFER_SIZE)); + smallBufferSize)); checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE); @@ -717,7 +723,7 @@ class DataXceiver extends Receiver implements Runnable { unbufMirrorOut = saslStreams.out; unbufMirrorIn = saslStreams.in; mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, - HdfsServerConstants.SMALL_BUFFER_SIZE)); + smallBufferSize)); mirrorIn = new DataInputStream(unbufMirrorIn); // Do not propagate allowLazyPersist to downstream DataNodes. 
@@ -932,7 +938,7 @@ class DataXceiver extends Receiver implements Runnable { .getMetaDataInputStream(block); final DataInputStream checksumIn = new DataInputStream( - new BufferedInputStream(metadataIn, HdfsServerConstants.IO_FILE_BUFFER_SIZE)); + new BufferedInputStream(metadataIn, ioFileBufferSize)); updateCurrentThreadName("Getting checksum for block " + block); try { //read metadata file @@ -1024,7 +1030,7 @@ class DataXceiver extends Receiver implements Runnable { // set up response stream OutputStream baseStream = getOutputStream(); reply = new DataOutputStream(new BufferedOutputStream( - baseStream, HdfsServerConstants.SMALL_BUFFER_SIZE)); + baseStream, smallBufferSize)); // send status first writeSuccessWithChecksumInfo(blockSender, reply); @@ -1131,10 +1137,10 @@ class DataXceiver extends Receiver implements Runnable { unbufProxyOut = saslStreams.out; unbufProxyIn = saslStreams.in; - proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, - HdfsServerConstants.SMALL_BUFFER_SIZE)); + proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, + smallBufferSize)); proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn, - HdfsServerConstants.IO_FILE_BUFFER_SIZE)); + ioFileBufferSize)); /* send request to the proxy */ IoeDuringCopyBlockOperation = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 6edf9b86855..5bc9c680810 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -38,10 +38,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DU; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; @@ -76,6 +76,7 @@ class BlockPoolSlice { private final File lazypersistDir; private final File rbwDir; // directory store RBW replica private final File tmpDir; // directory store Temporary replica + private final int ioFileBufferSize; private static final String DU_CACHE_FILE = "dfsUsed"; private volatile boolean dfsUsedSaved = false; private static final int SHUTDOWN_HOOK_PRIORITY = 30; @@ -108,6 +109,8 @@ class BlockPoolSlice { } } + this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf); + this.deleteDuplicateReplicas = conf.getBoolean( DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT); @@ -618,7 +621,7 @@ class BlockPoolSlice { } checksumIn = new DataInputStream( new BufferedInputStream(new FileInputStream(metaFile), - HdfsServerConstants.IO_FILE_BUFFER_SIZE)); + ioFileBufferSize)); // read and handle the common header here. 
For now just a version final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 1bb89e24c31..f71ffef1359 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -59,7 +59,9 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; @@ -67,7 +69,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; @@ -249,6 +250,7 @@ class FsDatasetImpl implements FsDatasetSpi { private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3; + private final int smallBufferSize; // Used for synchronizing access to usage stats private final Object statsLock = new Object(); @@ -266,6 +268,7 @@ class FsDatasetImpl implements FsDatasetSpi { this.datanode = datanode; this.dataStorage = storage; this.conf = conf; + this.smallBufferSize = DFSUtil.getSmallBufferSize(conf); // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. 
final int volFailuresTolerated = @@ -839,19 +842,21 @@ class FsDatasetImpl implements FsDatasetSpi { * @throws IOException */ static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta, - File srcFile, File destRoot, boolean calculateChecksum) - throws IOException { + File srcFile, File destRoot, boolean calculateChecksum, + int smallBufferSize) throws IOException { final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId); final File dstFile = new File(destDir, srcFile.getName()); final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp); - return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum); + return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum, + smallBufferSize); } static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta, - File dstFile, boolean calculateChecksum) + File dstFile, boolean calculateChecksum, + int smallBufferSize) throws IOException { if (calculateChecksum) { - computeChecksum(srcMeta, dstMeta, srcFile); + computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize); } else { try { Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true); @@ -915,7 +920,7 @@ class FsDatasetImpl implements FsDatasetSpi { File[] blockFiles = copyBlockFiles(block.getBlockId(), block.getGenerationStamp(), oldMetaFile, oldBlockFile, targetVolume.getTmpDir(block.getBlockPoolId()), - replicaInfo.isOnTransientStorage()); + replicaInfo.isOnTransientStorage(), smallBufferSize); ReplicaInfo newReplicaInfo = new ReplicaInPipeline( replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(), @@ -943,7 +948,8 @@ class FsDatasetImpl implements FsDatasetSpi { * @param blockFile block file for which the checksum will be computed * @throws IOException */ - private static void computeChecksum(File srcMeta, File dstMeta, File blockFile) + private static void computeChecksum(File srcMeta, File dstMeta, + File blockFile, int smallBufferSize) throws IOException { final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta); final byte[] data = new byte[1 << 16]; @@ -959,7 +965,7 @@ class FsDatasetImpl implements FsDatasetSpi { } } metaOut = new DataOutputStream(new BufferedOutputStream( - new FileOutputStream(dstMeta), HdfsServerConstants.SMALL_BUFFER_SIZE)); + new FileOutputStream(dstMeta), smallBufferSize)); BlockMetadataHeader.writeHeader(metaOut, checksum); int offset = 0; @@ -2480,8 +2486,9 @@ class FsDatasetImpl implements FsDatasetSpi { final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId); final File dstBlockFile = new File(destDir, blockFileName); final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS); - return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), - dstMetaFile, dstBlockFile, true); + return copyBlockFiles(replicaInfo.getMetaFile(), + replicaInfo.getBlockFile(), + dstMetaFile, dstBlockFile, true, smallBufferSize); } @Override // FsDatasetSpi diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java index 2327ae91421..791a711e49e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java 
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; @@ -54,6 +56,7 @@ class RamDiskAsyncLazyPersistService { private final ThreadGroup threadGroup; private Map executors = new HashMap(); + private final static HdfsConfiguration EMPTY_HDFS_CONF = new HdfsConfiguration(); /** * Create a RamDiskAsyncLazyPersistService with a set of volumes (specified by their @@ -232,10 +235,12 @@ class RamDiskAsyncLazyPersistService { public void run() { boolean succeeded = false; final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset(); - try { + try (FsVolumeReference ref = this.targetVolume) { + int smallBufferSize = DFSUtil.getSmallBufferSize(EMPTY_HDFS_CONF); // No FsDatasetImpl lock for the file copy File targetFiles[] = FsDatasetImpl.copyBlockFiles( - blockId, genStamp, metaFile, blockFile, lazyPersistDir, true); + blockId, genStamp, metaFile, blockFile, lazyPersistDir, true, + smallBufferSize); // Lock FsDataSetImpl during onCompleteLazyPersist callback dataset.onCompleteLazyPersist(bpId, blockId, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java index 041c3cb9ab9..9783ccacf86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java @@ -43,8 +43,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.StorageErrorReporter; @@ -77,6 +77,7 @@ public class TransferFsImage { private final static String CONTENT_TYPE = "Content-Type"; private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding"; + private final static int IO_FILE_BUFFER_SIZE; @VisibleForTesting static int timeout = 0; @@ -88,6 +89,7 @@ public class TransferFsImage { connectionFactory = URLConnectionFactory .newDefaultURLConnectionFactory(conf); isSpnegoEnabled = UserGroupInformation.isSecurityEnabled(); + IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf); } private static final Log LOG = LogFactory.getLog(TransferFsImage.class); @@ -336,7 +338,7 @@ public class TransferFsImage { private static void copyFileToStream(OutputStream out, File localfile, FileInputStream infile, DataTransferThrottler throttler, Canceler canceler) throws IOException { - byte buf[] = new byte[HdfsServerConstants.IO_FILE_BUFFER_SIZE]; + byte buf[] = new byte[IO_FILE_BUFFER_SIZE]; try { CheckpointFaultInjector.getInstance() .aboutToSendFile(localfile); @@ -345,7 +347,7 @@ public class TransferFsImage { shouldSendShortFile(localfile)) { // Test sending image shorter than localfile long len = 
localfile.length(); - buf = new byte[(int)Math.min(len/2, HdfsServerConstants.IO_FILE_BUFFER_SIZE)]; + buf = new byte[(int)Math.min(len/2, IO_FILE_BUFFER_SIZE)]; // This will read at most half of the image // and the rest of the image will be sent over the wire infile.read(buf); @@ -510,7 +512,7 @@ public class TransferFsImage { } int num = 1; - byte[] buf = new byte[HdfsServerConstants.IO_FILE_BUFFER_SIZE]; + byte[] buf = new byte[IO_FILE_BUFFER_SIZE]; while (num > 0) { num = stream.read(buf); if (num > 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 4d3d86ad2d7..a73f5957f95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -959,7 +959,7 @@ public class DFSTestUtil { final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( NetUtils.getOutputStream(s, writeTimeout), - HdfsServerConstants.SMALL_BUFFER_SIZE)); + DFSUtil.getSmallBufferSize(dfsClient.getConfiguration()))); final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); // send the request diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java index 0e1cebb94c5..8b1223dc42e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java @@ -93,7 +93,7 @@ public class TestFileStatus { int fileSize, int blockSize) throws IOException { // Create and write a file that contains three blocks of data FSDataOutputStream stm = fileSys.create(name, true, - HdfsServerConstants.IO_FILE_BUFFER_SIZE, (short)repl, (long)blockSize); + DFSUtil.getIoFileBufferSize(conf), (short)repl, (long)blockSize); byte[] buffer = new byte[fileSize]; Random rand = new Random(seed); rand.nextBytes(buffer);
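
Most of the remaining hunks apply the same caller-side pattern: components that open many streams (DFSClient, DataXceiver, BlockPoolSlice, FsDatasetImpl) resolve the two sizes once, usually in their constructor, and keep them in fields rather than referencing a static constant. A hedged sketch of that pattern, with illustrative class and method names that are not taken from the patch:

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

    // Resolve both buffer sizes from the component's own Configuration once,
    // then reuse them for every stream the component sets up.
    class BufferedStreamFactory {
      private final int ioFileBufferSize;
      private final int smallBufferSize;

      BufferedStreamFactory(Configuration conf) {
        this.ioFileBufferSize = conf.getInt(
            CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
            CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
        this.smallBufferSize = Math.min(ioFileBufferSize / 2, 512);
      }

      // Small buffer for short header and control writes, as in the DataXceiver hunks.
      DataOutputStream controlStream(OutputStream raw) {
        return new DataOutputStream(new BufferedOutputStream(raw, smallBufferSize));
      }

      // Full io.file.buffer.size buffer for bulk data paths, as in Dispatcher and BlockSender.
      BufferedOutputStream dataStream(OutputStream raw) {
        return new BufferedOutputStream(raw, ioFileBufferSize);
      }
    }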