From af2fb6a9090fa66bd97a3a2534f58e6e9023e70c Mon Sep 17 00:00:00 2001
From: Konstantin V Shvachko
Date: Thu, 20 Jul 2017 14:47:25 -0700
Subject: [PATCH] HDFS-11472. Fix inconsistent replica size after a data
 pipeline failure. Contributed by Erik Krogen and Wei-Chiu Chuang.

(cherry picked from commit 2a5a313539e211736fef12010918a60f9edad030)
---
 .../fsdataset/impl/FsDatasetImpl.java         | 22 ++++++--
 .../impl/FsDatasetImplTestUtils.java          |  7 +++
 .../fsdataset/impl/TestWriteToReplica.java    | 52 ++++++++++++++++++-
 3 files changed, 75 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 09071ea36bb..5a17bcd4c88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1564,16 +1564,30 @@ private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
           minBytesRcvd + ", " + maxBytesRcvd + "].");
     }
 
+    long bytesOnDisk = rbw.getBytesOnDisk();
+    long blockDataLength = rbw.getBlockFile().length();
+    if (bytesOnDisk != blockDataLength) {
+      LOG.info("Resetting bytesOnDisk to match blockDataLength (=" +
+          blockDataLength + ") for replica " + rbw);
+      bytesOnDisk = blockDataLength;
+      rbw.setLastChecksumAndDataLen(bytesOnDisk, null);
+    }
+
+    if (bytesOnDisk < bytesAcked) {
+      throw new ReplicaNotFoundException("Found fewer bytesOnDisk than " +
+          "bytesAcked for replica " + rbw);
+    }
+
     FsVolumeReference ref = rbw.getVolume().obtainReference();
     try {
       // Truncate the potentially corrupt portion.
       // If the source was client and the last node in the pipeline was lost,
       // any corrupt data written after the acked length can go unnoticed.
-      if (numBytes > bytesAcked) {
+      if (bytesOnDisk > bytesAcked) {
         final File replicafile = rbw.getBlockFile();
         truncateBlock(
             rbw.getVolume(), replicafile, rbw.getMetaFile(),
-            numBytes, bytesAcked);
+            bytesOnDisk, bytesAcked);
         rbw.setNumBytes(bytesAcked);
         rbw.setLastChecksumAndDataLen(bytesAcked, null);
       }
@@ -2624,8 +2638,8 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
 
     //check replica bytes on disk.
     if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
-      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
-          + " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
+      throw new IOException("getBytesOnDisk() < getVisibleLength(), rip="
+          + rip);
     }
 
     //check the replica's files
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index 803bcb16ca7..b8138c00c40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -303,6 +303,13 @@ public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
     rbw.getBlockFile().createNewFile();
     rbw.getMetaFile().createNewFile();
     dataset.volumeMap.add(bpid, rbw);
+
+    try (RandomAccessFile blockRAF =
+        new RandomAccessFile(rbw.getBlockFile(), "rw")) {
+      //extend blockFile
+      blockRAF.setLength(eb.getNumBytes());
+    }
+
     saveMetaFileHeader(rbw.getMetaFile());
     return rbw;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 56f1d674676..e7c680cfb74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -36,12 +37,14 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
@@ -154,7 +157,7 @@ private ExtendedBlock[] setup(String bpid, FsDatasetTestUtils testUtils)
 
     ExtendedBlock[] blocks = new ExtendedBlock[] {
         new ExtendedBlock(bpid, 1, 1, 2001), new ExtendedBlock(bpid, 2, 1, 2002),
-        new ExtendedBlock(bpid, 3, 1, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
+        new ExtendedBlock(bpid, 3, 2, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
         new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
     };
 
@@ -548,7 +551,52 @@ public void testReplicaMapAfterDatanodeRestart() throws Exception {
       cluster.shutdown();
     }
   }
-  
+
+  /**
+   * Test that we can successfully recover a {@link ReplicaBeingWritten}
+   * which has inconsistent metadata (bytes were written to disk but bytesOnDisk
+   * was not updated) but that recovery fails when the block is actually
+   * corrupt (bytes are not present on disk).
+   */
+  @Test
+  public void testRecoverInconsistentRbw() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetImpl fsDataset = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
+
+    // set up replicasMap
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
+
+    ReplicaBeingWritten rbw = (ReplicaBeingWritten)fsDataset.
+        getReplicaInfo(blocks[RBW]);
+    long bytesOnDisk = rbw.getBytesOnDisk();
+    // simulate an inconsistent replica length update by reducing in-memory
+    // value of on disk length
+    rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
+    fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
+        rbw.getNumBytes());
+    // after the recovery, on disk length should equal acknowledged length.
+    Assert.assertTrue(rbw.getBytesOnDisk() == rbw.getBytesAcked());
+
+    // reduce on disk length again; this time actually truncate the file to
+    // simulate the data not being present
+    rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
+    try (RandomAccessFile blockRAF =
+        new RandomAccessFile(rbw.getBlockFile(), "rw")) {
+      // truncate blockFile
+      blockRAF.setLength(bytesOnDisk - 1);
+      fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
+          rbw.getNumBytes());
+      Assert.fail("recovery should have failed");
+    } catch (ReplicaNotFoundException rnfe) {
+      GenericTestUtils.assertExceptionContains("Found fewer bytesOnDisk than " +
+          "bytesAcked for replica", rnfe);
+    }
+  }
+
   /**
    * Compare the replica map before and after the restart
    **/
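
The heart of the change is the order of checks recoverRbwImpl() now performs before reusing an RBW replica: trust the block file's actual length over the cached bytesOnDisk, reject the replica if fewer bytes than were acknowledged survive on disk, and otherwise truncate the unacknowledged tail. The standalone Java sketch below illustrates only that decision order; the class, method, and parameter names are illustrative stand-ins, not Hadoop APIs, and it omits details the real code handles (meta/checksum file truncation, volume references, and locking).

// Illustrative sketch of the recovery ordering introduced by the patch; not Hadoop code.
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

final class RbwRecoverySketch {
  /**
   * Reconcile an RBW replica's lengths the way the patched recoverRbwImpl() does:
   * reset bytesOnDisk from the file, reject replicas missing acknowledged data,
   * and truncate any unacknowledged (potentially corrupt) tail.
   */
  static long recover(File blockFile, long bytesOnDisk, long bytesAcked)
      throws IOException {
    long blockDataLength = blockFile.length();
    if (bytesOnDisk != blockDataLength) {
      // The in-memory counter went stale (e.g. after a pipeline failure);
      // the file on disk is the source of truth.
      bytesOnDisk = blockDataLength;
    }
    if (bytesOnDisk < bytesAcked) {
      // Bytes the client already saw acknowledged are missing: not recoverable here.
      throw new IOException("fewer bytes on disk than acknowledged");
    }
    if (bytesOnDisk > bytesAcked) {
      // Drop the unacknowledged tail, which may be corrupt.
      try (RandomAccessFile raf = new RandomAccessFile(blockFile, "rw")) {
        raf.setLength(bytesAcked);
      }
    }
    return bytesAcked;
  }
}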