HDFS-9308. Add truncateMeta() and deleteMeta() to MiniDFSCluster. (Tony Wu via lei)

parent 5ba2b98d0f
commit 8e05dbf2bd

@@ -1661,6 +1661,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-9312. Fix TestReplication to be FsDataset-agnostic. (lei)
 
+    HDFS-9308. Add truncateMeta() and deleteMeta() to MiniDFSCluster. (Tony Wu via lei)
+
   BUG FIXES
 
     HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.

@@ -2117,6 +2117,28 @@ public class MiniDFSCluster {
     getMaterializedReplica(i, blk).corruptMeta();
   }
 
+  /**
+   * Corrupt the metadata of a block by deleting it.
+   * @param i index of the datanode
+   * @param blk name of the block.
+   */
+  public void deleteMeta(int i, ExtendedBlock blk)
+      throws IOException {
+    getMaterializedReplica(i, blk).deleteMeta();
+  }
+
+  /**
+   * Corrupt the metadata of a block by truncating it to a new size.
+   * @param i index of the datanode.
+   * @param blk name of the block.
+   * @param newSize the new size of the metadata file.
+   * @throws IOException if any I/O errors.
+   */
+  public void truncateMeta(int i, ExtendedBlock blk, int newSize)
+      throws IOException {
+    getMaterializedReplica(i, blk).truncateMeta(newSize);
+  }
+
   public boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk,
       long newGenStamp) throws IOException {
     File blockFile = getBlockFile(dnIndex, blk);
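
A minimal usage sketch of the new helpers (illustrative only, not part of the patch): it assumes a running MiniDFSCluster named "cluster", a FileSystem "fs" with a test file already written, and DFSTestUtil.getFirstBlock() to look up that file's block; exception handling is omitted.

    // Hypothetical test snippet exercising the new MiniDFSCluster helpers on datanode 0.
    ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, new Path("/testFile"));
    cluster.deleteMeta(0, blk);        // remove the replica's .meta file outright
    // ...or, alternatively, keep only the first few bytes of it:
    cluster.truncateMeta(0, blk, 8);   // truncate the meta file down to 8 bytes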

@@ -22,11 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;

@@ -35,12 +32,15 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A JUnit test for corrupted file handling.

@@ -70,6 +70,8 @@ import org.mockito.Mockito;
  * replica was created from the non-corrupted replica.
  */
 public class TestCrcCorruption {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestCrcCorruption.class);
 
   private DFSClientFaultInjector faultInjector;
 

@@ -167,90 +169,26 @@ public class TestCrcCorruption {
       // file disallows this Datanode to send data to another datanode.
       // However, a client is alowed access to this block.
       //
-      File storageDir = cluster.getInstanceStorageDir(0, 1);
-      String bpid = cluster.getNamesystem().getBlockPoolId();
-      File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
-      assertTrue("data directory does not exist", data_dir.exists());
-      File[] blocks = data_dir.listFiles();
-      assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
-      int num = 0;
-      for (int idx = 0; idx < blocks.length; idx++) {
-        if (blocks[idx].getName().startsWith(Block.BLOCK_FILE_PREFIX) &&
-            blocks[idx].getName().endsWith(".meta")) {
-          num++;
-          if (num % 3 == 0) {
-            //
-            // remove .meta file
-            //
-            System.out.println("Deliberately removing file " + blocks[idx].getName());
-            assertTrue("Cannot remove file.", blocks[idx].delete());
-          } else if (num % 3 == 1) {
-            //
-            // shorten .meta file
-            //
-            RandomAccessFile file = new RandomAccessFile(blocks[idx], "rw");
-            FileChannel channel = file.getChannel();
-            int newsize = random.nextInt((int)channel.size()/2);
-            System.out.println("Deliberately truncating file " +
-                               blocks[idx].getName() +
-                               " to size " + newsize + " bytes.");
-            channel.truncate(newsize);
-            file.close();
-          } else {
-            //
-            // corrupt a few bytes of the metafile
-            //
-            RandomAccessFile file = new RandomAccessFile(blocks[idx], "rw");
-            FileChannel channel = file.getChannel();
-            long position = 0;
-            //
-            // The very first time, corrupt the meta header at offset 0
-            //
-            if (num != 2) {
-              position = (long)random.nextInt((int)channel.size());
-            }
-            int length = random.nextInt((int)(channel.size() - position + 1));
-            byte[] buffer = new byte[length];
-            random.nextBytes(buffer);
-            channel.write(ByteBuffer.wrap(buffer), position);
-            System.out.println("Deliberately corrupting file " +
-                               blocks[idx].getName() +
-                               " at offset " + position +
-                               " length " + length);
-            file.close();
-          }
-        }
-      }
-
-      //
-      // Now deliberately corrupt all meta blocks from the second
-      // directory of the first datanode
-      //
-      storageDir = cluster.getInstanceStorageDir(0, 1);
-      data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
-      assertTrue("data directory does not exist", data_dir.exists());
-      blocks = data_dir.listFiles();
-      assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
-
-      int count = 0;
-      File previous = null;
-      for (int idx = 0; idx < blocks.length; idx++) {
-        if (blocks[idx].getName().startsWith("blk_") &&
-            blocks[idx].getName().endsWith(".meta")) {
-          //
-          // Move the previous metafile into the current one.
-          //
-          count++;
-          if (count % 2 == 0) {
-            System.out.println("Deliberately insertimg bad crc into files " +
-                               blocks[idx].getName() + " " + previous.getName());
-            assertTrue("Cannot remove file.", blocks[idx].delete());
-            assertTrue("Cannot corrupt meta file.", previous.renameTo(blocks[idx]));
-            assertTrue("Cannot recreate empty meta file.", previous.createNewFile());
-            previous = null;
-          } else {
-            previous = blocks[idx];
-          }
+      final int dnIdx = 0;
+      final DataNode dn = cluster.getDataNodes().get(dnIdx);
+      final String bpid = cluster.getNamesystem().getBlockPoolId();
+      List<FinalizedReplica> replicas =
+          dn.getFSDataset().getFinalizedBlocks(bpid);
+      assertTrue("Replicas do not exist", !replicas.isEmpty());
+
+      for (int idx = 0; idx < replicas.size(); idx++) {
+        FinalizedReplica replica = replicas.get(idx);
+        ExtendedBlock eb = new ExtendedBlock(bpid, replica);
+        if (idx % 3 == 0) {
+          LOG.info("Deliberately removing meta for block " + eb);
+          cluster.deleteMeta(dnIdx, eb);
+        } else if (idx % 3 == 1) {
+          final int newSize = 2; // bytes
+          LOG.info("Deliberately truncating meta file for block " +
+              eb + " to size " + newSize + " bytes.");
+          cluster.truncateMeta(dnIdx, eb, newSize);
+        } else {
+          cluster.corruptMeta(dnIdx, eb);
         }
       }
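
The rewritten loop is what makes this test FsDataset-agnostic: instead of listing blk_*.meta files in a storage directory (which assumes a particular on-disk layout), it asks the DataNode's FsDataset for its finalized replicas and corrupts their metadata through the MiniDFSCluster helpers. A condensed sketch of that pattern, reusing only names that appear in the hunk above (illustrative, not additional patch content):

    DataNode dn = cluster.getDataNodes().get(0);
    String bpid = cluster.getNamesystem().getBlockPoolId();
    for (FinalizedReplica replica : dn.getFSDataset().getFinalizedBlocks(bpid)) {
      // Corrupt via the cluster API; no assumption about where the .meta file lives.
      cluster.corruptMeta(0, new ExtendedBlock(bpid, replica));
    }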

@@ -260,7 +198,7 @@ public class TestCrcCorruption {
       //
       assertTrue("Corrupted replicas not handled properly.",
                  util.checkFiles(fs, "/srcdat"));
-      System.out.println("All File still have a valid replica");
+      LOG.info("All File still have a valid replica");
 
       //
       // set replication factor back to 1. This causes only one replica of

@@ -273,7 +211,7 @@ public class TestCrcCorruption {
       //System.out.println("All Files done with removing replicas");
       //assertTrue("Excess replicas deleted. Corrupted replicas found.",
       //           util.checkFiles(fs, "/srcdat"));
-      System.out.println("The excess-corrupted-replica test is disabled " +
+      LOG.info("The excess-corrupted-replica test is disabled " +
                          " pending HADOOP-1557");
 
       util.cleanup(fs, "/srcdat");

@@ -20,9 +20,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;

@@ -32,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;

@@ -178,8 +175,9 @@ public class TestLeaseRecovery {
     Path file = new Path("/testRecoveryFile");
     DistributedFileSystem dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(file);
+    final int FILE_SIZE = 2 * 1024 * 1024;
     int count = 0;
-    while (count < 2 * 1024 * 1024) {
+    while (count < FILE_SIZE) {
       out.writeBytes("Data");
       count += 4;
     }

@@ -190,15 +188,23 @@ public class TestLeaseRecovery {
     LocatedBlocks locations = cluster.getNameNodeRpc().getBlockLocations(
         file.toString(), 0, count);
     ExtendedBlock block = locations.get(0).getBlock();
-    DataNode dn = cluster.getDataNodes().get(0);
-    BlockLocalPathInfo localPathInfo = dn.getBlockLocalPathInfo(block, null);
-    File metafile = new File(localPathInfo.getMetaPath());
-    assertTrue(metafile.exists());
 
-    // reduce the block meta file size
-    RandomAccessFile raf = new RandomAccessFile(metafile, "rw");
-    raf.setLength(metafile.length() - 20);
-    raf.close();
+    // Calculate meta file size
+    // From DataNode.java, checksum size is given by:
+    // (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+    // CHECKSUM_SIZE
+    final int CHECKSUM_SIZE = 4; // CRC32 & CRC32C
+    final int bytesPerChecksum = conf.getInt(
+        DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
+    final int metaFileSize =
+        (FILE_SIZE + bytesPerChecksum - 1) / bytesPerChecksum * CHECKSUM_SIZE +
+        8; // meta file header is 8 bytes
+    final int newMetaFileSize = metaFileSize - CHECKSUM_SIZE;
+
+    // Corrupt the block meta file by dropping checksum for bytesPerChecksum
+    // bytes. Lease recovery is expected to recover the uncorrupted file length.
+    cluster.truncateMeta(0, block, newMetaFileSize);
 
     // restart DN to make replica to RWR
     DataNodeProperties dnProp = cluster.stopDataNode(0);
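
For concreteness, a worked instance of the size arithmetic above, assuming the default dfs.bytes-per-checksum of 512 (the default value is an assumption here; the test reads it from the Configuration):

    // FILE_SIZE = 2 * 1024 * 1024 = 2,097,152 bytes; bytesPerChecksum = 512
    // metaFileSize    = (2097152 + 511) / 512 * 4 + 8 = 4096 * 4 + 8 = 16,392 bytes
    // newMetaFileSize = 16392 - 4 = 16,388 bytes
    // Dropping one 4-byte checksum leaves the last 512-byte chunk of data
    // unverifiable, so lease recovery is expected to trim the file to
    // FILE_SIZE - bytesPerChecksum = 2,096,640 bytes.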

@@ -213,6 +219,11 @@ public class TestLeaseRecovery {
     }
     assertTrue("File should be closed", newdfs.recoverLease(file));
 
+    // Verify file length after lease recovery. The new file length should not
+    // include the bytes with corrupted checksum.
+    final long expectedNewFileLen = FILE_SIZE - bytesPerChecksum;
+    final long newFileLen = newdfs.getFileStatus(file).getLen();
+    assertEquals(newFileLen, expectedNewFileLen);
   }
 
   /**