HDFS-10178. Permanent write failures can happen if pipeline recoveries occur for the first packet. Contributed by Kihwal Lee.
(cherry picked from commit a7d1fb0cd2
)
This commit is contained in:
parent
c981efb0f6
commit
9e91433295
|
@ -574,6 +574,8 @@ class BlockReceiver implements Closeable {
|
|||
if (mirrorOut != null && !mirrorError) {
|
||||
try {
|
||||
long begin = Time.monotonicNow();
|
||||
// For testing. Normally no-op.
|
||||
DataNodeFaultInjector.get().stopSendingPacketDownstream();
|
||||
packetReceiver.mirrorPacketTo(mirrorOut);
|
||||
mirrorOut.flush();
|
||||
long now = Time.monotonicNow();
|
||||
|
|
|
@ -301,11 +301,15 @@ class BlockSender implements java.io.Closeable {
|
|||
|
||||
// The meta file will contain only the header if the NULL checksum
|
||||
// type was used, or if the replica was written to transient storage.
|
||||
// Also, when only header portion of a data packet was transferred
|
||||
// and then pipeline breaks, the meta file can contain only the
|
||||
// header and 0 byte in the block data file.
|
||||
// Checksum verification is not performed for replicas on transient
|
||||
// storage. The header is important for determining the checksum
|
||||
// type later when lazy persistence copies the block to non-transient
|
||||
// storage and computes the checksum.
|
||||
if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
|
||||
if (!replica.isOnTransientStorage() &&
|
||||
metaIn.getLength() >= BlockMetadataHeader.getHeaderSize()) {
|
||||
checksumIn = new DataInputStream(new BufferedInputStream(
|
||||
metaIn, IO_FILE_BUFFER_SIZE));
|
||||
|
||||
|
|
|
@ -50,5 +50,7 @@ public class DataNodeFaultInjector {
|
|||
return false;
|
||||
}
|
||||
|
||||
public void stopSendingPacketDownstream() throws IOException {}
|
||||
|
||||
public void noRegistration() throws IOException { }
|
||||
}
|
||||
|
|
|
@ -375,4 +375,57 @@ public class TestClientProtocolForPipelineRecovery {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to make sure the checksum is set correctly after pipeline
|
||||
* recovery transfers 0 byte partial block. If fails the test case
|
||||
* will say "java.io.IOException: Failed to replace a bad datanode
|
||||
* on the existing pipeline due to no more good datanodes being
|
||||
* available to try." This indicates there was a real failure
|
||||
* after the staged failure.
|
||||
*/
|
||||
@Test
|
||||
public void testZeroByteBlockRecovery() throws Exception {
|
||||
// Make the first datanode fail once. With 3 nodes and a block being
|
||||
// created with 2 replicas, anything more than this planned failure
|
||||
// will cause a test failure.
|
||||
DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
|
||||
int tries = 1;
|
||||
@Override
|
||||
public void stopSendingPacketDownstream() throws IOException {
|
||||
if (tries > 0) {
|
||||
tries--;
|
||||
try {
|
||||
Thread.sleep(60000);
|
||||
} catch (InterruptedException ie) {
|
||||
throw new IOException("Interrupted while sleeping. Bailing out.");
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
|
||||
DataNodeFaultInjector.set(dnFaultInjector);
|
||||
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.set(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "1000");
|
||||
conf.set(HdfsClientConfigKeys.
|
||||
BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, "ALWAYS");
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
cluster.waitActive();
|
||||
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), (short)2);
|
||||
out.write(0x31);
|
||||
out.hflush();
|
||||
out.close();
|
||||
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
DataNodeFaultInjector.set(oldDnInjector);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue