diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index a27de9bc2e3..0f7c5c77e19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -187,6 +187,7 @@ class StripedBlockReader {
         break;
       }
       n += nread;
+      stripedReader.getReconstructor().incrBytesRead(nread);
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index a8e9d308159..5554d6863e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;

 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;

 /**
  * StripedBlockReconstructor reconstruct one or more missed striped block in
@@ -66,7 +67,10 @@ class StripedBlockReconstructor extends StripedReconstructor
       getDatanode().getMetrics().incrECFailedReconstructionTasks();
     } finally {
       getDatanode().decrementXmitsInProgress();
-      getDatanode().getMetrics().incrECReconstructionTasks();
+      final DataNodeMetrics metrics = getDatanode().getMetrics();
+      metrics.incrECReconstructionTasks();
+      metrics.incrECReconstructionBytesRead(getBytesRead());
+      metrics.incrECReconstructionBytesWritten(getBytesWritten());
       getStripedReader().close();
       stripedWriter.close();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index 592be45f9fc..d999202fa88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -196,6 +196,7 @@ class StripedBlockWriter {
       packet.writeTo(targetOutputStream);

       blockOffset4Target += toWrite;
+      stripedWriter.getReconstructor().incrBytesWritten(toWrite);
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index 238c62838be..f6f343a6bf3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -435,6 +435,10 @@ class StripedReader {
     }
   }

+  StripedReconstructor getReconstructor() {
+    return reconstructor;
+  }
+
   StripedBlockReader getReader(int i) {
     return readers.get(i);
   }
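Taken together, the hunks above implement an accumulate-then-flush pattern: each reader and writer thread bumps a per-task counter as bytes actually move (incrBytesRead(nread) is passed the count the read returned, so short reads are accounted exactly), and StripedBlockReconstructor flushes the totals into the DataNode-wide metrics exactly once, in its finally block, whether the task succeeded or failed. A minimal standalone sketch of the same pattern, with illustrative names rather than the real Hadoop classes:

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical stand-in for the per-task accounting this patch adds.
    class ReconstructionTask implements Runnable {
      // Long-lived, process-wide counter (the role DataNodeMetrics plays).
      static final AtomicLong totalBytesRead = new AtomicLong();

      // Per-task counter, incremented concurrently by reader threads.
      private final AtomicLong bytesRead = new AtomicLong();

      void incrBytesRead(long delta) {
        bytesRead.addAndGet(delta);
      }

      @Override
      public void run() {
        try {
          // ... read from source datanodes, decode, write targets;
          // every successful read calls incrBytesRead(nread) ...
        } finally {
          // Flush the per-task total into the long-lived counter once.
          totalBytesRead.addAndGet(bytesRead.get());
        }
      }
    }

AtomicLong is the right type for the per-task counter because striped reads are issued on a thread pool, so several StripedBlockReaders may increment the same task's counter concurrently.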
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 5641c35045e..68769f786cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -41,6 +41,7 @@ import java.util.BitSet;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;

 /**
  * StripedReconstructor reconstruct one or more missed striped block in the
@@ -114,6 +115,10 @@ abstract class StripedReconstructor {
   private long maxTargetLength = 0L;
   private final BitSet liveBitSet;

+  // metrics
+  private AtomicLong bytesRead = new AtomicLong(0);
+  private AtomicLong bytesWritten = new AtomicLong(0);
+
   StripedReconstructor(ErasureCodingWorker worker,
       StripedReconstructionInfo stripedReconInfo) {
     this.stripedReadPool = worker.getStripedReadPool();
@@ -133,6 +138,22 @@ abstract class StripedReconstructor {
     positionInBlock = 0L;
   }

+  public void incrBytesRead(long delta) {
+    bytesRead.addAndGet(delta);
+  }
+
+  public void incrBytesWritten(long delta) {
+    bytesWritten.addAndGet(delta);
+  }
+
+  public long getBytesRead() {
+    return bytesRead.get();
+  }
+
+  public long getBytesWritten() {
+    return bytesWritten.get();
+  }
+
   /**
    * Reconstruct one or more missed striped block in the striped block group,
    * the minimum number of live striped blocks should be no less than data
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index c099bc1dbd4..225a7ed1cc5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -280,6 +280,10 @@ class StripedWriter {
     return reconstructor.getSocketAddress4Transfer(target);
   }

+  StripedReconstructor getReconstructor() {
+    return reconstructor;
+  }
+
   boolean hasValidTargets() {
     return hasValidTargets;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 23e15a2b66c..e09a85f8ebd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode.metrics;

 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
-import static org.apache.hadoop.metrics2.lib.Interns.info;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -135,8 +134,12 @@ public class DataNodeMetrics {
   MutableCounterLong ecReconstructionTasks;
   @Metric("Count of erasure coding failed reconstruction tasks")
   MutableCounterLong ecFailedReconstructionTasks;
-  // Nanoseconds spent by decoding tasks.
+  @Metric("Nanoseconds spent by decoding tasks")
   MutableCounterLong ecDecodingTimeNanos;
+  @Metric("Bytes read by erasure coding worker")
+  MutableCounterLong ecReconstructionBytesRead;
+  @Metric("Bytes written by erasure coding worker")
+  MutableCounterLong ecReconstructionBytesWritten;

   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
@@ -156,9 +159,6 @@ public class DataNodeMetrics {
     sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
     ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
     ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
-    ecDecodingTimeNanos = registry.newCounter(
-        info("ecDecodingTimeNanos", "Nanoseconds spent by decoding tasks"),
-        (long) 0);

     for (int i = 0; i < len; i++) {
       int interval = intervals[i];
@@ -454,4 +454,12 @@ public class DataNodeMetrics {
   public void incrECDecodingTime(long decodingTimeNanos) {
     ecDecodingTimeNanos.incr(decodingTimeNanos);
   }
+
+  public void incrECReconstructionBytesRead(long bytes) {
+    ecReconstructionBytesRead.incr(bytes);
+  }
+
+  public void incrECReconstructionBytesWritten(long bytes) {
+    ecReconstructionBytesWritten.incr(bytes);
+  }
 }
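The DataNodeMetrics change works because metrics2 discovers @Metric-annotated fields by reflection when the source object is registered: the framework instantiates each annotated MutableCounterLong itself, so the manual registry.newCounter(...) call for ecDecodingTimeNanos was redundant, and the two new counters need no constructor code either. The exposed metric name is the capitalized field name, which is why the tests below query "EcReconstructionBytesRead". A minimal sketch of the annotated style, using an illustrative source class rather than the real DataNodeMetrics:

    import org.apache.hadoop.metrics2.annotation.Metric;
    import org.apache.hadoop.metrics2.annotation.Metrics;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.metrics2.lib.MutableCounterLong;

    // Illustrative metrics source: register() fills in the @Metric fields,
    // so no manual newCounter() call is needed.
    @Metrics(about = "Example worker metrics", context = "dfs")
    class ExampleWorkerMetrics {
      @Metric("Bytes read by an example worker")
      MutableCounterLong exampleBytesRead;  // exposed as "ExampleBytesRead"

      static ExampleWorkerMetrics create() {
        return DefaultMetricsSystem.instance()
            .register("ExampleWorkerMetrics", null, new ExampleWorkerMetrics());
      }

      void incrBytesRead(long bytes) {
        exampleBytesRead.incr(bytes);
      }
    }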
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 311ba7c8cfa..520d0e3134a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -526,6 +526,30 @@ public class StripedFileTestUtil {
     throw new IOException("Time out waiting for EC block reconstruction.");
   }

+  /**
+   * Wait for the reconstruction to be finished when the file has
+   * corrupted blocks. The function can handle files of any length.
+   */
+  public static void waitForAllReconstructionFinished(Path file,
+      DistributedFileSystem fs, long expectedBlocks) throws Exception {
+    LOG.info("Waiting for reconstruction to be finished for the file:" + file
+        + ", expectedBlocks:" + expectedBlocks);
+    final int attempts = 60;
+    for (int i = 0; i < attempts; i++) {
+      int totalBlocks = 0;
+      LocatedBlocks locatedBlocks = getLocatedBlocks(file, fs);
+      for (LocatedBlock locatedBlock: locatedBlocks.getLocatedBlocks()) {
+        DatanodeInfo[] storageInfos = locatedBlock.getLocations();
+        totalBlocks += storageInfos.length;
+      }
+      if (totalBlocks >= expectedBlocks) {
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    throw new IOException("Time out waiting for EC block reconstruction.");
+  }
+
   /**
    * Get the located blocks of a file.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
index 1b0526b2ee4..64ddbd7a2f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;

-import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -38,21 +36,16 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.io.IOException;
-import java.util.Arrays;

 /**
  * This file tests the erasure coding metrics in DataNode.
@@ -65,8 +58,9 @@ public class TestDataNodeErasureCodingMetrics {
   private final int dataBlocks = ecPolicy.getNumDataUnits();
   private final int parityBlocks = ecPolicy.getNumParityUnits();
   private final int cellSize = ecPolicy.getCellSize();
-  private final int blockSize = cellSize;
+  private final int blockSize = cellSize * 2;
   private final int groupSize = dataBlocks + parityBlocks;
+  private final int blockGroupSize = blockSize * dataBlocks;
   private final int numDNs = groupSize + 1;

   private MiniDFSCluster cluster;
@@ -76,7 +70,6 @@ public class TestDataNodeErasureCodingMetrics {
   @Before
   public void setup() throws IOException {
     conf = new Configuration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
         1);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
@@ -93,82 +86,86 @@ public class TestDataNodeErasureCodingMetrics {
   }

   @Test(timeout = 120000)
-  public void testEcTasks() throws Exception {
-    DataNode workerDn = doTest("/testEcTasks");
-    MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
+  public void testFullBlock() throws Exception {
+    doTest("/testEcMetrics", blockGroupSize, 0);

-    // Ensure that reconstruction task is finished
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
-        return (taskMetricValue > 0);
-      }
-    }, 500, 10000);
-
-    assertCounter("EcReconstructionTasks", (long) 1, rb);
-    assertCounter("EcFailedReconstructionTasks", (long) 0, rb);
+    Assert.assertEquals("EcReconstructionTasks should be ",
+        1, getLongMetric("EcReconstructionTasks"));
+    Assert.assertEquals("EcFailedReconstructionTasks should be ",
+        0, getLongMetric("EcFailedReconstructionTasks"));
+    Assert.assertTrue(getLongMetric("EcDecodingTimeNanos") > 0);
+    Assert.assertEquals("EcReconstructionBytesRead should be ",
+        blockGroupSize, getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("EcReconstructionBytesWritten should be ",
+        blockSize, getLongMetric("EcReconstructionBytesWritten"));
   }

+  // A partial block, reconstruct the partial block
   @Test(timeout = 120000)
-  public void testEcCodingTime() throws Exception {
-    DataNode workerDn = doTest("/testEcCodingTime");
-    MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
+  public void testReconstructionBytesPartialGroup1() throws Exception {
+    final int fileLen = blockSize / 10;
+    doTest("/testEcBytes", fileLen, 0);

-    // Ensure that reconstruction task is finished
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
-        return (taskMetricValue > 0);
-      }
-    }, 500, 10000);
-
-    long decodeTime = getLongCounter("ecDecodingTimeNanos", rb);
-    Assert.assertTrue(decodeTime > 0);
+    Assert.assertEquals("EcReconstructionBytesRead should be ",
+        fileLen, getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("EcReconstructionBytesWritten should be ",
+        fileLen, getLongMetric("EcReconstructionBytesWritten"));
   }

-  private DataNode doTest(String fileName) throws Exception {
+  // 1 full block + 5 partial blocks, reconstruct the full block
+  @Test(timeout = 120000)
+  public void testReconstructionBytesPartialGroup2() throws Exception {
+    final int fileLen = cellSize * dataBlocks + cellSize + cellSize / 10;
+    doTest("/testEcBytes", fileLen, 0);
+
+    Assert.assertEquals("EcReconstructionBytesRead should be ",
+        cellSize * dataBlocks + cellSize + cellSize / 10,
+        getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("EcReconstructionBytesWritten should be ",
+        blockSize, getLongMetric("EcReconstructionBytesWritten"));
+  }
+
+  // 1 full block + 5 partial blocks, reconstruct the partial block
+  @Test(timeout = 120000)
+  public void testReconstructionBytesPartialGroup3() throws Exception {
+    final int fileLen = cellSize * dataBlocks + cellSize + cellSize / 10;
+    doTest("/testEcBytes", fileLen, 1);
+
+    Assert.assertEquals("EcReconstructionBytesRead should be ",
+        cellSize * dataBlocks + (cellSize / 10) * 2,
+        getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("EcReconstructionBytesWritten should be ",
+        cellSize + cellSize / 10,
+        getLongMetric("EcReconstructionBytesWritten"));
+  }
+
+  private long getLongMetric(String metricName) {
+    long metricValue = 0;
+    // Add up the metric values from all datanodes; the reconstruction
+    // task may run on any of them.
+    for (DataNode dn : cluster.getDataNodes()) {
+      MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+      metricValue += getLongCounter(metricName, rb);
+    }
+    return metricValue;
+  }
+
+  private void doTest(String fileName, int fileLen,
+      int deadNodeIndex) throws Exception {
+    assertTrue(fileLen > 0);
+    assertTrue(deadNodeIndex >= 0 && deadNodeIndex < numDNs);
     Path file = new Path(fileName);
-    long fileLen = dataBlocks * blockSize;
-    final byte[] data = StripedFileTestUtil.generateBytes((int) fileLen);
+    final byte[] data = StripedFileTestUtil.generateBytes(fileLen);
     DFSTestUtil.writeFile(fs, file, data);
     StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);

-    LocatedBlocks locatedBlocks =
+    final LocatedBlocks locatedBlocks =
         StripedFileTestUtil.getLocatedBlocks(file, fs);
-    //only one block group
-    LocatedStripedBlock lastBlock =
+    final LocatedStripedBlock lastBlock =
         (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
-    DataNode workerDn = null;
-    DatanodeInfo[] locations = lastBlock.getLocations();
-    assertEquals(locations.length, groupSize);
+    assertTrue(lastBlock.getLocations().length > deadNodeIndex);

-    // we have ONE extra datanode in addition to the GROUPSIZE datanodes, here
-    // is to find the extra datanode that the reconstruction task will run on,
-    // according to the current block placement logic for striped files.
-    // This can be improved later to be flexible regardless wherever the task
-    // runs.
-    for (DataNode dn : cluster.getDataNodes()) {
-      boolean appear = false;
-      for (DatanodeInfo info : locations) {
-        if (dn.getDatanodeUuid().equals(info.getDatanodeUuid())) {
-          appear = true;
-          break;
-        }
-      }
-      if (!appear) {
-        workerDn = dn;
-        break;
-      }
-    }
-    // Get a datanode from the block locations.
- LOG.info("Block locations: " + Arrays.asList(locations)); - LOG.info("Erasure coding worker datanode: " + workerDn); - assertNotNull("Failed to find a worker datanode", workerDn); - - DataNode toCorruptDn = cluster.getDataNode(locations[0].getIpcPort()); + final DataNode toCorruptDn = cluster.getDataNode( + lastBlock.getLocations()[deadNodeIndex].getIpcPort()); LOG.info("Datanode to be corrupted: " + toCorruptDn); assertNotNull("Failed to find a datanode to be corrupted", toCorruptDn); toCorruptDn.shutdown(); @@ -176,12 +173,15 @@ public class TestDataNodeErasureCodingMetrics { DFSTestUtil.waitForDatanodeState(cluster, toCorruptDn.getDatanodeUuid(), false, 10000); - int workCount = getComputedDatanodeWork(); + final int workCount = getComputedDatanodeWork(); assertTrue("Wrongly computed block reconstruction work", workCount > 0); cluster.triggerHeartbeats(); - StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize); - - return workerDn; + int totalBlocks = (fileLen / blockGroupSize) * groupSize; + final int remainder = fileLen % blockGroupSize; + totalBlocks += (remainder == 0) ? 0 : + (remainder % blockSize == 0) ? remainder / blockSize + parityBlocks : + remainder / blockSize + 1 + parityBlocks; + StripedFileTestUtil.waitForAllReconstructionFinished(file, fs, totalBlocks); } private int getComputedDatanodeWork() @@ -209,5 +209,4 @@ public class TestDataNodeErasureCodingMetrics { BlockManagerTestUtil.checkHeartbeat( cluster.getNamesystem().getBlockManager()); } - }