HDFS-16146. All three replicas are lost due to not adding a new DataN… (#3247) Contributed by Shuyan Zhang.

Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
zhangshuyan0 2021-08-04 00:22:21 +08:00 committed by GitHub
parent a5811dda7b
commit 10a2526b0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 101 additions and 8 deletions

View File

@ -1386,19 +1386,11 @@ class DataStreamer extends Daemon {
* Case 2: Failure in Streaming
* - Append/Create:
* + transfer RBW
*
* Case 3: Failure in Close
* - Append/Create:
* + no transfer, let NameNode replicates the block.
*/
if (!isAppend && lastAckedSeqno < 0
&& stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
//no data have been written
return;
} else if (stage == BlockConstructionStage.PIPELINE_CLOSE
|| stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
//pipeline is closing
return;
}
int tried = 0;

View File

@ -1492,6 +1492,8 @@ class BlockReceiver implements Closeable {
if (lastPacketInBlock) {
// Finalize the block and close the block file
finalizeBlock(startTime);
// For test only, no-op in production system.
DataNodeFaultInjector.get().delayAckLastPacket();
}
Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS;

View File

@ -68,6 +68,12 @@ public class DataNodeFaultInjector {
throws IOException {
}
/**
* Used as a hook to delay sending the response of the last packet.
*/
public void delayAckLastPacket() throws IOException {
}
/**
* Used as a hook to delay writing a packet to disk.
*/

View File

@ -19,12 +19,14 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@ -39,6 +41,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@ -800,4 +803,94 @@ public class TestClientProtocolForPipelineRecovery {
}
}
}
@Test
public void testAddingDatanodeDuringClosing() throws Exception {
DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
@Override
public void delayAckLastPacket() throws IOException {
try {
// Makes the PIPELINE_CLOSE stage longer.
Thread.sleep(5000);
} catch (InterruptedException ie) {
throw new IOException("Interrupted while sleeping");
}
}
};
DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(dnFaultInjector);
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
Path file = new Path("/testAddingDatanodeDuringClosing");
FSDataOutputStream out = fileSys.create(file);
byte[] buffer = new byte[128 * 1024];
out.write(buffer);
// Wait for the pipeline to be built successfully.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
if (((DFSOutputStream) out.getWrappedStream()).getStreamer()
.getNodes() != null) {
return true;
}
return false;
}
}, 100, 3000);
// Get three datanodes on the pipeline.
DatanodeInfo[] pipeline =
((DFSOutputStream) out.getWrappedStream()).getStreamer().getNodes();
DataNode[] dataNodes = new DataNode[3];
int i = 0;
for (DatanodeInfo info : pipeline) {
for (DataNode dn : cluster.getDataNodes()) {
if (dn.getDatanodeUuid().equals(info.getDatanodeUuid())) {
dataNodes[i++] = dn;
break;
}
}
}
// Shutdown the first datanode. According to the default replacement
// strategy, no datanode will be added to existing pipeline.
dataNodes[0].shutdown();
// Shutdown the second datanode when the pipeline is closing.
new Thread(() -> {
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
if (((DFSOutputStream) out.getWrappedStream()).getStreamer()
.getStage() == BlockConstructionStage.PIPELINE_CLOSE) {
return true;
}
return false;
}
}, 100, 10000);
} catch (TimeoutException | InterruptedException e) {
e.printStackTrace();
}
dataNodes[1].shutdown();
}).start();
out.close();
// Shutdown the third datanode.
dataNodes[2].shutdown();
// Check if we can read the file successfully.
DFSTestUtil.readFile(fileSys, file);
} catch (BlockMissingException e) {
fail("The file can not be read! " + e);
} finally {
if (cluster != null) {
cluster.shutdown();
}
DataNodeFaultInjector.set(oldDnInjector);
}
}
}