HDFS-9752. Permanent write failures may happen to slow writers during datanode rolling upgrades. Contributed by Walter Su.

(cherry picked from commit 193d27de0a)
Committed by Kihwal Lee, 2016-02-08 12:17:39 -06:00
parent 44c6277d01
commit aae0f183af
6 changed files with 109 additions and 24 deletions

DataStreamer.java

@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
@@ -348,9 +349,8 @@ class DataStreamer extends Daemon {
   /** Nodes have been used in the pipeline before and have failed. */
   private final List<DatanodeInfo> failed = new ArrayList<>();
-  /** The last ack sequence number before pipeline failure. */
-  private long lastAckedSeqnoBeforeFailure = -1;
-  private int pipelineRecoveryCount = 0;
+  /** The times have retried to recover pipeline, for the same packet. */
+  private volatile int pipelineRecoveryCount = 0;
   /** Has the current block been hflushed? */
   private boolean isHflushed = false;
   /** Append on an existing block? */
@@ -1013,6 +1013,7 @@ class DataStreamer extends Daemon {
               one.setTraceScope(null);
             }
             lastAckedSeqno = seqno;
+            pipelineRecoveryCount = 0;
             ackQueue.removeFirst();
             dataQueue.notifyAll();
@@ -1069,22 +1070,16 @@
         ackQueue.clear();
       }
 
-      // Record the new pipeline failure recovery.
-      if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
-        lastAckedSeqnoBeforeFailure = lastAckedSeqno;
-        pipelineRecoveryCount = 1;
-      } else {
-        // If we had to recover the pipeline five times in a row for the
-        // same packet, this client likely has corrupt data or corrupting
-        // during transmission.
-        if (++pipelineRecoveryCount > 5) {
-          LOG.warn("Error recovering pipeline for writing " +
-              block + ". Already retried 5 times for the same packet.");
-          lastException.set(new IOException("Failing write. Tried pipeline " +
-              "recovery 5 times without success."));
-          streamerClosed = true;
-          return false;
-        }
+      // If we had to recover the pipeline five times in a row for the
+      // same packet, this client likely has corrupt data or corrupting
+      // during transmission.
+      if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
+        LOG.warn("Error recovering pipeline for writing " +
+            block + ". Already retried 5 times for the same packet.");
+        lastException.set(new IOException("Failing write. Tried pipeline " +
+            "recovery 5 times without success."));
+        streamerClosed = true;
+        return false;
       }
 
       boolean doSleep = setupPipelineForAppendOrRecovery();
@@ -1111,6 +1106,7 @@ class DataStreamer extends Daemon {
           assert endOfBlockPacket.isLastPacketInBlock();
           assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
           lastAckedSeqno = endOfBlockPacket.getSeqno();
+          pipelineRecoveryCount = 0;
           dataQueue.notifyAll();
         }
         endBlock();
@@ -1907,6 +1903,14 @@ class DataStreamer extends Daemon {
     return streamerClosed;
   }
 
+  /**
+   * @return The times have retried to recover pipeline, for the same packet.
+   */
+  @VisibleForTesting
+  int getPipelineRecoveryCount() {
+    return pipelineRecoveryCount;
+  }
+
   void closeSocket() throws IOException {
     if (s != null) {
       s.close();
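
Taken together, the DataStreamer changes above alter the retry accounting: the recovery counter only grows while the same packet keeps failing, it is reset to zero every time a packet (or the end-of-block packet) is acked, and a recovery triggered by a datanode that announced a restart for upgrade does not count toward the five-attempt limit, so a slow writer can ride out a rolling upgrade without hitting the permanent-failure path. A minimal sketch of that accounting rule, with simplified names (not the Hadoop source):

    // Simplified sketch of the retry-accounting rule; names are invented.
    class RecoveryBudget {
      private static final int MAX_RECOVERIES_PER_PACKET = 5;
      private volatile int pipelineRecoveryCount = 0;

      /** Called whenever a packet is acknowledged by the downstream pipeline. */
      void onPacketAcked() {
        pipelineRecoveryCount = 0;
      }

      /**
       * Called once per pipeline recovery attempt.
       * @param nodeRestarting true if the failure came from a datanode that
       *                       signalled a restart for upgrade
       * @return true if the writer should give up and fail the stream
       */
      boolean shouldFailWrite(boolean nodeRestarting) {
        // Short-circuit: attempts during a datanode restart are not counted.
        return !nodeRestarting && ++pipelineRecoveryCount > MAX_RECOVERIES_PER_PACKET;
      }
    }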

CHANGES.txt

@@ -1808,6 +1808,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-9724. Degraded performance in WebHDFS listing as it does not reuse
     ObjectMapper. (Akira AJISAKA via wheat9)
 
+    HDFS-9752. Permanent write failures may happen to slow writers during
+    datanode rolling upgrades (Walter Su via kihwal)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES

DataNode.java

@@ -2897,7 +2897,7 @@ public class DataNode extends ReconfigurableBase
     // Asynchronously start the shutdown process so that the rpc response can be
     // sent back.
-    Thread shutdownThread = new Thread() {
+    Thread shutdownThread = new Thread("Async datanode shutdown thread") {
       @Override public void run() {
         if (!shutdownForUpgrade) {
           // Delay the shutdown a bit if not doing for restart.
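
Naming the asynchronous shutdown thread is what lets the tests below replace fixed Thread.sleep() calls with GenericTestUtils.waitForThreadTermination("Async datanode shutdown thread", 100, 10000), i.e. wait until the named thread has actually finished instead of guessing how long shutdown takes. A rough stand-in for that helper (an assumed sketch, not Hadoop's implementation):

    // Assumed helper, not Hadoop's GenericTestUtils: poll the JVM's live
    // threads until no thread with the given name remains, or fail on timeout.
    static void awaitThreadGone(String name, long timeoutMs)
        throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (System.currentTimeMillis() < deadline) {
        boolean stillRunning = false;
        for (Thread t : Thread.getAllStackTraces().keySet()) {
          if (name.equals(t.getName()) && t.isAlive()) {
            stillRunning = true;
            break;
          }
        }
        if (!stillRunning) {
          return;              // the named thread has terminated
        }
        Thread.sleep(100);     // still alive; poll again
      }
      throw new AssertionError("Thread '" + name + "' did not terminate in time");
    }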

MiniDFSCluster.java

@@ -2077,6 +2077,28 @@ public class MiniDFSCluster {
     return stopDataNode(node);
   }
 
+  /*
+   * Shutdown a particular datanode
+   * @param i node index
+   * @return null if the node index is out of range, else the properties of the
+   * removed node
+   */
+  public synchronized DataNodeProperties stopDataNodeForUpgrade(int i)
+      throws IOException {
+    if (i < 0 || i >= dataNodes.size()) {
+      return null;
+    }
+    DataNodeProperties dnprop = dataNodes.remove(i);
+    DataNode dn = dnprop.datanode;
+    LOG.info("MiniDFSCluster Stopping DataNode " +
+        dn.getDisplayName() +
+        " from a total of " + (dataNodes.size() + 1) +
+        " datanodes.");
+    dn.shutdownDatanode(true);
+    numDataNodes--;
+    return dnprop;
+  }
+
   /**
    * Restart a datanode
    * @param dnprop datanode's property
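
The new helper differs from stopDataNode() in that it calls dn.shutdownDatanode(true), which takes the same "shutdown for upgrade" path as dfsadmin -shutdownDatanode with the upgrade option and therefore sends an OOB ack to writers with open pipelines so they wait for the datanode to come back. A hypothetical fragment showing the intended use (the new test below does essentially this):

    // Hypothetical test fragment, assuming a running MiniDFSCluster "cluster".
    MiniDFSCluster.DataNodeProperties dnProps = cluster.stopDataNodeForUpgrade(0);
    GenericTestUtils.waitForThreadTermination(
        "Async datanode shutdown thread", 100, 10000);
    cluster.restartDataNode(dnProps, true);   // true = reuse the same ports
    cluster.waitActive();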

TestClientProtocolForPipelineRecovery.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -256,7 +258,8 @@ public class TestClientProtocolForPipelineRecovery {
       final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
       // Wait long enough to receive an OOB ack before closing the file.
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // Retart the datanode
       cluster.restartDataNode(0, true);
       // The following forces a data packet and end of block packets to be sent.
@@ -293,7 +296,8 @@
       // issue shutdown to the datanode.
       final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // This should succeed without restarting the node. The restart will
       // expire and regular pipeline recovery will kick in.
       out.close();
@@ -309,7 +313,8 @@
       // issue shutdown to the datanode.
       final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args2));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       try {
         // close should fail
         out.close();
@@ -321,4 +326,53 @@
       }
     }
   }
+
+  /**
+   * HDFS-9752. The client keeps sending heartbeat packets during datanode
+   * rolling upgrades. The client should be able to retry pipeline recovery
+   * more times than the default.
+   * (in a row for the same packet, including the heartbeat packet)
+   * (See{@link DataStreamer#pipelineRecoveryCount})
+   */
+  @Test(timeout = 60000)
+  public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+      Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short) 2, 0L);
+      final DFSOutputStream out = (DFSOutputStream) (fileSys.append(file).
+          getWrappedStream());
+      out.write(1);
+      out.hflush();
+
+      final long oldGs = out.getBlock().getGenerationStamp();
+      MiniDFSCluster.DataNodeProperties dnProps =
+          cluster.stopDataNodeForUpgrade(0);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
+      cluster.restartDataNode(dnProps, true);
+      cluster.waitActive();
+      // wait pipeline to be recovered
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return out.getBlock().getGenerationStamp() > oldGs;
+        }
+      }, 100, 10000);
+      Assert.assertEquals("The pipeline recovery count shouldn't increase",
+          0, out.getStreamer().getPipelineRecoveryCount());
+      out.write(1);
+      out.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

TestRollingUpgrade.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -407,7 +408,8 @@ public class TestRollingUpgrade {
       runCmd(dfsadmin, true, args2);
 
       // the datanode should be down.
-      Thread.sleep(2000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());
 
       // ping should fail.