diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 9ed564ebd6f..7a7647f31cc 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -123,6 +123,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.datanode.data.write.bandwidthPerSec"; // A value of zero indicates no limit public static final long DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT = 0; + public static final String DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_KEY = + "dfs.datanode.ec.reconstruct.read.bandwidthPerSec"; + public static final long DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_DEFAULT = + 0; // A value of zero indicates no limit + public static final String DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_KEY = + "dfs.datanode.ec.reconstruct.write.bandwidthPerSec"; + public static final long DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT = + 0; // A value of zero indicates no limit @Deprecated public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 57aa2c31cab..10438cd79e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -470,6 +470,9 @@ public class DataNode extends ReconfigurableBase private long startTime = 0; + private DataTransferThrottler ecReconstuctReadThrottler; + private DataTransferThrottler ecReconstuctWriteThrottler; + /** * Creates a dummy DataNode for testing purpose. */ @@ -584,6 +587,16 @@ public class DataNode extends ReconfigurableBase initOOBTimeout(); this.storageLocationChecker = storageLocationChecker; + long ecReconstuctReadBandwidth = conf.getLongBytes( + DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_KEY, + DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_DEFAULT); + long ecReconstuctWriteBandwidth = conf.getLongBytes( + DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_KEY, + DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT); + this.ecReconstuctReadThrottler = ecReconstuctReadBandwidth > 0 ? + new DataTransferThrottler(100, ecReconstuctReadBandwidth) : null; + this.ecReconstuctWriteThrottler = ecReconstuctWriteBandwidth > 0 ? + new DataTransferThrottler(100, ecReconstuctWriteBandwidth) : null; } @Override // ReconfigurableBase @@ -3830,6 +3843,14 @@ public class DataNode extends ReconfigurableBase return shortCircuitRegistry; } + public DataTransferThrottler getEcReconstuctReadThrottler() { + return ecReconstuctReadThrottler; + } + + public DataTransferThrottler getEcReconstuctWriteThrottler() { + return ecReconstuctWriteThrottler; + } + /** * Check the disk error synchronously. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java index 3ead793542c..ecd6351b46f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java @@ -95,6 +95,10 @@ class StripedBlockReconstructor extends StripedReconstructor (int) Math.min(getStripedReader().getBufferSize(), remaining); long start = Time.monotonicNow(); + long bytesToRead = (long) toReconstructLen * getStripedReader().getMinRequiredSources(); + if (getDatanode().getEcReconstuctReadThrottler() != null) { + getDatanode().getEcReconstuctReadThrottler().throttle(bytesToRead); + } // step1: read from minimum source DNs required for reconstruction. // The returned success list is the source DNs we do real read from getStripedReader().readMinimumSources(toReconstructLen); @@ -105,6 +109,10 @@ class StripedBlockReconstructor extends StripedReconstructor long decodeEnd = Time.monotonicNow(); // step3: transfer data + long bytesToWrite = (long) toReconstructLen * stripedWriter.getTargets(); + if (getDatanode().getEcReconstuctWriteThrottler() != null) { + getDatanode().getEcReconstuctWriteThrottler().throttle(bytesToWrite); + } if (stripedWriter.transferData2Targets() == 0) { String error = "Transfer failed for all targets."; throw new IOException(error); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java index 940cb711728..20d5c6f44fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java @@ -508,4 +508,9 @@ class StripedReader { int getXmits() { return xmits; } + + public int getMinRequiredSources() { + return minRequiredSources; + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index b21134cbae2..2edc963369e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4724,6 +4724,24 @@ + + dfs.datanode.ec.reconstruct.read.bandwidthPerSec + 0 + + Specifies the maximum amount of bandwidth that the EC reconstruction can utilize for reading. + When the bandwidth value is zero, there is no limit. + + + + + dfs.datanode.ec.reconstruct.write.bandwidthPerSec + 0 + + Specifies the maximum amount of bandwidth that the EC reconstruction can utilize for writing. + When the bandwidth value is zero, there is no limit. + + + dfs.datanode.fsdataset.factory