HDFS-11472. Fix inconsistent replica size after a data pipeline failure. Contributed by Erik Krogen and Wei-Chiu Chuang.
(cherry picked from commit 2a5a313539e211736fef12010918a60f9edad030)
This commit is contained in:
parent
a735916b2c
commit
af2fb6a909
@ -1564,16 +1564,30 @@ private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
|
|||||||
minBytesRcvd + ", " + maxBytesRcvd + "].");
|
minBytesRcvd + ", " + maxBytesRcvd + "].");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long bytesOnDisk = rbw.getBytesOnDisk();
|
||||||
|
long blockDataLength = rbw.getBlockFile().length();
|
||||||
|
if (bytesOnDisk != blockDataLength) {
|
||||||
|
LOG.info("Resetting bytesOnDisk to match blockDataLength (=" +
|
||||||
|
blockDataLength + ") for replica " + rbw);
|
||||||
|
bytesOnDisk = blockDataLength;
|
||||||
|
rbw.setLastChecksumAndDataLen(bytesOnDisk, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bytesOnDisk < bytesAcked) {
|
||||||
|
throw new ReplicaNotFoundException("Found fewer bytesOnDisk than " +
|
||||||
|
"bytesAcked for replica " + rbw);
|
||||||
|
}
|
||||||
|
|
||||||
FsVolumeReference ref = rbw.getVolume().obtainReference();
|
FsVolumeReference ref = rbw.getVolume().obtainReference();
|
||||||
try {
|
try {
|
||||||
// Truncate the potentially corrupt portion.
|
// Truncate the potentially corrupt portion.
|
||||||
// If the source was client and the last node in the pipeline was lost,
|
// If the source was client and the last node in the pipeline was lost,
|
||||||
// any corrupt data written after the acked length can go unnoticed.
|
// any corrupt data written after the acked length can go unnoticed.
|
||||||
if (numBytes > bytesAcked) {
|
if (bytesOnDisk > bytesAcked) {
|
||||||
final File replicafile = rbw.getBlockFile();
|
final File replicafile = rbw.getBlockFile();
|
||||||
truncateBlock(
|
truncateBlock(
|
||||||
rbw.getVolume(), replicafile, rbw.getMetaFile(),
|
rbw.getVolume(), replicafile, rbw.getMetaFile(),
|
||||||
numBytes, bytesAcked);
|
bytesOnDisk, bytesAcked);
|
||||||
rbw.setNumBytes(bytesAcked);
|
rbw.setNumBytes(bytesAcked);
|
||||||
rbw.setLastChecksumAndDataLen(bytesAcked, null);
|
rbw.setLastChecksumAndDataLen(bytesAcked, null);
|
||||||
}
|
}
|
||||||
@ -2624,8 +2638,8 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
|
|||||||
|
|
||||||
//check replica bytes on disk.
|
//check replica bytes on disk.
|
||||||
if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
|
if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
|
||||||
throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
|
throw new IOException("getBytesOnDisk() < getVisibleLength(), rip="
|
||||||
+ " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
|
+ rip);
|
||||||
}
|
}
|
||||||
|
|
||||||
//check the replica's files
|
//check the replica's files
|
||||||
|
@ -303,6 +303,13 @@ public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
|
|||||||
rbw.getBlockFile().createNewFile();
|
rbw.getBlockFile().createNewFile();
|
||||||
rbw.getMetaFile().createNewFile();
|
rbw.getMetaFile().createNewFile();
|
||||||
dataset.volumeMap.add(bpid, rbw);
|
dataset.volumeMap.add(bpid, rbw);
|
||||||
|
|
||||||
|
try (RandomAccessFile blockRAF =
|
||||||
|
new RandomAccessFile(rbw.getBlockFile(), "rw")) {
|
||||||
|
//extend blockFile
|
||||||
|
blockRAF.setLength(eb.getNumBytes());
|
||||||
|
}
|
||||||
|
saveMetaFileHeader(rbw.getMetaFile());
|
||||||
return rbw;
|
return rbw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -36,12 +37,14 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
import org.apache.hadoop.util.AutoCloseableLock;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -154,7 +157,7 @@ private ExtendedBlock[] setup(String bpid, FsDatasetTestUtils testUtils)
|
|||||||
|
|
||||||
ExtendedBlock[] blocks = new ExtendedBlock[] {
|
ExtendedBlock[] blocks = new ExtendedBlock[] {
|
||||||
new ExtendedBlock(bpid, 1, 1, 2001), new ExtendedBlock(bpid, 2, 1, 2002),
|
new ExtendedBlock(bpid, 1, 1, 2001), new ExtendedBlock(bpid, 2, 1, 2002),
|
||||||
new ExtendedBlock(bpid, 3, 1, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
|
new ExtendedBlock(bpid, 3, 2, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
|
||||||
new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
|
new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -549,6 +552,51 @@ public void testReplicaMapAfterDatanodeRestart() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that we can successfully recover a {@link ReplicaBeingWritten}
|
||||||
|
* which has inconsistent metadata (bytes were written to disk but bytesOnDisk
|
||||||
|
* was not updated) but that recovery fails when the block is actually
|
||||||
|
* corrupt (bytes are not present on disk).
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRecoverInconsistentRbw() throws IOException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
FsDatasetImpl fsDataset = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
|
||||||
|
|
||||||
|
// set up replicasMap
|
||||||
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
|
||||||
|
|
||||||
|
ReplicaBeingWritten rbw = (ReplicaBeingWritten)fsDataset.
|
||||||
|
getReplicaInfo(blocks[RBW]);
|
||||||
|
long bytesOnDisk = rbw.getBytesOnDisk();
|
||||||
|
// simulate an inconsistent replica length update by reducing in-memory
|
||||||
|
// value of on disk length
|
||||||
|
rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
|
||||||
|
fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
|
||||||
|
rbw.getNumBytes());
|
||||||
|
// after the recovery, on disk length should equal acknowledged length.
|
||||||
|
Assert.assertTrue(rbw.getBytesOnDisk() == rbw.getBytesAcked());
|
||||||
|
|
||||||
|
// reduce on disk length again; this time actually truncate the file to
|
||||||
|
// simulate the data not being present
|
||||||
|
rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
|
||||||
|
try (RandomAccessFile blockRAF =
|
||||||
|
new RandomAccessFile(rbw.getBlockFile(), "rw")) {
|
||||||
|
// truncate blockFile
|
||||||
|
blockRAF.setLength(bytesOnDisk - 1);
|
||||||
|
fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
|
||||||
|
rbw.getNumBytes());
|
||||||
|
Assert.fail("recovery should have failed");
|
||||||
|
} catch (ReplicaNotFoundException rnfe) {
|
||||||
|
GenericTestUtils.assertExceptionContains("Found fewer bytesOnDisk than " +
|
||||||
|
"bytesAcked for replica", rnfe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compare the replica map before and after the restart
|
* Compare the replica map before and after the restart
|
||||||
**/
|
**/
|
||||||
|
Loading…
x
Reference in New Issue
Block a user