HDFS-9178. Slow datanode I/O can cause a wrong node to be marked bad. Contributed by Kihwal Lee.
(cherry picked from commit 99e5204ff5
)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
This commit is contained in:
parent
a231075964
commit
5cb80992fb
|
@ -57,6 +57,9 @@ Release 2.7.2 - UNRELEASED
|
||||||
pool to be scanned but there are suspicious blocks. (Colin Patrick McCabe
|
pool to be scanned but there are suspicious blocks. (Colin Patrick McCabe
|
||||||
via yliu)
|
via yliu)
|
||||||
|
|
||||||
|
HDFS-9178. Slow datanode I/O can cause a wrong node to be marked bad
|
||||||
|
(kihwal)
|
||||||
|
|
||||||
Release 2.7.1 - 2015-07-06
|
Release 2.7.1 - 2015-07-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.io.Closeable;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.EOFException;
|
||||||
import java.io.FileDescriptor;
|
import java.io.FileDescriptor;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -134,6 +135,8 @@ class BlockReceiver implements Closeable {
|
||||||
private DataOutputStream replyOut = null;
|
private DataOutputStream replyOut = null;
|
||||||
|
|
||||||
private boolean pinning;
|
private boolean pinning;
|
||||||
|
private long lastSentTime;
|
||||||
|
private long maxSendIdleTime;
|
||||||
|
|
||||||
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
|
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
|
||||||
final DataInputStream in,
|
final DataInputStream in,
|
||||||
|
@ -160,7 +163,8 @@ class BlockReceiver implements Closeable {
|
||||||
this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
|
this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
|
||||||
// For replaceBlock() calls response should be sent to avoid socketTimeout
|
// For replaceBlock() calls response should be sent to avoid socketTimeout
|
||||||
// at clients. So sending with the interval of 0.5 * socketTimeout
|
// at clients. So sending with the interval of 0.5 * socketTimeout
|
||||||
this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
|
final long readTimeout = datanode.getDnConf().socketTimeout;
|
||||||
|
this.responseInterval = (long) (readTimeout * 0.5);
|
||||||
//for datanode, we have
|
//for datanode, we have
|
||||||
//1: clientName.length() == 0, and
|
//1: clientName.length() == 0, and
|
||||||
//2: stage == null or PIPELINE_SETUP_CREATE
|
//2: stage == null or PIPELINE_SETUP_CREATE
|
||||||
|
@ -169,6 +173,12 @@ class BlockReceiver implements Closeable {
|
||||||
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
|
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
|
||||||
|
|
||||||
this.pinning = pinning;
|
this.pinning = pinning;
|
||||||
|
this.lastSentTime = Time.monotonicNow();
|
||||||
|
// Downstream will timeout in readTimeout on receiving the next packet.
|
||||||
|
// If there is no data traffic, a heartbeat packet is sent at
|
||||||
|
// the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be
|
||||||
|
// the threshold for detecting congestion.
|
||||||
|
this.maxSendIdleTime = (long) (readTimeout * 0.9);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(getClass().getSimpleName() + ": " + block
|
LOG.debug(getClass().getSimpleName() + ": " + block
|
||||||
+ "\n isClient =" + isClient + ", clientname=" + clientname
|
+ "\n isClient =" + isClient + ", clientname=" + clientname
|
||||||
|
@ -351,6 +361,25 @@ class BlockReceiver implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
synchronized void setLastSentTime(long sentTime) {
|
||||||
|
lastSentTime = sentTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* It can return false if
|
||||||
|
* - upstream did not send packet for a long time
|
||||||
|
* - a packet was received but got stuck in local disk I/O.
|
||||||
|
* - a packet was received but got stuck on send to mirror.
|
||||||
|
*/
|
||||||
|
synchronized boolean packetSentInTime() {
|
||||||
|
long diff = Time.monotonicNow() - lastSentTime;
|
||||||
|
if (diff > maxSendIdleTime) {
|
||||||
|
LOG.info("A packet was last sent " + diff + " milliseconds ago.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush block data and metadata files to disk.
|
* Flush block data and metadata files to disk.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
@ -514,13 +543,21 @@ class BlockReceiver implements Closeable {
|
||||||
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
|
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Drop heartbeat for testing.
|
||||||
|
if (seqno < 0 && len == 0 &&
|
||||||
|
DataNodeFaultInjector.get().dropHeartbeatPacket()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
//First write the packet to the mirror:
|
//First write the packet to the mirror:
|
||||||
if (mirrorOut != null && !mirrorError) {
|
if (mirrorOut != null && !mirrorError) {
|
||||||
try {
|
try {
|
||||||
long begin = Time.monotonicNow();
|
long begin = Time.monotonicNow();
|
||||||
packetReceiver.mirrorPacketTo(mirrorOut);
|
packetReceiver.mirrorPacketTo(mirrorOut);
|
||||||
mirrorOut.flush();
|
mirrorOut.flush();
|
||||||
long duration = Time.monotonicNow() - begin;
|
long now = Time.monotonicNow();
|
||||||
|
setLastSentTime(now);
|
||||||
|
long duration = now - begin;
|
||||||
if (duration > datanodeSlowLogThresholdMs) {
|
if (duration > datanodeSlowLogThresholdMs) {
|
||||||
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
|
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
|
||||||
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
|
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
|
||||||
|
@ -1298,6 +1335,17 @@ class BlockReceiver implements Closeable {
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (Thread.interrupted()) {
|
if (Thread.interrupted()) {
|
||||||
isInterrupted = true;
|
isInterrupted = true;
|
||||||
|
} else if (ioe instanceof EOFException && !packetSentInTime()) {
|
||||||
|
// The downstream error was caused by upstream including this
|
||||||
|
// node not sending packet in time. Let the upstream determine
|
||||||
|
// who is at fault. If the immediate upstream node thinks it
|
||||||
|
// has sent a packet in time, this node will be reported as bad.
|
||||||
|
// Otherwise, the upstream node will propagate the error up by
|
||||||
|
// closing the connection.
|
||||||
|
LOG.warn("The downstream error might be due to congestion in " +
|
||||||
|
"upstream including this node. Propagating the error: ",
|
||||||
|
ioe);
|
||||||
|
throw ioe;
|
||||||
} else {
|
} else {
|
||||||
// continue to run even if can not read from mirror
|
// continue to run even if can not read from mirror
|
||||||
// notify client of the error
|
// notify client of the error
|
||||||
|
|
|
@ -36,9 +36,17 @@ public class DataNodeFaultInjector {
|
||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void set(DataNodeFaultInjector injector) {
|
||||||
|
instance = injector;
|
||||||
|
}
|
||||||
|
|
||||||
public void getHdfsBlocksMetadata() {}
|
public void getHdfsBlocksMetadata() {}
|
||||||
|
|
||||||
public void writeBlockAfterFlush() throws IOException {}
|
public void writeBlockAfterFlush() throws IOException {}
|
||||||
|
|
||||||
public void sendShortCircuitShmResponse() throws IOException {}
|
public void sendShortCircuitShmResponse() throws IOException {}
|
||||||
|
|
||||||
|
public boolean dropHeartbeatPacket() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,10 +21,13 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||||
|
@ -162,6 +165,66 @@ public class TestClientProtocolForPipelineRecovery {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPacketTransmissionDelay() throws Exception {
|
||||||
|
// Make the first datanode to not relay heartbeat packet.
|
||||||
|
DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
|
||||||
|
@Override
|
||||||
|
public boolean dropHeartbeatPacket() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
|
||||||
|
DataNodeFaultInjector.set(dnFaultInjector);
|
||||||
|
|
||||||
|
// Setting the timeout to be 3 seconds. Normally heartbeat packet
|
||||||
|
// would be sent every 1.5 seconds if there is no data traffic.
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.set(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "3000");
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
int numDataNodes = 2;
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
|
||||||
|
FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), (short)2);
|
||||||
|
out.write(0x31);
|
||||||
|
out.hflush();
|
||||||
|
|
||||||
|
DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
|
||||||
|
|
||||||
|
// original pipeline
|
||||||
|
DatanodeInfo[] orgNodes = dfsOut.getPipeline();
|
||||||
|
|
||||||
|
// Cause the second datanode to timeout on reading packet
|
||||||
|
Thread.sleep(3500);
|
||||||
|
out.write(0x32);
|
||||||
|
out.hflush();
|
||||||
|
|
||||||
|
// new pipeline
|
||||||
|
DatanodeInfo[] newNodes = dfsOut.getPipeline();
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
boolean contains = false;
|
||||||
|
for (int i = 0; i < newNodes.length; i++) {
|
||||||
|
if (orgNodes[0].getXferAddr().equals(newNodes[i].getXferAddr())) {
|
||||||
|
throw new IOException("The first datanode should have been replaced.");
|
||||||
|
}
|
||||||
|
if (orgNodes[1].getXferAddr().equals(newNodes[i].getXferAddr())) {
|
||||||
|
contains = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertTrue(contains);
|
||||||
|
} finally {
|
||||||
|
DataNodeFaultInjector.set(oldDnInjector);
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test recovery on restart OOB message. It also tests the delivery of
|
* Test recovery on restart OOB message. It also tests the delivery of
|
||||||
* OOB ack originating from the primary datanode. Since there is only
|
* OOB ack originating from the primary datanode. Since there is only
|
||||||
|
|
Loading…
Reference in New Issue