HDFS-7222. Expose DataNode network errors as a metric. (Charles Lamb via wang)
This commit is contained in:
parent
8c5b23b547
commit
86cad007d7
|
@ -295,6 +295,8 @@ Release 2.7.0 - UNRELEASED
|
||||||
HDFS-7165. Separate block metrics for files with replication count 1.
|
HDFS-7165. Separate block metrics for files with replication count 1.
|
||||||
(Zhe Zhang via wang)
|
(Zhe Zhang via wang)
|
||||||
|
|
||||||
|
HDFS-7222. Expose DataNode network errors as a metric. (Charles Lamb via wang)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -21,6 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used for injecting faults in DFSClient and DFSOutputStream tests.
|
* Used for injecting faults in DFSClient and DFSOutputStream tests.
|
||||||
* Calls into this are a no-op in production code.
|
* Calls into this are a no-op in production code.
|
||||||
|
@ -35,4 +37,6 @@ public class DataNodeFaultInjector {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void getHdfsBlocksMetadata() {}
|
public void getHdfsBlocksMetadata() {}
|
||||||
|
|
||||||
|
public void writeBlockAfterFlush() throws IOException {}
|
||||||
}
|
}
|
||||||
|
|
|
@ -211,6 +211,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
|
LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
datanode.metrics.incrDatanodeNetworkErrors();
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -500,6 +501,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.debug("Error reading client status response. Will close connection.", ioe);
|
LOG.debug("Error reading client status response. Will close connection.", ioe);
|
||||||
IOUtils.closeStream(out);
|
IOUtils.closeStream(out);
|
||||||
|
datanode.metrics.incrDatanodeNetworkErrors();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
IOUtils.closeStream(out);
|
IOUtils.closeStream(out);
|
||||||
|
@ -520,6 +522,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
*/
|
*/
|
||||||
LOG.warn(dnR + ":Got exception while serving " + block + " to "
|
LOG.warn(dnR + ":Got exception while serving " + block + " to "
|
||||||
+ remoteAddress, ioe);
|
+ remoteAddress, ioe);
|
||||||
|
datanode.metrics.incrDatanodeNetworkErrors();
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(blockSender);
|
IOUtils.closeStream(blockSender);
|
||||||
|
@ -657,6 +660,8 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
|
|
||||||
mirrorOut.flush();
|
mirrorOut.flush();
|
||||||
|
|
||||||
|
DataNodeFaultInjector.get().writeBlockAfterFlush();
|
||||||
|
|
||||||
// read connect ack (only for clients, not for replication req)
|
// read connect ack (only for clients, not for replication req)
|
||||||
if (isClient) {
|
if (isClient) {
|
||||||
BlockOpResponseProto connectAck =
|
BlockOpResponseProto connectAck =
|
||||||
|
@ -695,6 +700,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
LOG.info(datanode + ":Exception transfering " +
|
LOG.info(datanode + ":Exception transfering " +
|
||||||
block + " to mirror " + mirrorNode +
|
block + " to mirror " + mirrorNode +
|
||||||
"- continuing without the mirror", e);
|
"- continuing without the mirror", e);
|
||||||
|
datanode.metrics.incrDatanodeNetworkErrors();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -749,6 +755,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.info("opWriteBlock " + block + " received exception " + ioe);
|
LOG.info("opWriteBlock " + block + " received exception " + ioe);
|
||||||
|
datanode.metrics.incrDatanodeNetworkErrors();
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
// close all opened streams
|
// close all opened streams
|
||||||
|
@ -782,6 +789,10 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
datanode.transferReplicaForPipelineRecovery(blk, targets,
|
datanode.transferReplicaForPipelineRecovery(blk, targets,
|
||||||
targetStorageTypes, clientName);
|
targetStorageTypes, clientName);
|
||||||
writeResponse(Status.SUCCESS, null, out);
|
writeResponse(Status.SUCCESS, null, out);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.info("transferBlock " + blk + " received exception " + ioe);
|
||||||
|
datanode.metrics.incrDatanodeNetworkErrors();
|
||||||
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(out);
|
IOUtils.closeStream(out);
|
||||||
}
|
}
|
||||||
|
@ -873,6 +884,10 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
.build()
|
.build()
|
||||||
.writeDelimitedTo(out);
|
.writeDelimitedTo(out);
|
||||||
out.flush();
|
out.flush();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.info("blockChecksum " + block + " received exception " + ioe);
|
||||||
|
datanode.metrics.incrDatanodeNetworkErrors();
|
||||||
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(out);
|
IOUtils.closeStream(out);
|
||||||
IOUtils.closeStream(checksumIn);
|
IOUtils.closeStream(checksumIn);
|
||||||
|
@ -938,6 +953,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
isOpSuccess = false;
|
isOpSuccess = false;
|
||||||
LOG.info("opCopyBlock " + block + " received exception " + ioe);
|
LOG.info("opCopyBlock " + block + " received exception " + ioe);
|
||||||
|
datanode.metrics.incrDatanodeNetworkErrors();
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
dataXceiverServer.balanceThrottler.release();
|
dataXceiverServer.balanceThrottler.release();
|
||||||
|
@ -995,6 +1011,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
BlockReceiver blockReceiver = null;
|
BlockReceiver blockReceiver = null;
|
||||||
DataInputStream proxyReply = null;
|
DataInputStream proxyReply = null;
|
||||||
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
|
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
|
||||||
|
boolean IoeDuringCopyBlockOperation = false;
|
||||||
try {
|
try {
|
||||||
// get the output stream to the proxy
|
// get the output stream to the proxy
|
||||||
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
|
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
|
||||||
|
@ -1022,7 +1039,9 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
HdfsConstants.IO_FILE_BUFFER_SIZE));
|
HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||||
|
|
||||||
/* send request to the proxy */
|
/* send request to the proxy */
|
||||||
|
IoeDuringCopyBlockOperation = true;
|
||||||
new Sender(proxyOut).copyBlock(block, blockToken);
|
new Sender(proxyOut).copyBlock(block, blockToken);
|
||||||
|
IoeDuringCopyBlockOperation = false;
|
||||||
|
|
||||||
// receive the response from the proxy
|
// receive the response from the proxy
|
||||||
|
|
||||||
|
@ -1065,6 +1084,10 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
opStatus = ERROR;
|
opStatus = ERROR;
|
||||||
errMsg = "opReplaceBlock " + block + " received exception " + ioe;
|
errMsg = "opReplaceBlock " + block + " received exception " + ioe;
|
||||||
LOG.info(errMsg);
|
LOG.info(errMsg);
|
||||||
|
if (!IoeDuringCopyBlockOperation) {
|
||||||
|
// Don't double count IO errors
|
||||||
|
datanode.metrics.incrDatanodeNetworkErrors();
|
||||||
|
}
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
// receive the last byte that indicates the proxy released its thread resource
|
// receive the last byte that indicates the proxy released its thread resource
|
||||||
|
@ -1083,6 +1106,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
sendResponse(opStatus, errMsg);
|
sendResponse(opStatus, errMsg);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
|
LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
|
||||||
|
datanode.metrics.incrDatanodeNetworkErrors();
|
||||||
}
|
}
|
||||||
IOUtils.closeStream(proxyOut);
|
IOUtils.closeStream(proxyOut);
|
||||||
IOUtils.closeStream(blockReceiver);
|
IOUtils.closeStream(blockReceiver);
|
||||||
|
|
|
@ -89,6 +89,9 @@ public class DataNodeMetrics {
|
||||||
|
|
||||||
@Metric MutableCounterLong volumeFailures;
|
@Metric MutableCounterLong volumeFailures;
|
||||||
|
|
||||||
|
@Metric("Count of network errors on the datanode")
|
||||||
|
MutableCounterLong datanodeNetworkErrors;
|
||||||
|
|
||||||
@Metric MutableRate readBlockOp;
|
@Metric MutableRate readBlockOp;
|
||||||
@Metric MutableRate writeBlockOp;
|
@Metric MutableRate writeBlockOp;
|
||||||
@Metric MutableRate blockChecksumOp;
|
@Metric MutableRate blockChecksumOp;
|
||||||
|
@ -296,6 +299,10 @@ public class DataNodeMetrics {
|
||||||
volumeFailures.incr();
|
volumeFailures.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incrDatanodeNetworkErrors() {
|
||||||
|
datanodeNetworkErrors.incr();
|
||||||
|
}
|
||||||
|
|
||||||
/** Increment for getBlockLocalPathInfo calls */
|
/** Increment for getBlockLocalPathInfo calls */
|
||||||
public void incrBlocksGetLocalPathInfo() {
|
public void incrBlocksGetLocalPathInfo() {
|
||||||
blocksGetLocalPathInfo.incr();
|
blocksGetLocalPathInfo.incr();
|
||||||
|
|
|
@ -25,8 +25,13 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -38,10 +43,13 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
public class TestDataNodeMetrics {
|
public class TestDataNodeMetrics {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDataNodeMetrics() throws Exception {
|
public void testDataNodeMetrics() throws Exception {
|
||||||
|
@ -186,4 +194,38 @@ public class TestDataNodeMetrics {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testTimeoutMetric() throws Exception {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
final Path path = new Path("/test");
|
||||||
|
|
||||||
|
final MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
|
|
||||||
|
final List<FSDataOutputStream> streams = Lists.newArrayList();
|
||||||
|
try {
|
||||||
|
final FSDataOutputStream out =
|
||||||
|
cluster.getFileSystem().create(path, (short) 2);
|
||||||
|
final DataNodeFaultInjector injector = Mockito.mock
|
||||||
|
(DataNodeFaultInjector.class);
|
||||||
|
Mockito.doThrow(new IOException("mock IOException")).
|
||||||
|
when(injector).
|
||||||
|
writeBlockAfterFlush();
|
||||||
|
DataNodeFaultInjector.instance = injector;
|
||||||
|
streams.add(out);
|
||||||
|
out.writeBytes("old gs data\n");
|
||||||
|
out.hflush();
|
||||||
|
|
||||||
|
final MetricsRecordBuilder dnMetrics =
|
||||||
|
getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
|
||||||
|
assertCounter("DatanodeNetworkErrors", 1L, dnMetrics);
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
DataNodeFaultInjector.instance = new DataNodeFaultInjector();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue