diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index f9063b7a892..f4506cf4707 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -170,4 +170,8 @@ public final class ErasureCodingWorker { stripedReconstructionPool.shutdown(); stripedReadPool.shutdown(); } + + public float getXmitWeight() { + return xmitWeight; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java index 29c0078e957..1af2380886a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java @@ -67,7 +67,11 @@ class StripedBlockReconstructor extends StripedReconstructor LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e); getDatanode().getMetrics().incrECFailedReconstructionTasks(); } finally { - getDatanode().decrementXmitsInProgress(getXmits()); + float xmitWeight = getErasureCodingWorker().getXmitWeight(); + // if the xmits is smaller than 1, the xmitsSubmitted should be set to 1 + // because if it set to zero, we cannot to measure the xmits submitted + int xmitsSubmitted = Math.max((int) (getXmits() * xmitWeight), 1); + getDatanode().decrementXmitsInProgress(xmitsSubmitted); final DataNodeMetrics metrics = getDatanode().getMetrics(); metrics.incrECReconstructionTasks(); metrics.incrECReconstructionBytesRead(getBytesRead()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index a1f4c7ff55e..4c8be827f43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -275,4 +275,8 @@ abstract class StripedReconstructor { DataNode getDatanode() { return datanode; } + + public ErasureCodingWorker getErasureCodingWorker() { + return erasureCodingWorker; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java index 2abfff7876c..b119e7855b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java @@ -514,6 +514,8 @@ public class TestReconstructStripedFile { @Test(timeout = 180000) public void testErasureCodingWorkerXmitsWeight() throws Exception { + testErasureCodingWorkerXmitsWeight(0.5f, + (int) (ecPolicy.getNumDataUnits() * 0.5f)); testErasureCodingWorkerXmitsWeight(1f, ecPolicy.getNumDataUnits()); testErasureCodingWorkerXmitsWeight(0f, 1); testErasureCodingWorkerXmitsWeight(10f, 10 * ecPolicy.getNumDataUnits()); @@ -567,6 +569,10 @@ public class TestReconstructStripedFile { } finally { barrier.await(); DataNodeFaultInjector.set(oldInjector); + for (final DataNode curDn : cluster.getDataNodes()) { + GenericTestUtils.waitFor(() -> curDn.getXceiverCount() <= 1, 10, 60000); + assertEquals(0, curDn.getXmitsInProgress()); + } } } }