HDFS-9178. Slow datanode I/O can cause a wrong node to be marked bad. Contributed by Kihwal Lee.

(cherry picked from commit 99e5204ff5)
This commit is contained in:
Kihwal Lee 2015-10-07 10:18:35 -05:00
parent 2b6fe90764
commit c43181d8a7
4 changed files with 124 additions and 2 deletions

View File

@ -1217,6 +1217,9 @@ Release 2.7.2 - UNRELEASED
pool to be scanned but there are suspicious blocks. (Colin Patrick McCabe pool to be scanned but there are suspicious blocks. (Colin Patrick McCabe
via yliu) via yliu)
HDFS-9178. Slow datanode I/O can cause a wrong node to be marked bad
(kihwal)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -23,6 +23,7 @@ import java.io.BufferedOutputStream;
import java.io.Closeable; import java.io.Closeable;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor; import java.io.FileDescriptor;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
@ -136,6 +137,8 @@ class BlockReceiver implements Closeable {
private DataOutputStream replyOut = null; private DataOutputStream replyOut = null;
private boolean pinning; private boolean pinning;
private long lastSentTime;
private long maxSendIdleTime;
BlockReceiver(final ExtendedBlock block, final StorageType storageType, BlockReceiver(final ExtendedBlock block, final StorageType storageType,
final DataInputStream in, final DataInputStream in,
@ -162,7 +165,8 @@ class BlockReceiver implements Closeable {
this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs; this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
// For replaceBlock() calls response should be sent to avoid socketTimeout // For replaceBlock() calls response should be sent to avoid socketTimeout
// at clients. So sending with the interval of 0.5 * socketTimeout // at clients. So sending with the interval of 0.5 * socketTimeout
this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5); final long readTimeout = datanode.getDnConf().socketTimeout;
this.responseInterval = (long) (readTimeout * 0.5);
//for datanode, we have //for datanode, we have
//1: clientName.length() == 0, and //1: clientName.length() == 0, and
//2: stage == null or PIPELINE_SETUP_CREATE //2: stage == null or PIPELINE_SETUP_CREATE
@ -171,6 +175,12 @@ class BlockReceiver implements Closeable {
|| stage == BlockConstructionStage.TRANSFER_FINALIZED; || stage == BlockConstructionStage.TRANSFER_FINALIZED;
this.pinning = pinning; this.pinning = pinning;
this.lastSentTime = Time.monotonicNow();
// Downstream will timeout in readTimeout on receiving the next packet.
// If there is no data traffic, a heartbeat packet is sent at
// the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be
// the threshold for detecting congestion.
this.maxSendIdleTime = (long) (readTimeout * 0.9);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ": " + block LOG.debug(getClass().getSimpleName() + ": " + block
+ "\n isClient =" + isClient + ", clientname=" + clientname + "\n isClient =" + isClient + ", clientname=" + clientname
@ -357,6 +367,25 @@ class BlockReceiver implements Closeable {
} }
} }
synchronized void setLastSentTime(long sentTime) {
lastSentTime = sentTime;
}
/**
* It can return false if
* - upstream did not send packet for a long time
* - a packet was received but got stuck in local disk I/O.
* - a packet was received but got stuck on send to mirror.
*/
synchronized boolean packetSentInTime() {
long diff = Time.monotonicNow() - lastSentTime;
if (diff > maxSendIdleTime) {
LOG.info("A packet was last sent " + diff + " milliseconds ago.");
return false;
}
return true;
}
/** /**
* Flush block data and metadata files to disk. * Flush block data and metadata files to disk.
* @throws IOException * @throws IOException
@ -520,13 +549,21 @@ class BlockReceiver implements Closeable {
lastPacketInBlock, offsetInBlock, Status.SUCCESS); lastPacketInBlock, offsetInBlock, Status.SUCCESS);
} }
// Drop heartbeat for testing.
if (seqno < 0 && len == 0 &&
DataNodeFaultInjector.get().dropHeartbeatPacket()) {
return 0;
}
//First write the packet to the mirror: //First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) { if (mirrorOut != null && !mirrorError) {
try { try {
long begin = Time.monotonicNow(); long begin = Time.monotonicNow();
packetReceiver.mirrorPacketTo(mirrorOut); packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush(); mirrorOut.flush();
long duration = Time.monotonicNow() - begin; long now = Time.monotonicNow();
setLastSentTime(now);
long duration = now - begin;
if (duration > datanodeSlowLogThresholdMs) { if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
@ -1296,6 +1333,17 @@ class BlockReceiver implements Closeable {
} catch (IOException ioe) { } catch (IOException ioe) {
if (Thread.interrupted()) { if (Thread.interrupted()) {
isInterrupted = true; isInterrupted = true;
} else if (ioe instanceof EOFException && !packetSentInTime()) {
// The downstream error was caused by upstream including this
// node not sending packet in time. Let the upstream determine
// who is at fault. If the immediate upstream node thinks it
// has sent a packet in time, this node will be reported as bad.
// Otherwise, the upstream node will propagate the error up by
// closing the connection.
LOG.warn("The downstream error might be due to congestion in " +
"upstream including this node. Propagating the error: ",
ioe);
throw ioe;
} else { } else {
// continue to run even if can not read from mirror // continue to run even if can not read from mirror
// notify client of the error // notify client of the error

View File

@ -36,9 +36,17 @@ public class DataNodeFaultInjector {
return instance; return instance;
} }
public static void set(DataNodeFaultInjector injector) {
instance = injector;
}
public void getHdfsBlocksMetadata() {} public void getHdfsBlocksMetadata() {}
public void writeBlockAfterFlush() throws IOException {} public void writeBlockAfterFlush() throws IOException {}
public void sendShortCircuitShmResponse() throws IOException {} public void sendShortCircuitShmResponse() throws IOException {}
public boolean dropHeartbeatPacket() {
return false;
}
} }

View File

@ -21,11 +21,14 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.tools.DFSAdmin;
@ -160,6 +163,66 @@ public class TestClientProtocolForPipelineRecovery {
} }
} }
@Test
public void testPacketTransmissionDelay() throws Exception {
// Make the first datanode to not relay heartbeat packet.
DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
@Override
public boolean dropHeartbeatPacket() {
return true;
}
};
DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(dnFaultInjector);
// Setting the timeout to be 3 seconds. Normally heartbeat packet
// would be sent every 1.5 seconds if there is no data traffic.
Configuration conf = new HdfsConfiguration();
conf.set(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "3000");
MiniDFSCluster cluster = null;
try {
int numDataNodes = 2;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), (short)2);
out.write(0x31);
out.hflush();
DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
// original pipeline
DatanodeInfo[] orgNodes = dfsOut.getPipeline();
// Cause the second datanode to timeout on reading packet
Thread.sleep(3500);
out.write(0x32);
out.hflush();
// new pipeline
DatanodeInfo[] newNodes = dfsOut.getPipeline();
out.close();
boolean contains = false;
for (int i = 0; i < newNodes.length; i++) {
if (orgNodes[0].getXferAddr().equals(newNodes[i].getXferAddr())) {
throw new IOException("The first datanode should have been replaced.");
}
if (orgNodes[1].getXferAddr().equals(newNodes[i].getXferAddr())) {
contains = true;
}
}
Assert.assertTrue(contains);
} finally {
DataNodeFaultInjector.set(oldDnInjector);
if (cluster != null) {
cluster.shutdown();
}
}
}
/** /**
* Test recovery on restart OOB message. It also tests the delivery of * Test recovery on restart OOB message. It also tests the delivery of
* OOB ack originating from the primary datanode. Since there is only * OOB ack originating from the primary datanode. Since there is only