HDFS-10178. Permanent write failures can happen if pipeline recoveries occur for the first packet. Contributed by Kihwal Lee.

(cherry picked from commit a7d1fb0cd2)
This commit is contained in:
Kihwal Lee 2016-04-04 16:41:23 -05:00
parent 97cda4077a
commit 7d671cad3f
4 changed files with 62 additions and 1 deletions

View File

@ -566,6 +566,8 @@ class BlockReceiver implements Closeable {
if (mirrorOut != null && !mirrorError) { if (mirrorOut != null && !mirrorError) {
try { try {
long begin = Time.monotonicNow(); long begin = Time.monotonicNow();
// For testing. Normally no-op.
DataNodeFaultInjector.get().stopSendingPacketDownstream();
packetReceiver.mirrorPacketTo(mirrorOut); packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush(); mirrorOut.flush();
long now = Time.monotonicNow(); long now = Time.monotonicNow();

View File

@ -301,11 +301,15 @@ class BlockSender implements java.io.Closeable {
// The meta file will contain only the header if the NULL checksum // The meta file will contain only the header if the NULL checksum
// type was used, or if the replica was written to transient storage. // type was used, or if the replica was written to transient storage.
// Also, when only header portion of a data packet was transferred
// and then pipeline breaks, the meta file can contain only the
// header and 0 byte in the block data file.
// Checksum verification is not performed for replicas on transient // Checksum verification is not performed for replicas on transient
// storage. The header is important for determining the checksum // storage. The header is important for determining the checksum
// type later when lazy persistence copies the block to non-transient // type later when lazy persistence copies the block to non-transient
// storage and computes the checksum. // storage and computes the checksum.
if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) { if (!replica.isOnTransientStorage() &&
metaIn.getLength() >= BlockMetadataHeader.getHeaderSize()) {
checksumIn = new DataInputStream(new BufferedInputStream( checksumIn = new DataInputStream(new BufferedInputStream(
metaIn, IO_FILE_BUFFER_SIZE)); metaIn, IO_FILE_BUFFER_SIZE));

View File

@ -50,5 +50,7 @@ public class DataNodeFaultInjector {
return false; return false;
} }
public void stopSendingPacketDownstream() throws IOException {}
public void noRegistration() throws IOException { } public void noRegistration() throws IOException { }
} }

View File

@ -375,4 +375,57 @@ public class TestClientProtocolForPipelineRecovery {
} }
} }
} }
/**
* Test to make sure the checksum is set correctly after pipeline
* recovery transfers 0 byte partial block. If fails the test case
* will say "java.io.IOException: Failed to replace a bad datanode
* on the existing pipeline due to no more good datanodes being
* available to try." This indicates there was a real failure
* after the staged failure.
*/
@Test
public void testZeroByteBlockRecovery() throws Exception {
// Make the first datanode fail once. With 3 nodes and a block being
// created with 2 replicas, anything more than this planned failure
// will cause a test failure.
DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
int tries = 1;
@Override
public void stopSendingPacketDownstream() throws IOException {
if (tries > 0) {
tries--;
try {
Thread.sleep(60000);
} catch (InterruptedException ie) {
throw new IOException("Interrupted while sleeping. Bailing out.");
}
}
}
};
DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(dnFaultInjector);
Configuration conf = new HdfsConfiguration();
conf.set(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "1000");
conf.set(HdfsClientConfigKeys.
BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, "ALWAYS");
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), (short)2);
out.write(0x31);
out.hflush();
out.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
DataNodeFaultInjector.set(oldDnInjector);
}
}
} }