From 72d8b92ba5bdc5dc0cf434d06d90fb0b1810fecf Mon Sep 17 00:00:00 2001
From: Inigo Goiri
Date: Thu, 5 Sep 2019 11:44:02 -0700
Subject: [PATCH] HDFS-12904. Add DataTransferThrottler to the Datanode
 transfers. Contributed by Lisheng Sun.

---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java    |  4 ++++
 .../hadoop/hdfs/server/datanode/DataNode.java    | 15 ++++++++++++++-
 .../hdfs/server/datanode/DataXceiverServer.java  | 15 +++++++++++++++
 .../src/main/resources/hdfs-default.xml          | 10 ++++++++++
 4 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b4011bb55b7..6f76dc03f96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -115,6 +115,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       = "dfs.datanode.balance.max.concurrent.moves";
   public static final int
       DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 100;
+  public static final String DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY =
+      "dfs.datanode.data.transfer.bandwidthPerSec";
+  public static final long DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT =
+      0; // A value of zero indicates no limit.
   @Deprecated
   public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
       HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index afd1c602ebe..a298e78d590 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -112,6 +112,7 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -2499,6 +2500,9 @@ private class DataTransfer implements Runnable {
     final String clientname;
     final CachingStrategy cachingStrategy;
 
+    /** Throttler for block replication data transfers. */
+    private DataTransferThrottler transferThrottler;
+
     /**
      * Connect to the first item in the target list.  Pass along the
      * entire target list, the block, and the data.
@@ -2525,6 +2529,15 @@ private class DataTransfer implements Runnable {
       this.clientname = clientname;
       this.cachingStrategy =
           new CachingStrategy(true, getDnConf().readaheadLength);
+      // 1. If the stage is PIPELINE_SETUP_CREATE, the block is being
+      // moved (replicated), so set the throttler.
+      // 2. If the stage is PIPELINE_SETUP_APPEND_RECOVERY or
+      // PIPELINE_SETUP_STREAMING_RECOVERY, the transfer is a client write
+      // or a pipeline recovery, so do not set the throttler.
+      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
+          && clientname.isEmpty()) {
+        this.transferThrottler = xserver.getTransferThrottler();
+      }
     }
 
     /**
@@ -2583,7 +2596,7 @@ public void run() {
           targetStorageIds);
 
       // send data & checksum
-      blockSender.sendBlock(out, unbufOut, null);
+      blockSender.sendBlock(out, unbufOut, transferThrottler);
 
       // no response necessary
       LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 46cb21e8d3a..656bb3ea63b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -169,6 +169,8 @@ void release() {
 
   final BlockBalanceThrottler balanceThrottler;
 
+  private final DataTransferThrottler transferThrottler;
+
   /**
    * Stores an estimate for block size to check if the disk partition has enough
    * space. Newer clients pass the expected block size to the DataNode. For
@@ -194,6 +196,15 @@ void release() {
             DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
         conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
             DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
+
+    long bandwidthPerSec = conf.getLongBytes(
+        DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT);
+    if (bandwidthPerSec > 0) {
+      this.transferThrottler = new DataTransferThrottler(bandwidthPerSec);
+    } else {
+      this.transferThrottler = null;
+    }
   }
 
   @Override
@@ -443,6 +454,10 @@ PeerServer getPeerServer() {
     return peerServer;
   }
 
+  public DataTransferThrottler getTransferThrottler() {
+    return transferThrottler;
+  }
+
   /**
    * Release a peer.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 689cae3187f..d41585f1f01 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4144,6 +4144,16 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.data.transfer.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+    Specifies the maximum amount of bandwidth that each datanode can utilize
+    for the data transferring purpose in terms of the number of bytes per
+    second. When the bandwidth value is zero, there is no limit.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.fsdataset.factory</name>
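
A minimal hdfs-site.xml sketch of how the new key could be used, assuming an
operator wants to cap per-DataNode block-transfer bandwidth at roughly 10 MB/s
(the value and placement below are illustrative, not taken from the patch).
Because the key is read with conf.getLongBytes(), a plain byte count works,
and storage-size suffixes such as "10m" should also be accepted by that
parser:

  <property>
    <!-- Cap DataNode block-transfer (replication) bandwidth at ~10 MB/s.
         The default of 0 disables the throttle. -->
    <name>dfs.datanode.data.transfer.bandwidthPerSec</name>
    <value>10485760</value>
  </property>

With a positive value, DataXceiverServer builds a single DataTransferThrottler
that is shared by DataTransfer threads whose stage is PIPELINE_SETUP_CREATE
with an empty client name (block replication); client writes and pipeline
recovery transfers remain unthrottled, per the condition added in
DataNode.java above.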