From d3b9e9be24073f83e101c6a82546b9aef3878689 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 23 Oct 2014 12:53:01 -0700 Subject: [PATCH] HDFS-7222. Expose DataNode network errors as a metric. (Charles Lamb via wang) (cherry picked from commit 86cad007d7d6366b293bb9a073814889081c8662) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../datanode/DataNodeFaultInjector.java | 4 ++ .../hdfs/server/datanode/DataXceiver.java | 24 +++++++++++ .../datanode/metrics/DataNodeMetrics.java | 7 ++++ .../server/datanode/TestDataNodeMetrics.java | 42 +++++++++++++++++++ 5 files changed, 79 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b6ff34e390e..1256550576c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -39,6 +39,8 @@ Release 2.7.0 - UNRELEASED HDFS-6824. Additional user documentation for HDFS encryption. (wang) + HDFS-7222. Expose DataNode network errors as a metric. (Charles Lamb via wang) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 31ac80b5eda..478099deee9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -21,6 +21,8 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; +import java.io.IOException; + /** * Used for injecting faults in DFSClient and DFSOutputStream tests. * Calls into this are a no-op in production code. @@ -35,4 +37,6 @@ public class DataNodeFaultInjector { } public void getHdfsBlocksMetadata() {} + + public void writeBlockAfterFlush() throws IOException {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 67eb9418115..6fc819de681 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -211,6 +211,7 @@ class DataXceiver extends Receiver implements Runnable { LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops"); } } else { + datanode.metrics.incrDatanodeNetworkErrors(); throw err; } break; @@ -500,6 +501,7 @@ class DataXceiver extends Receiver implements Runnable { } catch (IOException ioe) { LOG.debug("Error reading client status response. Will close connection.", ioe); IOUtils.closeStream(out); + datanode.metrics.incrDatanodeNetworkErrors(); } } else { IOUtils.closeStream(out); @@ -520,6 +522,7 @@ class DataXceiver extends Receiver implements Runnable { */ LOG.warn(dnR + ":Got exception while serving " + block + " to " + remoteAddress, ioe); + datanode.metrics.incrDatanodeNetworkErrors(); throw ioe; } finally { IOUtils.closeStream(blockSender); @@ -657,6 +660,8 @@ class DataXceiver extends Receiver implements Runnable { mirrorOut.flush(); + DataNodeFaultInjector.get().writeBlockAfterFlush(); + // read connect ack (only for clients, not for replication req) if (isClient) { BlockOpResponseProto connectAck = @@ -695,6 +700,7 @@ class DataXceiver extends Receiver implements Runnable { LOG.info(datanode + ":Exception transfering " + block + " to mirror " + mirrorNode + "- continuing without the mirror", e); + datanode.metrics.incrDatanodeNetworkErrors(); } } } @@ -749,6 +755,7 @@ class DataXceiver extends Receiver implements Runnable { } catch (IOException ioe) { LOG.info("opWriteBlock " + block + " received exception " + ioe); + datanode.metrics.incrDatanodeNetworkErrors(); throw ioe; } finally { // close all opened streams @@ -782,6 +789,10 @@ class DataXceiver extends Receiver implements Runnable { datanode.transferReplicaForPipelineRecovery(blk, targets, targetStorageTypes, clientName); writeResponse(Status.SUCCESS, null, out); + } catch (IOException ioe) { + LOG.info("transferBlock " + blk + " received exception " + ioe); + datanode.metrics.incrDatanodeNetworkErrors(); + throw ioe; } finally { IOUtils.closeStream(out); } @@ -873,6 +884,10 @@ class DataXceiver extends Receiver implements Runnable { .build() .writeDelimitedTo(out); out.flush(); + } catch (IOException ioe) { + LOG.info("blockChecksum " + block + " received exception " + ioe); + datanode.metrics.incrDatanodeNetworkErrors(); + throw ioe; } finally { IOUtils.closeStream(out); IOUtils.closeStream(checksumIn); @@ -938,6 +953,7 @@ class DataXceiver extends Receiver implements Runnable { } catch (IOException ioe) { isOpSuccess = false; LOG.info("opCopyBlock " + block + " received exception " + ioe); + datanode.metrics.incrDatanodeNetworkErrors(); throw ioe; } finally { dataXceiverServer.balanceThrottler.release(); @@ -995,6 +1011,7 @@ class DataXceiver extends Receiver implements Runnable { BlockReceiver blockReceiver = null; DataInputStream proxyReply = null; DataOutputStream replyOut = new DataOutputStream(getOutputStream()); + boolean IoeDuringCopyBlockOperation = false; try { // get the output stream to the proxy final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname); @@ -1022,7 +1039,9 @@ class DataXceiver extends Receiver implements Runnable { HdfsConstants.IO_FILE_BUFFER_SIZE)); /* send request to the proxy */ + IoeDuringCopyBlockOperation = true; new Sender(proxyOut).copyBlock(block, blockToken); + IoeDuringCopyBlockOperation = false; // receive the response from the proxy @@ -1065,6 +1084,10 @@ class DataXceiver extends Receiver implements Runnable { opStatus = ERROR; errMsg = "opReplaceBlock " + block + " received exception " + ioe; LOG.info(errMsg); + if (!IoeDuringCopyBlockOperation) { + // Don't double count IO errors + datanode.metrics.incrDatanodeNetworkErrors(); + } throw ioe; } finally { // receive the last byte that indicates the proxy released its thread resource @@ -1083,6 +1106,7 @@ class DataXceiver extends Receiver implements Runnable { sendResponse(opStatus, errMsg); } catch (IOException ioe) { LOG.warn("Error writing reply back to " + peer.getRemoteAddressString()); + datanode.metrics.incrDatanodeNetworkErrors(); } IOUtils.closeStream(proxyOut); IOUtils.closeStream(blockReceiver); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index 57f12db3b50..09ad3da642b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -89,6 +89,9 @@ public class DataNodeMetrics { @Metric MutableCounterLong volumeFailures; + @Metric("Count of network errors on the datanode") + MutableCounterLong datanodeNetworkErrors; + @Metric MutableRate readBlockOp; @Metric MutableRate writeBlockOp; @Metric MutableRate blockChecksumOp; @@ -296,6 +299,10 @@ public class DataNodeMetrics { volumeFailures.incr(); } + public void incrDatanodeNetworkErrors() { + datanodeNetworkErrors.incr(); + } + /** Increment for getBlockLocalPathInfo calls */ public void incrBlocksGetLocalPathInfo() { blocksGetLocalPathInfo.incr(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java index 9b90d41198d..90112af245e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java @@ -25,8 +25,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.io.Closeable; +import java.io.IOException; import java.util.List; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -38,10 +43,13 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.junit.Test; +import org.mockito.Mockito; public class TestDataNodeMetrics { + private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class); @Test public void testDataNodeMetrics() throws Exception { @@ -186,4 +194,38 @@ public class TestDataNodeMetrics { } } } + + @Test(timeout=60000) + public void testTimeoutMetric() throws Exception { + final Configuration conf = new HdfsConfiguration(); + final Path path = new Path("/test"); + + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + + final List streams = Lists.newArrayList(); + try { + final FSDataOutputStream out = + cluster.getFileSystem().create(path, (short) 2); + final DataNodeFaultInjector injector = Mockito.mock + (DataNodeFaultInjector.class); + Mockito.doThrow(new IOException("mock IOException")). + when(injector). + writeBlockAfterFlush(); + DataNodeFaultInjector.instance = injector; + streams.add(out); + out.writeBytes("old gs data\n"); + out.hflush(); + + final MetricsRecordBuilder dnMetrics = + getMetrics(cluster.getDataNodes().get(0).getMetrics().name()); + assertCounter("DatanodeNetworkErrors", 1L, dnMetrics); + } finally { + IOUtils.cleanup(LOG, streams.toArray(new Closeable[0])); + if (cluster != null) { + cluster.shutdown(); + } + DataNodeFaultInjector.instance = new DataNodeFaultInjector(); + } + } }