HDFS-9752. Permanent write failures may happen to slow writers during datanode rolling upgrades. (Contributed by Walter Su)

Committed by Akira Ajisaka, 2016-02-10 03:52:42 +09:00
parent 2a46d9c6c4
commit 948dd27960
6 changed files with 109 additions and 24 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -92,6 +92,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-9724. Degraded performance in WebHDFS listing as it does not reuse
     ObjectMapper. (Akira AJISAKA via wheat9)
 
+    HDFS-9752. Permanent write failures may happen to slow writers during
+    datanode rolling upgrades (Walter Su via kihwal)
+
     HDFS-9784. Example usage is not correct in Transparent Encryption document.
     (Takashi Ohnishi via aajisaka)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -265,9 +265,8 @@ public class DFSOutputStream extends FSOutputSummer
     /** Nodes have been used in the pipeline before and have failed. */
     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
-    /** The last ack sequence number before pipeline failure. */
-    private long lastAckedSeqnoBeforeFailure = -1;
-    private int pipelineRecoveryCount = 0;
+    /** The times have retried to recover pipeline, for the same packet. */
+    private volatile int pipelineRecoveryCount = 0;
     /** Has the current block been hflushed? */
     private boolean isHflushed = false;
     /** Append on an existing block? */
@@ -803,6 +802,7 @@ public class DFSOutputStream extends FSOutputSummer
               scope = Trace.continueSpan(one.getTraceSpan());
               one.setTraceSpan(null);
               lastAckedSeqno = seqno;
+              pipelineRecoveryCount = 0;
               ackQueue.removeFirst();
               dataQueue.notifyAll();
@@ -856,23 +856,18 @@ public class DFSOutputStream extends FSOutputSummer
           ackQueue.clear();
         }
 
-        // Record the new pipeline failure recovery.
-        if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
-          lastAckedSeqnoBeforeFailure = lastAckedSeqno;
-          pipelineRecoveryCount = 1;
-        } else {
-          // If we had to recover the pipeline five times in a row for the
-          // same packet, this client likely has corrupt data or corrupting
-          // during transmission.
-          if (++pipelineRecoveryCount > 5) {
-            DFSClient.LOG.warn("Error recovering pipeline for writing " +
-                block + ". Already retried 5 times for the same packet.");
-            lastException.set(new IOException("Failing write. Tried pipeline " +
-                "recovery 5 times without success."));
-            streamerClosed = true;
-            return false;
-          }
-        }
+        // If we had to recover the pipeline five times in a row for the
+        // same packet, this client likely has corrupt data or corrupting
+        // during transmission.
+        if (restartingNodeIndex.get() == -1 && ++pipelineRecoveryCount > 5) {
+          DFSClient.LOG.warn("Error recovering pipeline for writing " +
+              block + ". Already retried 5 times for the same packet.");
+          lastException.set(new IOException("Failing write. Tried pipeline " +
+              "recovery 5 times without success."));
+          streamerClosed = true;
+          return false;
+        }
 
         boolean doSleep = setupPipelineForAppendOrRecovery();
 
         if (!streamerClosed && dfsClient.clientRunning) {
@@ -897,6 +892,7 @@ public class DFSOutputStream extends FSOutputSummer
             assert endOfBlockPacket.isLastPacketInBlock();
             assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
             lastAckedSeqno = endOfBlockPacket.getSeqno();
+            pipelineRecoveryCount = 0;
             dataQueue.notifyAll();
           }
           endBlock();
@@ -2381,4 +2377,12 @@ public class DFSOutputStream extends FSOutputSummer
     System.arraycopy(srcs, 0, dsts, 0, skipIndex);
     System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
   }
+
+  /**
+   * @return The times have retried to recover pipeline, for the same packet.
+   */
+  @VisibleForTesting
+  int getPipelineRecoveryCount() {
+    return streamer.pipelineRecoveryCount;
+  }
 }
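Taken together, the DFSOutputStream hunks change the retry accounting in two ways: the counter resets whenever an ack makes progress (the two added "pipelineRecoveryCount = 0;" lines), and attempts are no longer counted against the five-try limit while a pipeline datanode is restarting for an upgrade (restartingNodeIndex.get() != -1). Without that guard, a slow writer that only sends heartbeat packets during a rolling upgrade could burn all five retries on the same packet and fail the write permanently. Below is a minimal standalone sketch of the accounting rule only; the class and method names are illustrative, not the actual DataStreamer code:

import java.io.IOException;

public class PipelineRecoveryAccounting {
  private static final int MAX_RECOVERIES_PER_PACKET = 5;

  /** Recovery attempts for the current packet; volatile so tests can read it. */
  private volatile int pipelineRecoveryCount = 0;

  /**
   * -1 means no datanode in the pipeline is currently restarting; the real
   * streamer updates this when it receives a restart OOB ack (plumbing
   * omitted in this sketch).
   */
  private volatile int restartingNodeIndex = -1;

  /** Called when an ack advances lastAckedSeqno: progress resets the count. */
  void onPacketAcked() {
    pipelineRecoveryCount = 0;
  }

  /** Called before each pipeline recovery attempt for the same packet. */
  void onPipelineFailure() throws IOException {
    // Recoveries forced by a rolling-upgrade restart are expected, so they
    // are not held against the limit.
    if (restartingNodeIndex == -1
        && ++pipelineRecoveryCount > MAX_RECOVERIES_PER_PACKET) {
      throw new IOException("Failing write. Tried pipeline recovery "
          + MAX_RECOVERIES_PER_PACKET + " times without success.");
    }
  }
}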

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -3000,7 +3000,7 @@ public class DataNode extends ReconfigurableBase
 
     // Asynchronously start the shutdown process so that the rpc response can be
     // sent back.
-    Thread shutdownThread = new Thread() {
+    Thread shutdownThread = new Thread("Async datanode shutdown thread") {
       @Override public void run() {
         if (!shutdownForUpgrade) {
           // Delay the shutdown a bit if not doing for restart.
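Giving the shutdown thread a fixed name is what lets the tests below replace fixed Thread.sleep() calls with GenericTestUtils.waitForThreadTermination(...), which returns as soon as the asynchronous shutdown has actually finished (called below with a 100 ms poll and a 10 s limit). The following is a standalone sketch of that style of wait, an illustrative stand-in rather than the GenericTestUtils implementation:

public class NamedThreadWait {
  static void waitForThreadTermination(String name, long checkEveryMs,
      long maxWaitMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + maxWaitMs;
    while (System.currentTimeMillis() < deadline) {
      boolean alive = false;
      // Thread.getAllStackTraces() enumerates every live thread in the JVM.
      for (Thread t : Thread.getAllStackTraces().keySet()) {
        if (t.isAlive() && name.equals(t.getName())) {
          alive = true;
          break;
        }
      }
      if (!alive) {
        return; // the named thread has terminated
      }
      Thread.sleep(checkEveryMs);
    }
    throw new IllegalStateException(
        "Thread '" + name + "' still alive after " + maxWaitMs + " ms");
  }
}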

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -1969,6 +1969,28 @@ public class MiniDFSCluster {
     return stopDataNode(node);
   }
 
+  /*
+   * Shutdown a particular datanode
+   * @param i node index
+   * @return null if the node index is out of range, else the properties of the
+   * removed node
+   */
+  public synchronized DataNodeProperties stopDataNodeForUpgrade(int i)
+      throws IOException {
+    if (i < 0 || i >= dataNodes.size()) {
+      return null;
+    }
+    DataNodeProperties dnprop = dataNodes.remove(i);
+    DataNode dn = dnprop.datanode;
+    LOG.info("MiniDFSCluster Stopping DataNode " +
+        dn.getDisplayName() +
+        " from a total of " + (dataNodes.size() + 1) +
+        " datanodes.");
+    dn.shutdownDatanode(true);
+    numDataNodes--;
+    return dnprop;
+  }
+
   /**
    * Restart a datanode
    * @param dnprop datanode's property
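Unlike the existing stopDataNode(), the new helper goes through dn.shutdownDatanode(true), i.e. it stops the node the way a rolling upgrade would, so writers receive the restart OOB ack before the process exits. A hedged usage sketch follows; the wrapper class and method names are illustrative and not part of this patch:

package org.apache.hadoop.hdfs; // same package as MiniDFSCluster, so
                                // DataNodeProperties is visible here

import org.apache.hadoop.test.GenericTestUtils;

public class UpgradeBounceExample {
  /** Take datanode 0 of a running cluster through an upgrade-style restart. */
  static void bounceForUpgrade(MiniDFSCluster cluster) throws Exception {
    MiniDFSCluster.DataNodeProperties dnProps =
        cluster.stopDataNodeForUpgrade(0);
    // The shutdown runs asynchronously in the named thread added above.
    GenericTestUtils.waitForThreadTermination(
        "Async datanode shutdown thread", 100, 10000);
    cluster.restartDataNode(dnProps, true); // reuse the same port and storage
    cluster.waitActive();
  }
}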

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -256,7 +258,8 @@ public class TestClientProtocolForPipelineRecovery {
       final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
       // Wait long enough to receive an OOB ack before closing the file.
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // Retart the datanode
       cluster.restartDataNode(0, true);
       // The following forces a data packet and end of block packets to be sent.
@@ -293,7 +296,8 @@ public class TestClientProtocolForPipelineRecovery {
       // issue shutdown to the datanode.
       final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // This should succeed without restarting the node. The restart will
       // expire and regular pipeline recovery will kick in.
       out.close();
@@ -309,7 +313,8 @@ public class TestClientProtocolForPipelineRecovery {
       // issue shutdown to the datanode.
       final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args2));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       try {
         // close should fail
         out.close();
@@ -321,4 +326,53 @@ public class TestClientProtocolForPipelineRecovery {
       }
     }
   }
+
+  /**
+   * HDFS-9752. The client keeps sending heartbeat packets during datanode
+   * rolling upgrades. The client should be able to retry pipeline recovery
+   * more times than the default.
+   * (in a row for the same packet, including the heartbeat packet)
+   * (See{@link DataStreamer#pipelineRecoveryCount})
+   */
+  @Test(timeout = 60000)
+  public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short) 2, 0L);
+      final DFSOutputStream out = (DFSOutputStream) (fileSys.append(file).
+          getWrappedStream());
+      out.write(1);
+      out.hflush();
+
+      final long oldGs = out.getBlock().getGenerationStamp();
+      MiniDFSCluster.DataNodeProperties dnProps =
+          cluster.stopDataNodeForUpgrade(0);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
+      cluster.restartDataNode(dnProps, true);
+      cluster.waitActive();
+
+      // wait pipeline to be recovered
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return out.getBlock().getGenerationStamp() > oldGs;
+        }
+      }, 100, 10000);
+      Assert.assertEquals("The pipeline recovery count shouldn't increase",
+          0, out.getPipelineRecoveryCount());
+      out.write(1);
+      out.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -404,7 +405,8 @@ public class TestRollingUpgrade {
       runCmd(dfsadmin, true, args2);
 
       // the datanode should be down.
-      Thread.sleep(2000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());
 
       // ping should fail.