HDFS-11216. Add remoteBytesRead counter metrics for erasure coding reconstruction task. Contributed by Sammi Chen.

commit 56a13a6a59 (parent ae401539ea)
In MetricsAsserts (org.apache.hadoop.test), a lenient counter accessor is added:

@@ -236,6 +236,13 @@ public static long getLongCounter(String name, MetricsRecordBuilder rb) {
     return captor.getValue();
   }
 
+  public static long getLongCounterWithoutCheck(String name,
+      MetricsRecordBuilder rb) {
+    ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
+    verify(rb, atLeast(0)).addCounter(eqName(info(name, "")), captor.capture());
+    return captor.getValue();
+  }
+
   public static String getStringMetric(String name, MetricsRecordBuilder rb) {
     ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
     verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture());
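Judging by its name, this variant skips whatever presence check the strict getLongCounter performs, relying on the atLeast(0) verification to tolerate records that never emitted the counter. The following standalone sketch (plain Mockito, with a made-up Sink interface, not Hadoop code) illustrates why atLeast(0) makes the capture optional: verification passes even when the mocked method was never invoked.

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.List;
import org.mockito.ArgumentCaptor;

// Standalone sketch (illustrative names only): atLeast(0) turns verification
// into "capture whatever happened, zero invocations is fine", whereas a plain
// verify(sink) would fail when addCounter was never called.
public class LenientCaptureSketch {
  public interface Sink {
    void addCounter(String name, long value);
  }

  public static void main(String[] args) {
    Sink sink = mock(Sink.class);
    // Deliberately no call to sink.addCounter(...) here.

    ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
    verify(sink, atLeast(0)).addCounter(anyString(), captor.capture());

    List<Long> values = captor.getAllValues();   // empty list, no exception
    System.out.println("captured " + values.size() + " values");
  }
}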
In StripedBlockReader, each reader now remembers whether its peer is local and reports that flag when counting read bytes:

@@ -65,6 +65,7 @@ class StripedBlockReader {
   private final DatanodeInfo source;
   private BlockReader blockReader;
   private ByteBuffer buffer;
+  private boolean isLocal;
 
   StripedBlockReader(StripedReader stripedReader, DataNode datanode,
       Configuration conf, short index, ExtendedBlock block,
@@ -76,6 +77,7 @@ class StripedBlockReader {
     this.index = index;
     this.source = source;
     this.block = block;
+    this.isLocal = false;
 
     BlockReader tmpBlockReader = createBlockReader(offsetInBlock);
     if (tmpBlockReader != null) {
@@ -116,10 +118,13 @@ private BlockReader createBlockReader(long offsetInBlock) {
        *
        * TODO: add proper tracer
        */
+      Peer peer = newConnectedPeer(block, dnAddr, blockToken, source);
+      if (peer.isLocal()) {
+        this.isLocal = true;
+      }
       return BlockReaderRemote.newBlockReader(
           "dummy", block, blockToken, offsetInBlock,
-          block.getNumBytes() - offsetInBlock, true,
-          "", newConnectedPeer(block, dnAddr, blockToken, source), source,
+          block.getNumBytes() - offsetInBlock, true, "", peer, source,
           null, stripedReader.getCachingStrategy(), datanode.getTracer(), -1);
     } catch (IOException e) {
       LOG.info("Exception while creating remote block reader, datanode {}",
@@ -187,7 +192,7 @@ private void actualReadFromBlock() throws IOException {
           break;
         }
         n += nread;
-        stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
+        stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
       }
     }
 
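The key change in createBlockReader is that the peer is created once, checked with peer.isLocal(), and then passed on to BlockReaderRemote.newBlockReader, so the reader knows whether its bytes travel over the wire. As a rough standalone sketch of what a "local peer" check amounts to (an assumption for illustration, not the HDFS Peer implementation): the remote end of the connection resolves to one of this host's own addresses.

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;

// Rough standalone sketch (assumption, not the HDFS Peer API): a connection
// counts as "local" when its remote address is a wildcard/loopback address
// or is bound to one of this machine's network interfaces.
public class LocalAddressSketch {
  static boolean isLocalAddress(InetAddress addr) {
    if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) {
      return true;
    }
    try {
      // Non-null only if some local interface owns this address.
      return NetworkInterface.getByInetAddress(addr) != null;
    } catch (SocketException e) {
      return false;
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(isLocalAddress(InetAddress.getByName("127.0.0.1"))); // true
    System.out.println(isLocalAddress(InetAddress.getByName("192.0.2.1"))); // usually false
  }
}

This is presumably why the test changes below can assert that EcReconstructionRemoteBytesRead stays at 0: in a single-JVM MiniDFSCluster every DataNode peer is on localhost, so all reconstruction reads count as local.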
In the reconstruction task's run() method, the new counter is reported to the DataNode metrics when the task finishes:

@@ -70,6 +70,7 @@ public void run() {
       final DataNodeMetrics metrics = getDatanode().getMetrics();
       metrics.incrECReconstructionTasks();
       metrics.incrECReconstructionBytesRead(getBytesRead());
+      metrics.incrECReconstructionRemoteBytesRead(getRemoteBytesRead());
       metrics.incrECReconstructionBytesWritten(getBytesWritten());
       getStripedReader().close();
       stripedWriter.close();
In StripedReconstructor, a remoteBytesRead counter is tracked alongside bytesRead:

@@ -118,6 +118,7 @@ abstract class StripedReconstructor {
   // metrics
   private AtomicLong bytesRead = new AtomicLong(0);
   private AtomicLong bytesWritten = new AtomicLong(0);
+  private AtomicLong remoteBytesRead = new AtomicLong(0);
 
   StripedReconstructor(ErasureCodingWorker worker,
       StripedReconstructionInfo stripedReconInfo) {
@@ -138,8 +139,13 @@ abstract class StripedReconstructor {
     positionInBlock = 0L;
   }
 
-  public void incrBytesRead(long delta) {
-    bytesRead.addAndGet(delta);
+  public void incrBytesRead(boolean local, long delta) {
+    if (local) {
+      bytesRead.addAndGet(delta);
+    } else {
+      bytesRead.addAndGet(delta);
+      remoteBytesRead.addAndGet(delta);
+    }
   }
 
   public void incrBytesWritten(long delta) {
@@ -150,6 +156,10 @@ public long getBytesRead() {
     return bytesRead.get();
   }
 
+  public long getRemoteBytesRead() {
+    return remoteBytesRead.get();
+  }
+
   public long getBytesWritten() {
     return bytesWritten.get();
   }
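Functionally, the new incrBytesRead(boolean, long) always bumps the total and additionally bumps remoteBytesRead for non-local reads; the two branches differ only in that extra increment, so getRemoteBytesRead() can never exceed getBytesRead(). A condensed standalone toy of the same accounting (illustrative names, plain JDK, not the Hadoop class):

import java.util.concurrent.atomic.AtomicLong;

// Condensed toy of the accounting above (illustrative, not Hadoop code):
// every read adds to the total; only non-local reads also add to the
// remote counter, so remote <= total always holds.
public class ReadCounters {
  private final AtomicLong bytesRead = new AtomicLong(0);
  private final AtomicLong remoteBytesRead = new AtomicLong(0);

  public void incrBytesRead(boolean local, long delta) {
    bytesRead.addAndGet(delta);
    if (!local) {
      remoteBytesRead.addAndGet(delta);
    }
  }

  public long getBytesRead() { return bytesRead.get(); }
  public long getRemoteBytesRead() { return remoteBytesRead.get(); }

  public static void main(String[] args) {
    ReadCounters c = new ReadCounters();
    c.incrBytesRead(true, 1024);   // bytes read from a local replica
    c.incrBytesRead(false, 512);   // bytes read from a remote DataNode
    System.out.println(c.getBytesRead() + " total / "
        + c.getRemoteBytesRead() + " remote");   // 1536 total / 512 remote
  }
}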
In DataNodeMetrics, the counter is declared and given an increment method:

@@ -140,6 +140,8 @@ public class DataNodeMetrics {
   MutableCounterLong ecReconstructionBytesRead;
   @Metric("Bytes written by erasure coding worker")
   MutableCounterLong ecReconstructionBytesWritten;
+  @Metric("Bytes remote read by erasure coding worker")
+  MutableCounterLong ecReconstructionRemoteBytesRead;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
@@ -459,6 +461,10 @@ public void incrECReconstructionBytesRead(long bytes) {
     ecReconstructionBytesRead.incr(bytes);
   }
 
+  public void incrECReconstructionRemoteBytesRead(long bytes) {
+    ecReconstructionRemoteBytesRead.incr(bytes);
+  }
+
   public void incrECReconstructionBytesWritten(long bytes) {
     ecReconstructionBytesWritten.incr(bytes);
   }
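For context, this follows the usual hadoop-metrics2 pattern: an @Metric-annotated MutableCounterLong is instantiated by the metrics system when the source is registered, and the counter is exposed under the field name with its first letter upper-cased, which matches the test's use of "EcReconstructionRemoteBytesRead". Below is a minimal sketch of that pattern; the class and source names are made up for illustration, and the real DataNodeMetrics wiring is more involved than this.

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

// Minimal sketch of the metrics2 counter pattern (class/source names are
// assumptions, not the actual DataNode registration).
@Metrics(name = "ExampleEcWorker", about = "Example EC worker metrics",
    context = "dfs")
public class ExampleEcWorkerMetrics {
  @Metric("Bytes remote read by erasure coding worker")
  MutableCounterLong ecReconstructionRemoteBytesRead;

  public void incrECReconstructionRemoteBytesRead(long bytes) {
    ecReconstructionRemoteBytesRead.incr(bytes);
  }

  public static ExampleEcWorkerMetrics register() {
    // Registering the source lets the metrics system instantiate the
    // annotated mutable fields and expose the counter to sinks and tests.
    return DefaultMetricsSystem.instance().register(
        "ExampleEcWorker", "Example EC worker metrics",
        new ExampleEcWorkerMetrics());
  }
}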
In the DataNode erasure coding metrics test, each case now also checks the remote-read counter, summed across DataNodes with a new lenient helper:

@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounterWithoutCheck;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -98,6 +99,8 @@ public void testFullBlock() throws Exception {
         blockGroupSize, getLongMetric("EcReconstructionBytesRead"));
     Assert.assertEquals("EcReconstructionBytesWritten should be ",
         blockSize, getLongMetric("EcReconstructionBytesWritten"));
+    Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
   }
 
   // A partial block, reconstruct the partial block
@@ -110,6 +113,8 @@ public void testReconstructionBytesPartialGroup1() throws Exception {
         fileLen, getLongMetric("EcReconstructionBytesRead"));
     Assert.assertEquals("EcReconstructionBytesWritten should be ",
         fileLen, getLongMetric("EcReconstructionBytesWritten"));
+    Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
   }
 
   // 1 full block + 5 partial block, reconstruct the full block
@@ -121,8 +126,10 @@ public void testReconstructionBytesPartialGroup2() throws Exception {
     Assert.assertEquals("ecReconstructionBytesRead should be ",
         cellSize * dataBlocks + cellSize + cellSize / 10,
         getLongMetric("EcReconstructionBytesRead"));
-    Assert.assertEquals("ecReconstructionBytesWritten should be ",
+    Assert.assertEquals("EcReconstructionBytesWritten should be ",
         blockSize, getLongMetric("EcReconstructionBytesWritten"));
+    Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
   }
 
   // 1 full block + 5 partial block, reconstruct the partial block
@@ -137,6 +144,8 @@ public void testReconstructionBytesPartialGroup3() throws Exception {
     Assert.assertEquals("ecReconstructionBytesWritten should be ",
         cellSize + cellSize / 10,
         getLongMetric("EcReconstructionBytesWritten"));
+    Assert.assertEquals("EcReconstructionRemoteBytesRead should be ",
+        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
   }
 
   private long getLongMetric(String metricName) {
@@ -149,6 +158,16 @@ private long getLongMetric(String metricName) {
     return metricValue;
   }
 
+  private long getLongMetricWithoutCheck(String metricName) {
+    long metricValue = 0;
+    // Add all reconstruction metric value from all data nodes
+    for (DataNode dn : cluster.getDataNodes()) {
+      MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+      metricValue += getLongCounterWithoutCheck(metricName, rb);
+    }
+    return metricValue;
+  }
+
   private void doTest(String fileName, int fileLen,
       int deadNodeIndex) throws Exception {
     assertTrue(fileLen > 0);