HDFS-10652. Add a unit test for HDFS-4660. Contributed by Vinayakumar B., Wei-Chiu Chuang, Yongjun Zhang.
(cherry picked from commit c25817159a
)
This commit is contained in:
parent
07e1527cc1
commit
e813a3ea4a
|
@ -1306,6 +1306,7 @@ class BlockReceiver implements Closeable {
|
||||||
long ackRecvNanoTime = 0;
|
long ackRecvNanoTime = 0;
|
||||||
try {
|
try {
|
||||||
if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
|
if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
|
||||||
|
DataNodeFaultInjector.get().failPipeline(replicaInfo, mirrorAddr);
|
||||||
// read an ack from downstream datanode
|
// read an ack from downstream datanode
|
||||||
ack.readFields(downstreamIn);
|
ack.readFields(downstreamIn);
|
||||||
ackRecvNanoTime = System.nanoTime();
|
ackRecvNanoTime = System.nanoTime();
|
||||||
|
|
|
@ -2359,7 +2359,8 @@ public class DataNode extends ReconfigurableBase
|
||||||
blockSender.sendBlock(out, unbufOut, null);
|
blockSender.sendBlock(out, unbufOut, null);
|
||||||
|
|
||||||
// no response necessary
|
// no response necessary
|
||||||
LOG.info(getClass().getSimpleName() + ": Transmitted " + b
|
LOG.info(getClass().getSimpleName() + ", at "
|
||||||
|
+ DataNode.this.getDisplayName() + ": Transmitted " + b
|
||||||
+ " (numBytes=" + b.getNumBytes() + ") to " + curTarget);
|
+ " (numBytes=" + b.getNumBytes() + ") to " + curTarget);
|
||||||
|
|
||||||
// read ack
|
// read ack
|
||||||
|
|
|
@ -55,4 +55,7 @@ public class DataNodeFaultInjector {
|
||||||
public void noRegistration() throws IOException { }
|
public void noRegistration() throws IOException { }
|
||||||
|
|
||||||
public void failMirrorConnection() throws IOException { }
|
public void failMirrorConnection() throws IOException { }
|
||||||
|
|
||||||
|
public void failPipeline(ReplicaInPipelineInterface replicaInfo,
|
||||||
|
String mirrorAddr) throws IOException { }
|
||||||
}
|
}
|
||||||
|
|
|
@ -1434,7 +1434,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
|
if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
|
||||||
throw new MustStopExistingWriter(rbw);
|
throw new MustStopExistingWriter(rbw);
|
||||||
}
|
}
|
||||||
LOG.info("Recovering " + rbw);
|
LOG.info("At " + datanode.getDisplayName() + ", Recovering " + rbw);
|
||||||
return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
|
return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
|
||||||
}
|
}
|
||||||
} catch (MustStopExistingWriter e) {
|
} catch (MustStopExistingWriter e) {
|
||||||
|
|
|
@ -17,9 +17,16 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
@ -31,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||||
|
@ -39,12 +48,15 @@ import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This tests pipeline recovery related client protocol works correct or not.
|
* This tests pipeline recovery related client protocol works correct or not.
|
||||||
*/
|
*/
|
||||||
public class TestClientProtocolForPipelineRecovery {
|
public class TestClientProtocolForPipelineRecovery {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestClientProtocolForPipelineRecovery.class);
|
||||||
@Test public void testGetNewStamp() throws IOException {
|
@Test public void testGetNewStamp() throws IOException {
|
||||||
int numDataNodes = 1;
|
int numDataNodes = 1;
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
@ -477,4 +489,128 @@ public class TestClientProtocolForPipelineRecovery {
|
||||||
DataNodeFaultInjector.set(oldDnInjector);
|
DataNodeFaultInjector.set(oldDnInjector);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test to verify that blocks are no longer corrupted after HDFS-4660.
|
||||||
|
// Revert HDFS-4660 and the other related ones (HDFS-9220, HDFS-8722), this
|
||||||
|
// test would fail.
|
||||||
|
// Scenario: Prior to the fix, block get corrupted when the transferBlock
|
||||||
|
// happens during pipeline recovery with extra bytes to make up the end of
|
||||||
|
// chunk.
|
||||||
|
// For verification, Need to fail the pipeline for last datanode when the
|
||||||
|
// second datanode have more bytes on disk than already acked bytes.
|
||||||
|
// This will enable to transfer extra bytes to the newNode to makeup
|
||||||
|
// end-of-chunk during pipeline recovery. This is achieved by the customized
|
||||||
|
// DataNodeFaultInjector class in this test.
|
||||||
|
// For detailed info, please refer to HDFS-4660 and HDFS-10587. HDFS-9220
|
||||||
|
// fixes an issue in HDFS-4660 patch, and HDFS-8722 is an optimization.
|
||||||
|
@Test
|
||||||
|
public void testPipelineRecoveryWithTransferBlock() throws Exception {
|
||||||
|
final int chunkSize = 512;
|
||||||
|
final int oneWriteSize = 5000;
|
||||||
|
final int totalSize = 1024 * 1024;
|
||||||
|
final int errorInjectionPos = 512;
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
// Need 4 datanodes to verify the replaceDatanode during pipeline recovery
|
||||||
|
final MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
|
||||||
|
DataNodeFaultInjector old = DataNodeFaultInjector.get();
|
||||||
|
|
||||||
|
try {
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
Path fileName = new Path("/f");
|
||||||
|
FSDataOutputStream o = fs.create(fileName);
|
||||||
|
int count = 0;
|
||||||
|
// Flush to get the pipeline created.
|
||||||
|
o.writeBytes("hello");
|
||||||
|
o.hflush();
|
||||||
|
DFSOutputStream dfsO = (DFSOutputStream) o.getWrappedStream();
|
||||||
|
final DatanodeInfo[] pipeline = dfsO.getStreamer().getNodes();
|
||||||
|
final String lastDn = pipeline[2].getXferAddr(false);
|
||||||
|
final AtomicBoolean failed = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
||||||
|
@Override
|
||||||
|
public void failPipeline(ReplicaInPipelineInterface replicaInfo,
|
||||||
|
String mirror) throws IOException {
|
||||||
|
if (!lastDn.equals(mirror)) {
|
||||||
|
// Only fail for second DN
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!failed.get() &&
|
||||||
|
(replicaInfo.getBytesAcked() > errorInjectionPos) &&
|
||||||
|
(replicaInfo.getBytesAcked() % chunkSize != 0)) {
|
||||||
|
int count = 0;
|
||||||
|
while (count < 10) {
|
||||||
|
// Fail the pipeline (Throw exception) when:
|
||||||
|
// 1. bytsAcked is not at chunk boundary (checked in the if
|
||||||
|
// statement above)
|
||||||
|
// 2. bytesOnDisk is bigger than bytesAcked and at least
|
||||||
|
// reaches (or go beyond) the end of the chunk that
|
||||||
|
// bytesAcked is in (checked in the if statement below).
|
||||||
|
// At this condition, transferBlock that happens during
|
||||||
|
// pipeline recovery would transfer extra bytes to make up to the
|
||||||
|
// end of the chunk. And this is when the block corruption
|
||||||
|
// described in HDFS-4660 would occur.
|
||||||
|
if ((replicaInfo.getBytesOnDisk() / chunkSize) -
|
||||||
|
(replicaInfo.getBytesAcked() / chunkSize) >= 1) {
|
||||||
|
failed.set(true);
|
||||||
|
throw new IOException(
|
||||||
|
"Failing Pipeline " + replicaInfo.getBytesAcked() + " : "
|
||||||
|
+ replicaInfo.getBytesOnDisk());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(200);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Random r = new Random();
|
||||||
|
byte[] b = new byte[oneWriteSize];
|
||||||
|
while (count < totalSize) {
|
||||||
|
r.nextBytes(b);
|
||||||
|
o.write(b);
|
||||||
|
count += oneWriteSize;
|
||||||
|
o.hflush();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue("Expected a failure in the pipeline", failed.get());
|
||||||
|
DatanodeInfo[] newNodes = dfsO.getStreamer().getNodes();
|
||||||
|
o.close();
|
||||||
|
// Trigger block report to NN
|
||||||
|
for (DataNode d: cluster.getDataNodes()) {
|
||||||
|
DataNodeTestUtils.triggerBlockReport(d);
|
||||||
|
}
|
||||||
|
// Read from the replaced datanode to verify the corruption. So shutdown
|
||||||
|
// all other nodes in the pipeline.
|
||||||
|
List<DatanodeInfo> pipelineList = Arrays.asList(pipeline);
|
||||||
|
DatanodeInfo newNode = null;
|
||||||
|
for (DatanodeInfo node : newNodes) {
|
||||||
|
if (!pipelineList.contains(node)) {
|
||||||
|
newNode = node;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("Number of nodes in pipeline: {} newNode {}",
|
||||||
|
newNodes.length, newNode.getName());
|
||||||
|
// shutdown old 2 nodes
|
||||||
|
for (int i = 0; i < newNodes.length; i++) {
|
||||||
|
if (newNodes[i].getName().equals(newNode.getName())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
LOG.info("shutdown {}", newNodes[i].getName());
|
||||||
|
cluster.stopDataNode(newNodes[i].getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read should be successfull from only the newNode. There should not be
|
||||||
|
// any corruption reported.
|
||||||
|
DFSTestUtil.readFile(fs, fileName);
|
||||||
|
} finally {
|
||||||
|
DataNodeFaultInjector.set(old);
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue