HDFS-7222. Expose DataNode network errors as a metric. (Charles Lamb via wang)

(cherry picked from commit 86cad007d7)
This commit is contained in:
Andrew Wang 2014-10-23 12:53:01 -07:00
parent d27cd5ad77
commit d3b9e9be24
5 changed files with 79 additions and 0 deletions

View File

@ -39,6 +39,8 @@ Release 2.7.0 - UNRELEASED
HDFS-6824. Additional user documentation for HDFS encryption. (wang)
HDFS-7222. Expose DataNode network errors as a metric. (Charles Lamb via wang)
OPTIMIZATIONS
BUG FIXES

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
/**
* Used for injecting faults in DFSClient and DFSOutputStream tests.
* Calls into this are a no-op in production code.
@ -35,4 +37,6 @@ public static DataNodeFaultInjector get() {
}
public void getHdfsBlocksMetadata() {}
public void writeBlockAfterFlush() throws IOException {}
}

View File

@ -211,6 +211,7 @@ public void run() {
LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
}
} else {
datanode.metrics.incrDatanodeNetworkErrors();
throw err;
}
break;
@ -500,6 +501,7 @@ public void readBlock(final ExtendedBlock block,
} catch (IOException ioe) {
LOG.debug("Error reading client status response. Will close connection.", ioe);
IOUtils.closeStream(out);
datanode.metrics.incrDatanodeNetworkErrors();
}
} else {
IOUtils.closeStream(out);
@ -520,6 +522,7 @@ public void readBlock(final ExtendedBlock block,
*/
LOG.warn(dnR + ":Got exception while serving " + block + " to "
+ remoteAddress, ioe);
datanode.metrics.incrDatanodeNetworkErrors();
throw ioe;
} finally {
IOUtils.closeStream(blockSender);
@ -657,6 +660,8 @@ public void writeBlock(final ExtendedBlock block,
mirrorOut.flush();
DataNodeFaultInjector.get().writeBlockAfterFlush();
// read connect ack (only for clients, not for replication req)
if (isClient) {
BlockOpResponseProto connectAck =
@ -695,6 +700,7 @@ public void writeBlock(final ExtendedBlock block,
LOG.info(datanode + ":Exception transfering " +
block + " to mirror " + mirrorNode +
"- continuing without the mirror", e);
datanode.metrics.incrDatanodeNetworkErrors();
}
}
}
@ -749,6 +755,7 @@ public void writeBlock(final ExtendedBlock block,
} catch (IOException ioe) {
LOG.info("opWriteBlock " + block + " received exception " + ioe);
datanode.metrics.incrDatanodeNetworkErrors();
throw ioe;
} finally {
// close all opened streams
@ -782,6 +789,10 @@ public void transferBlock(final ExtendedBlock blk,
datanode.transferReplicaForPipelineRecovery(blk, targets,
targetStorageTypes, clientName);
writeResponse(Status.SUCCESS, null, out);
} catch (IOException ioe) {
LOG.info("transferBlock " + blk + " received exception " + ioe);
datanode.metrics.incrDatanodeNetworkErrors();
throw ioe;
} finally {
IOUtils.closeStream(out);
}
@ -873,6 +884,10 @@ public void blockChecksum(final ExtendedBlock block,
.build()
.writeDelimitedTo(out);
out.flush();
} catch (IOException ioe) {
LOG.info("blockChecksum " + block + " received exception " + ioe);
datanode.metrics.incrDatanodeNetworkErrors();
throw ioe;
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(checksumIn);
@ -938,6 +953,7 @@ public void copyBlock(final ExtendedBlock block,
} catch (IOException ioe) {
isOpSuccess = false;
LOG.info("opCopyBlock " + block + " received exception " + ioe);
datanode.metrics.incrDatanodeNetworkErrors();
throw ioe;
} finally {
dataXceiverServer.balanceThrottler.release();
@ -995,6 +1011,7 @@ public void replaceBlock(final ExtendedBlock block,
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
boolean IoeDuringCopyBlockOperation = false;
try {
// get the output stream to the proxy
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@ -1022,7 +1039,9 @@ public void replaceBlock(final ExtendedBlock block,
HdfsConstants.IO_FILE_BUFFER_SIZE));
/* send request to the proxy */
IoeDuringCopyBlockOperation = true;
new Sender(proxyOut).copyBlock(block, blockToken);
IoeDuringCopyBlockOperation = false;
// receive the response from the proxy
@ -1065,6 +1084,10 @@ public void replaceBlock(final ExtendedBlock block,
opStatus = ERROR;
errMsg = "opReplaceBlock " + block + " received exception " + ioe;
LOG.info(errMsg);
if (!IoeDuringCopyBlockOperation) {
// Don't double count IO errors
datanode.metrics.incrDatanodeNetworkErrors();
}
throw ioe;
} finally {
// receive the last byte that indicates the proxy released its thread resource
@ -1083,6 +1106,7 @@ public void replaceBlock(final ExtendedBlock block,
sendResponse(opStatus, errMsg);
} catch (IOException ioe) {
LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
datanode.metrics.incrDatanodeNetworkErrors();
}
IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);

View File

@ -89,6 +89,9 @@ public class DataNodeMetrics {
@Metric MutableCounterLong volumeFailures;
@Metric("Count of network errors on the datanode")
MutableCounterLong datanodeNetworkErrors;
@Metric MutableRate readBlockOp;
@Metric MutableRate writeBlockOp;
@Metric MutableRate blockChecksumOp;
@ -296,6 +299,10 @@ public void incrVolumeFailures() {
volumeFailures.incr();
}
public void incrDatanodeNetworkErrors() {
datanodeNetworkErrors.incr();
}
/** Increment for getBlockLocalPathInfo calls */
public void incrBlocksGetLocalPathInfo() {
blocksGetLocalPathInfo.incr();

View File

@ -25,8 +25,13 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@ -38,10 +43,13 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.Test;
import org.mockito.Mockito;
public class TestDataNodeMetrics {
private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class);
@Test
public void testDataNodeMetrics() throws Exception {
@ -186,4 +194,38 @@ public void testRoundTripAckMetric() throws Exception {
}
}
}
@Test(timeout=60000)
public void testTimeoutMetric() throws Exception {
final Configuration conf = new HdfsConfiguration();
final Path path = new Path("/test");
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
final List<FSDataOutputStream> streams = Lists.newArrayList();
try {
final FSDataOutputStream out =
cluster.getFileSystem().create(path, (short) 2);
final DataNodeFaultInjector injector = Mockito.mock
(DataNodeFaultInjector.class);
Mockito.doThrow(new IOException("mock IOException")).
when(injector).
writeBlockAfterFlush();
DataNodeFaultInjector.instance = injector;
streams.add(out);
out.writeBytes("old gs data\n");
out.hflush();
final MetricsRecordBuilder dnMetrics =
getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
assertCounter("DatanodeNetworkErrors", 1L, dnMetrics);
} finally {
IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
if (cluster != null) {
cluster.shutdown();
}
DataNodeFaultInjector.instance = new DataNodeFaultInjector();
}
}
}