HDFS-9752. Permanent write failures may happen to slow writers during datanode rolling upgrades. Contributed by Walter Su.

This commit is contained in:
Kihwal Lee 2016-02-08 12:16:05 -06:00
parent cf3261570a
commit 193d27de0a
6 changed files with 109 additions and 24 deletions

View File

@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
@ -368,9 +369,8 @@ class DataStreamer extends Daemon {
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<>();
/** The last ack sequence number before pipeline failure. */
private long lastAckedSeqnoBeforeFailure = -1;
private int pipelineRecoveryCount = 0;
/** The times have retried to recover pipeline, for the same packet. */
private volatile int pipelineRecoveryCount = 0;
/** Has the current block been hflushed? */
private boolean isHflushed = false;
/** Append on an existing block? */
@ -1040,6 +1040,7 @@ class DataStreamer extends Daemon {
one.setTraceScope(null);
}
lastAckedSeqno = seqno;
pipelineRecoveryCount = 0;
ackQueue.removeFirst();
dataQueue.notifyAll();
@ -1101,15 +1102,10 @@ class DataStreamer extends Daemon {
ackQueue.clear();
}
// Record the new pipeline failure recovery.
if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
lastAckedSeqnoBeforeFailure = lastAckedSeqno;
pipelineRecoveryCount = 1;
} else {
// If we had to recover the pipeline five times in a row for the
// same packet, this client likely has corrupt data or corrupting
// during transmission.
if (++pipelineRecoveryCount > 5) {
if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet.");
lastException.set(new IOException("Failing write. Tried pipeline " +
@ -1117,7 +1113,6 @@ class DataStreamer extends Daemon {
streamerClosed = true;
return false;
}
}
setupPipelineForAppendOrRecovery();
@ -1144,6 +1139,7 @@ class DataStreamer extends Daemon {
assert endOfBlockPacket.isLastPacketInBlock();
assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
lastAckedSeqno = endOfBlockPacket.getSeqno();
pipelineRecoveryCount = 0;
dataQueue.notifyAll();
}
endBlock();
@ -1914,6 +1910,14 @@ class DataStreamer extends Daemon {
return streamerClosed;
}
/**
* @return The times have retried to recover pipeline, for the same packet.
*/
@VisibleForTesting
int getPipelineRecoveryCount() {
return pipelineRecoveryCount;
}
void closeSocket() throws IOException {
if (s != null) {
s.close();

View File

@ -2789,6 +2789,9 @@ Release 2.7.3 - UNRELEASED
HDFS-9724. Degraded performance in WebHDFS listing as it does not reuse
ObjectMapper. (Akira AJISAKA via wheat9)
HDFS-9752. Permanent write failures may happen to slow writers during
datanode rolling upgrades (Walter Su via kihwal)
Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES

View File

@ -2921,7 +2921,7 @@ public class DataNode extends ReconfigurableBase
// Asynchronously start the shutdown process so that the rpc response can be
// sent back.
Thread shutdownThread = new Thread() {
Thread shutdownThread = new Thread("Async datanode shutdown thread") {
@Override public void run() {
if (!shutdownForUpgrade) {
// Delay the shutdown a bit if not doing for restart.

View File

@ -2187,6 +2187,28 @@ public class MiniDFSCluster {
return stopDataNode(node);
}
/*
* Shutdown a particular datanode
* @param i node index
* @return null if the node index is out of range, else the properties of the
* removed node
*/
public synchronized DataNodeProperties stopDataNodeForUpgrade(int i)
throws IOException {
if (i < 0 || i >= dataNodes.size()) {
return null;
}
DataNodeProperties dnprop = dataNodes.remove(i);
DataNode dn = dnprop.datanode;
LOG.info("MiniDFSCluster Stopping DataNode " +
dn.getDisplayName() +
" from a total of " + (dataNodes.size() + 1) +
" datanodes.");
dn.shutdownDatanode(true);
numDataNodes--;
return dnprop;
}
/**
* Restart a datanode
* @param dnprop datanode's property

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -34,6 +35,7 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@ -256,7 +258,8 @@ public class TestClientProtocolForPipelineRecovery {
final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args1));
// Wait long enough to receive an OOB ack before closing the file.
Thread.sleep(4000);
GenericTestUtils.waitForThreadTermination(
"Async datanode shutdown thread", 100, 10000);
// Retart the datanode
cluster.restartDataNode(0, true);
// The following forces a data packet and end of block packets to be sent.
@ -293,7 +296,8 @@ public class TestClientProtocolForPipelineRecovery {
// issue shutdown to the datanode.
final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args1));
Thread.sleep(4000);
GenericTestUtils.waitForThreadTermination(
"Async datanode shutdown thread", 100, 10000);
// This should succeed without restarting the node. The restart will
// expire and regular pipeline recovery will kick in.
out.close();
@ -309,7 +313,8 @@ public class TestClientProtocolForPipelineRecovery {
// issue shutdown to the datanode.
final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args2));
Thread.sleep(4000);
GenericTestUtils.waitForThreadTermination(
"Async datanode shutdown thread", 100, 10000);
try {
// close should fail
out.close();
@ -321,4 +326,53 @@ public class TestClientProtocolForPipelineRecovery {
}
}
}
/**
* HDFS-9752. The client keeps sending heartbeat packets during datanode
* rolling upgrades. The client should be able to retry pipeline recovery
* more times than the default.
* (in a row for the same packet, including the heartbeat packet)
* (See{@link DataStreamer#pipelineRecoveryCount})
*/
@Test(timeout = 60000)
public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
DFSTestUtil.createFile(fileSys, file, 10240L, (short) 2, 0L);
final DFSOutputStream out = (DFSOutputStream) (fileSys.append(file).
getWrappedStream());
out.write(1);
out.hflush();
final long oldGs = out.getBlock().getGenerationStamp();
MiniDFSCluster.DataNodeProperties dnProps =
cluster.stopDataNodeForUpgrade(0);
GenericTestUtils.waitForThreadTermination(
"Async datanode shutdown thread", 100, 10000);
cluster.restartDataNode(dnProps, true);
cluster.waitActive();
// wait pipeline to be recovered
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return out.getBlock().getGenerationStamp() > oldGs;
}
}, 100, 10000);
Assert.assertEquals("The pipeline recovery count shouldn't increase",
0, out.getStreamer().getPipelineRecoveryCount());
out.write(1);
out.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
@ -407,7 +408,8 @@ public class TestRollingUpgrade {
runCmd(dfsadmin, true, args2);
// the datanode should be down.
Thread.sleep(2000);
GenericTestUtils.waitForThreadTermination(
"Async datanode shutdown thread", 100, 10000);
Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());
// ping should fail.