HDFS-8411. Add bytes count metrics to datanode for ECWorker. Contributed by Sammi Chen and Andrew Wang

Kai Zheng 2016-12-14 14:50:50 +08:00
parent ada876cd1d
commit 1f14f6d038
9 changed files with 147 additions and 81 deletions

StripedBlockReader.java

@@ -187,6 +187,7 @@ class StripedBlockReader {
         break;
       }
       n += nread;
+      stripedReader.getReconstructor().incrBytesRead(nread);
     }
   }

StripedBlockReconstructor.java

@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 
 /**
  * StripedBlockReconstructor reconstruct one or more missed striped block in
@@ -66,7 +67,10 @@ class StripedBlockReconstructor extends StripedReconstructor
       getDatanode().getMetrics().incrECFailedReconstructionTasks();
     } finally {
       getDatanode().decrementXmitsInProgress();
-      getDatanode().getMetrics().incrECReconstructionTasks();
+      final DataNodeMetrics metrics = getDatanode().getMetrics();
+      metrics.incrECReconstructionTasks();
+      metrics.incrECReconstructionBytesRead(getBytesRead());
+      metrics.incrECReconstructionBytesWritten(getBytesWritten());
       getStripedReader().close();
       stripedWriter.close();
     }

StripedBlockWriter.java

@@ -196,6 +196,7 @@ class StripedBlockWriter {
       packet.writeTo(targetOutputStream);
       blockOffset4Target += toWrite;
+      stripedWriter.getReconstructor().incrBytesWritten(toWrite);
     }
   }

StripedReader.java

@@ -435,6 +435,10 @@ class StripedReader {
     }
   }
 
+  StripedReconstructor getReconstructor() {
+    return reconstructor;
+  }
+
   StripedBlockReader getReader(int i) {
     return readers.get(i);
   }

StripedReconstructor.java

@@ -41,6 +41,7 @@ import java.util.BitSet;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * StripedReconstructor reconstruct one or more missed striped block in the
@@ -114,6 +115,10 @@ abstract class StripedReconstructor {
   private long maxTargetLength = 0L;
   private final BitSet liveBitSet;
 
+  // metrics
+  private AtomicLong bytesRead = new AtomicLong(0);
+  private AtomicLong bytesWritten = new AtomicLong(0);
+
   StripedReconstructor(ErasureCodingWorker worker,
       StripedReconstructionInfo stripedReconInfo) {
     this.stripedReadPool = worker.getStripedReadPool();
@@ -133,6 +138,22 @@ abstract class StripedReconstructor {
     positionInBlock = 0L;
   }
 
+  public void incrBytesRead(long delta) {
+    bytesRead.addAndGet(delta);
+  }
+
+  public void incrBytesWritten(long delta) {
+    bytesWritten.addAndGet(delta);
+  }
+
+  public long getBytesRead() {
+    return bytesRead.get();
+  }
+
+  public long getBytesWritten() {
+    return bytesWritten.get();
+  }
+
   /**
    * Reconstruct one or more missed striped block in the striped block group,
    * the minimum number of live striped blocks should be no less than data
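Taken together with the StripedBlockReader and StripedBlockWriter hunks above, this gives a simple per-task accounting pattern: each reader adds to bytesRead as data arrives from a source datanode, the writer adds to bytesWritten as reconstructed data is pushed to the target, and the totals are reported once when the task finishes. The counters are AtomicLongs because the readers run concurrently on a thread pool (note the CompletionService/ThreadPoolExecutor imports in this file). A minimal standalone sketch of that pattern, with made-up class and chunk-size values that are not part of the Hadoop API:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: several "readers" add to one shared per-task counter,
// which is drained a single time when the task completes.
public class ByteAccountingSketch {
  public static void main(String[] args) throws InterruptedException {
    final AtomicLong bytesRead = new AtomicLong(0);
    ExecutorService readers = Executors.newFixedThreadPool(6);

    for (int i = 0; i < 6; i++) {
      readers.execute(() -> {
        // Each reader reports every chunk it pulled from its source datanode.
        for (int chunk = 0; chunk < 100; chunk++) {
          bytesRead.addAndGet(64 * 1024);   // e.g. one 64 KB cell per read
        }
      });
    }
    readers.shutdown();
    readers.awaitTermination(1, TimeUnit.MINUTES);

    // The reconstructor reads the total once, e.g. to feed DataNodeMetrics.
    System.out.println("bytesRead = " + bytesRead.get());
  }
}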

StripedWriter.java

@@ -280,6 +280,10 @@ class StripedWriter {
     return reconstructor.getSocketAddress4Transfer(target);
   }
 
+  StripedReconstructor getReconstructor() {
+    return reconstructor;
+  }
+
   boolean hasValidTargets() {
     return hasValidTargets;
   }

DataNodeMetrics.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode.metrics;
 
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
-import static org.apache.hadoop.metrics2.lib.Interns.info;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -135,8 +134,12 @@ public class DataNodeMetrics {
   MutableCounterLong ecReconstructionTasks;
   @Metric("Count of erasure coding failed reconstruction tasks")
   MutableCounterLong ecFailedReconstructionTasks;
-  // Nanoseconds spent by decoding tasks.
+  @Metric("Nanoseconds spent by decoding tasks")
   MutableCounterLong ecDecodingTimeNanos;
+  @Metric("Bytes read by erasure coding worker")
+  MutableCounterLong ecReconstructionBytesRead;
+  @Metric("Bytes written by erasure coding worker")
+  MutableCounterLong ecReconstructionBytesWritten;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
@@ -156,9 +159,6 @@ public class DataNodeMetrics {
     sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
     ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
     ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
-    ecDecodingTimeNanos = registry.newCounter(
-        info("ecDecodingTimeNanos", "Nanoseconds spent by decoding tasks"),
-        (long) 0);
 
     for (int i = 0; i < len; i++) {
       int interval = intervals[i];
@@ -454,4 +454,12 @@ public class DataNodeMetrics {
   public void incrECDecodingTime(long decodingTimeNanos) {
     ecDecodingTimeNanos.incr(decodingTimeNanos);
   }
+
+  public void incrECReconstructionBytesRead(long bytes) {
+    ecReconstructionBytesRead.incr(bytes);
+  }
+
+  public void incrECReconstructionBytesWritten(long bytes) {
+    ecReconstructionBytesWritten.incr(bytes);
+  }
 }
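With the @Metric annotations, the new counters are registered automatically and show up in each DataNode's metrics record under the capitalized field names EcReconstructionBytesRead and EcReconstructionBytesWritten. The same applies to ecDecodingTimeNanos now that its manual registry.newCounter registration is removed, which is why the test below switches from "ecDecodingTimeNanos" to "EcDecodingTimeNanos". A condensed sketch of reading the counters in a test, following the getLongMetric helper added to TestDataNodeErasureCodingMetrics further down (the EcMetricsReader class name is made up for illustration):

import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;

// Illustrative helper: sums a named counter over every datanode in a test
// cluster, since any datanode may have executed the reconstruction task.
class EcMetricsReader {
  static long sumCounter(MiniDFSCluster cluster, String metricName) {
    long total = 0;
    for (DataNode dn : cluster.getDataNodes()) {
      MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
      total += getLongCounter(metricName, rb);
    }
    return total;
  }
}

For example, sumCounter(cluster, "EcReconstructionBytesRead") after a reconstruction has completed.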

StripedFileTestUtil.java

@@ -526,6 +526,30 @@ public class StripedFileTestUtil {
     throw new IOException("Time out waiting for EC block reconstruction.");
   }
 
+  /**
+   * Wait for the reconstruction to be finished when the file has
+   * corrupted blocks. The function can handle files of any length.
+   */
+  public static void waitForAllReconstructionFinished(Path file,
+      DistributedFileSystem fs, long expectedBlocks) throws Exception {
+    LOG.info("Waiting for reconstruction to be finished for the file:" + file
+        + ", expectedBlocks:" + expectedBlocks);
+    final int attempts = 60;
+    for (int i = 0; i < attempts; i++) {
+      int totalBlocks = 0;
+      LocatedBlocks locatedBlocks = getLocatedBlocks(file, fs);
+      for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+        DatanodeInfo[] storageInfos = locatedBlock.getLocations();
+        totalBlocks += storageInfos.length;
+      }
+      if (totalBlocks >= expectedBlocks) {
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    throw new IOException("Time out waiting for EC block reconstruction.");
+  }
+
   /**
    * Get the located blocks of a file.
    */

TestDataNodeErasureCodingMetrics.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -38,21 +36,16 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Arrays;
 
 /**
  * This file tests the erasure coding metrics in DataNode.
@@ -65,8 +58,9 @@ public class TestDataNodeErasureCodingMetrics {
   private final int dataBlocks = ecPolicy.getNumDataUnits();
   private final int parityBlocks = ecPolicy.getNumParityUnits();
   private final int cellSize = ecPolicy.getCellSize();
-  private final int blockSize = cellSize;
+  private final int blockSize = cellSize * 2;
   private final int groupSize = dataBlocks + parityBlocks;
+  private final int blockGroupSize = blockSize * dataBlocks;
   private final int numDNs = groupSize + 1;
 
   private MiniDFSCluster cluster;
@@ -76,7 +70,6 @@ public class TestDataNodeErasureCodingMetrics {
   @Before
   public void setup() throws IOException {
     conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
@@ -93,82 +86,86 @@ public class TestDataNodeErasureCodingMetrics {
   }
 
   @Test(timeout = 120000)
-  public void testEcTasks() throws Exception {
-    DataNode workerDn = doTest("/testEcTasks");
-    MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
-    // Ensure that reconstruction task is finished
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
-        return (taskMetricValue > 0);
-      }
-    }, 500, 10000);
-    assertCounter("EcReconstructionTasks", (long) 1, rb);
-    assertCounter("EcFailedReconstructionTasks", (long) 0, rb);
+  public void testFullBlock() throws Exception {
+    doTest("/testEcMetrics", blockGroupSize, 0);
+
+    Assert.assertEquals("EcReconstructionTasks should be ",
+        1, getLongMetric("EcReconstructionTasks"));
+    Assert.assertEquals("EcFailedReconstructionTasks should be ",
+        0, getLongMetric("EcFailedReconstructionTasks"));
+    Assert.assertTrue(getLongMetric("EcDecodingTimeNanos") > 0);
+    Assert.assertEquals("EcReconstructionBytesRead should be ",
+        blockGroupSize, getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("EcReconstructionBytesWritten should be ",
+        blockSize, getLongMetric("EcReconstructionBytesWritten"));
   }
 
+  // A partial block, reconstruct the partial block
   @Test(timeout = 120000)
-  public void testEcCodingTime() throws Exception {
-    DataNode workerDn = doTest("/testEcCodingTime");
-    MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
-    // Ensure that reconstruction task is finished
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
-        return (taskMetricValue > 0);
-      }
-    }, 500, 10000);
-    long decodeTime = getLongCounter("ecDecodingTimeNanos", rb);
-    Assert.assertTrue(decodeTime > 0);
+  public void testReconstructionBytesPartialGroup1() throws Exception {
+    final int fileLen = blockSize / 10;
+    doTest("/testEcBytes", fileLen, 0);
+
+    Assert.assertEquals("EcReconstructionBytesRead should be ",
+        fileLen, getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("EcReconstructionBytesWritten should be ",
+        fileLen, getLongMetric("EcReconstructionBytesWritten"));
   }
 
-  private DataNode doTest(String fileName) throws Exception {
+  // 1 full block + 5 partial block, reconstruct the full block
+  @Test(timeout = 120000)
+  public void testReconstructionBytesPartialGroup2() throws Exception {
+    final int fileLen = cellSize * dataBlocks + cellSize + cellSize / 10;
+    doTest("/testEcBytes", fileLen, 0);
+
+    Assert.assertEquals("ecReconstructionBytesRead should be ",
+        cellSize * dataBlocks + cellSize + cellSize / 10,
+        getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("ecReconstructionBytesWritten should be ",
+        blockSize, getLongMetric("EcReconstructionBytesWritten"));
+  }
+
+  // 1 full block + 5 partial block, reconstruct the partial block
+  @Test(timeout = 120000)
+  public void testReconstructionBytesPartialGroup3() throws Exception {
+    final int fileLen = cellSize * dataBlocks + cellSize + cellSize / 10;
+    doTest("/testEcBytes", fileLen, 1);
+
+    Assert.assertEquals("ecReconstructionBytesRead should be ",
+        cellSize * dataBlocks + (cellSize / 10) * 2 ,
+        getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("ecReconstructionBytesWritten should be ",
+        cellSize + cellSize / 10,
+        getLongMetric("EcReconstructionBytesWritten"));
+  }
+
+  private long getLongMetric(String metricName) {
+    long metricValue = 0;
+    // Add all reconstruction metric value from all data nodes
+    for (DataNode dn : cluster.getDataNodes()) {
+      MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+      metricValue += getLongCounter(metricName, rb);
+    }
+    return metricValue;
+  }
+
+  private void doTest(String fileName, int fileLen,
+      int deadNodeIndex) throws Exception {
+    assertTrue(fileLen > 0);
+    assertTrue(deadNodeIndex >= 0 && deadNodeIndex < numDNs);
     Path file = new Path(fileName);
-    long fileLen = dataBlocks * blockSize;
-    final byte[] data = StripedFileTestUtil.generateBytes((int) fileLen);
+    final byte[] data = StripedFileTestUtil.generateBytes(fileLen);
     DFSTestUtil.writeFile(fs, file, data);
     StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
 
-    LocatedBlocks locatedBlocks =
+    final LocatedBlocks locatedBlocks =
         StripedFileTestUtil.getLocatedBlocks(file, fs);
-    //only one block group
-    LocatedStripedBlock lastBlock =
+    final LocatedStripedBlock lastBlock =
         (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
-    DataNode workerDn = null;
-    DatanodeInfo[] locations = lastBlock.getLocations();
-    assertEquals(locations.length, groupSize);
-    // we have ONE extra datanode in addition to the GROUPSIZE datanodes, here
-    // is to find the extra datanode that the reconstruction task will run on,
-    // according to the current block placement logic for striped files.
-    // This can be improved later to be flexible regardless wherever the task
-    // runs.
-    for (DataNode dn : cluster.getDataNodes()) {
-      boolean appear = false;
-      for (DatanodeInfo info : locations) {
-        if (dn.getDatanodeUuid().equals(info.getDatanodeUuid())) {
-          appear = true;
-          break;
-        }
-      }
-      if (!appear) {
-        workerDn = dn;
-        break;
-      }
-    }
-    // Get a datanode from the block locations.
-    LOG.info("Block locations: " + Arrays.asList(locations));
-    LOG.info("Erasure coding worker datanode: " + workerDn);
-    assertNotNull("Failed to find a worker datanode", workerDn);
-    DataNode toCorruptDn = cluster.getDataNode(locations[0].getIpcPort());
+    assertTrue(lastBlock.getLocations().length > deadNodeIndex);
+
+    final DataNode toCorruptDn = cluster.getDataNode(
+        lastBlock.getLocations()[deadNodeIndex].getIpcPort());
     LOG.info("Datanode to be corrupted: " + toCorruptDn);
     assertNotNull("Failed to find a datanode to be corrupted", toCorruptDn);
     toCorruptDn.shutdown();
@@ -176,12 +173,15 @@ public class TestDataNodeErasureCodingMetrics {
     DFSTestUtil.waitForDatanodeState(cluster, toCorruptDn.getDatanodeUuid(),
         false, 10000);
-    int workCount = getComputedDatanodeWork();
+    final int workCount = getComputedDatanodeWork();
     assertTrue("Wrongly computed block reconstruction work", workCount > 0);
     cluster.triggerHeartbeats();
-    StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
-
-    return workerDn;
+    int totalBlocks = (fileLen / blockGroupSize) * groupSize;
+    final int remainder = fileLen % blockGroupSize;
+    totalBlocks += (remainder == 0) ? 0 :
+        (remainder % blockSize == 0) ? remainder / blockSize + parityBlocks :
+        remainder / blockSize + 1 + parityBlocks;
+    StripedFileTestUtil.waitForAllReconstructionFinished(file, fs, totalBlocks);
   }
private int getComputedDatanodeWork() private int getComputedDatanodeWork()
@@ -209,5 +209,4 @@ public class TestDataNodeErasureCodingMetrics {
     BlockManagerTestUtil.checkHeartbeat(
         cluster.getNamesystem().getBlockManager());
   }
-
 }
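For reference, the totalBlocks expectation computed in doTest() above counts one block per reported storage location: groupSize blocks for every full block group, and for a trailing partial group, one data block per started blockSize plus the parity blocks; waitForAllReconstructionFinished then succeeds once at least that many locations are reported. A small worked sketch of that arithmetic, assuming an RS 6-data/3-parity policy and a 64 KB cell size purely for illustration (the test reads these from ecPolicy at runtime):

// Illustrative re-statement of the expectation in doTest(); not taken from a running cluster.
public class ExpectedBlockCount {
  static int expectedBlocks(int fileLen, int blockSize, int dataBlocks, int parityBlocks) {
    final int groupSize = dataBlocks + parityBlocks;
    final int blockGroupSize = blockSize * dataBlocks;
    int totalBlocks = (fileLen / blockGroupSize) * groupSize;
    final int remainder = fileLen % blockGroupSize;
    totalBlocks += (remainder == 0) ? 0 :
        (remainder % blockSize == 0) ? remainder / blockSize + parityBlocks :
        remainder / blockSize + 1 + parityBlocks;
    return totalBlocks;
  }

  public static void main(String[] args) {
    final int cellSize = 64 * 1024;      // assumed cell size for illustration
    final int blockSize = cellSize * 2;  // mirrors the test field blockSize = cellSize * 2
    // Full block group (testFullBlock): 6 data + 3 parity blocks expected.
    System.out.println(expectedBlocks(blockSize * 6, blockSize, 6, 3));   // prints 9
    // Tiny file (testReconstructionBytesPartialGroup1): 1 data + 3 parity blocks.
    System.out.println(expectedBlocks(blockSize / 10, blockSize, 6, 3));  // prints 4
  }
}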