Erasure Coding: metrics xmitsInProgress become to negative. Contributed by maobaolong and Toshihiko Uchida.
This commit is contained in:
parent
e32e1384d9
commit
df16146e7f
|
@ -170,4 +170,8 @@ public final class ErasureCodingWorker {
|
||||||
stripedReconstructionPool.shutdown();
|
stripedReconstructionPool.shutdown();
|
||||||
stripedReadPool.shutdown();
|
stripedReadPool.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public float getXmitWeight() {
|
||||||
|
return xmitWeight;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,11 @@ class StripedBlockReconstructor extends StripedReconstructor
|
||||||
LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e);
|
LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e);
|
||||||
getDatanode().getMetrics().incrECFailedReconstructionTasks();
|
getDatanode().getMetrics().incrECFailedReconstructionTasks();
|
||||||
} finally {
|
} finally {
|
||||||
getDatanode().decrementXmitsInProgress(getXmits());
|
float xmitWeight = getErasureCodingWorker().getXmitWeight();
|
||||||
|
// if the xmits is smaller than 1, the xmitsSubmitted should be set to 1
|
||||||
|
// because if it set to zero, we cannot to measure the xmits submitted
|
||||||
|
int xmitsSubmitted = Math.max((int) (getXmits() * xmitWeight), 1);
|
||||||
|
getDatanode().decrementXmitsInProgress(xmitsSubmitted);
|
||||||
final DataNodeMetrics metrics = getDatanode().getMetrics();
|
final DataNodeMetrics metrics = getDatanode().getMetrics();
|
||||||
metrics.incrECReconstructionTasks();
|
metrics.incrECReconstructionTasks();
|
||||||
metrics.incrECReconstructionBytesRead(getBytesRead());
|
metrics.incrECReconstructionBytesRead(getBytesRead());
|
||||||
|
|
|
@ -275,4 +275,8 @@ abstract class StripedReconstructor {
|
||||||
DataNode getDatanode() {
|
DataNode getDatanode() {
|
||||||
return datanode;
|
return datanode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ErasureCodingWorker getErasureCodingWorker() {
|
||||||
|
return erasureCodingWorker;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -514,6 +514,8 @@ public class TestReconstructStripedFile {
|
||||||
|
|
||||||
@Test(timeout = 180000)
|
@Test(timeout = 180000)
|
||||||
public void testErasureCodingWorkerXmitsWeight() throws Exception {
|
public void testErasureCodingWorkerXmitsWeight() throws Exception {
|
||||||
|
testErasureCodingWorkerXmitsWeight(0.5f,
|
||||||
|
(int) (ecPolicy.getNumDataUnits() * 0.5f));
|
||||||
testErasureCodingWorkerXmitsWeight(1f, ecPolicy.getNumDataUnits());
|
testErasureCodingWorkerXmitsWeight(1f, ecPolicy.getNumDataUnits());
|
||||||
testErasureCodingWorkerXmitsWeight(0f, 1);
|
testErasureCodingWorkerXmitsWeight(0f, 1);
|
||||||
testErasureCodingWorkerXmitsWeight(10f, 10 * ecPolicy.getNumDataUnits());
|
testErasureCodingWorkerXmitsWeight(10f, 10 * ecPolicy.getNumDataUnits());
|
||||||
|
@ -567,6 +569,10 @@ public class TestReconstructStripedFile {
|
||||||
} finally {
|
} finally {
|
||||||
barrier.await();
|
barrier.await();
|
||||||
DataNodeFaultInjector.set(oldInjector);
|
DataNodeFaultInjector.set(oldInjector);
|
||||||
|
for (final DataNode curDn : cluster.getDataNodes()) {
|
||||||
|
GenericTestUtils.waitFor(() -> curDn.getXceiverCount() <= 1, 10, 60000);
|
||||||
|
assertEquals(0, curDn.getXmitsInProgress());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue