HDFS-16286. Add a debug tool to verify the correctness of erasure coding on file (#3593)

This commit is contained in:
daimin 2021-11-04 04:19:56 +08:00 committed by GitHub
parent 6c6d1b64d4
commit a21895a5b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 350 additions and 8 deletions

View File

@ -24,15 +24,41 @@ import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Uninterruptibles; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -69,6 +95,7 @@ public class DebugAdmin extends Configured implements Tool {
new VerifyMetaCommand(), new VerifyMetaCommand(),
new ComputeMetaCommand(), new ComputeMetaCommand(),
new RecoverLeaseCommand(), new RecoverLeaseCommand(),
new VerifyECCommand(),
new HelpCommand() new HelpCommand()
}; };
@ -387,6 +414,209 @@ public class DebugAdmin extends Configured implements Tool {
} }
} }
/**
* The command for verifying the correctness of erasure coding on an erasure coded file.
*/
private class VerifyECCommand extends DebugCommand {
private DFSClient client;
private int dataBlkNum;
private int parityBlkNum;
private int cellSize;
private boolean useDNHostname;
private CachingStrategy cachingStrategy;
private int stripedReadBufferSize;
private CompletionService<Integer> readService;
private RawErasureEncoder encoder;
private BlockReader[] blockReaders;
VerifyECCommand() {
super("verifyEC",
"verifyEC -file <file>",
" Verify HDFS erasure coding on all block groups of the file.");
}
int run(List<String> args) throws IOException {
if (args.size() < 2) {
System.out.println(usageText);
System.out.println(helpText + System.lineSeparator());
return 1;
}
String file = StringUtils.popOptionWithArgument("-file", args);
Path path = new Path(file);
DistributedFileSystem dfs = AdminHelper.getDFS(getConf());
this.client = dfs.getClient();
FileStatus fileStatus;
try {
fileStatus = dfs.getFileStatus(path);
} catch (FileNotFoundException e) {
System.err.println("File " + file + " does not exist.");
return 1;
}
if (!fileStatus.isFile()) {
System.err.println("File " + file + " is not a regular file.");
return 1;
}
if (!dfs.isFileClosed(path)) {
System.err.println("File " + file + " is not closed.");
return 1;
}
this.useDNHostname = getConf().getBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.cachingStrategy = CachingStrategy.newDefaultStrategy();
this.stripedReadBufferSize = getConf().getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT);
LocatedBlocks locatedBlocks = client.getLocatedBlocks(file, 0, fileStatus.getLen());
if (locatedBlocks.getErasureCodingPolicy() == null) {
System.err.println("File " + file + " is not erasure coded.");
return 1;
}
ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy();
this.dataBlkNum = ecPolicy.getNumDataUnits();
this.parityBlkNum = ecPolicy.getNumParityUnits();
this.cellSize = ecPolicy.getCellSize();
this.encoder = CodecUtil.createRawEncoder(getConf(), ecPolicy.getCodecName(),
new ErasureCoderOptions(
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()));
int blockNum = dataBlkNum + parityBlkNum;
this.readService = new ExecutorCompletionService<>(
DFSUtilClient.getThreadPoolExecutor(blockNum, blockNum, 60,
new LinkedBlockingQueue<>(), "read-", false));
this.blockReaders = new BlockReader[dataBlkNum + parityBlkNum];
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
System.out.println("Checking EC block group: blk_" + locatedBlock.getBlock().getBlockId());
LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
try {
verifyBlockGroup(blockGroup);
System.out.println("Status: OK");
} catch (Exception e) {
System.err.println("Status: ERROR, message: " + e.getMessage());
return 1;
} finally {
closeBlockReaders();
}
}
System.out.println("\nAll EC block group status: OK");
return 0;
}
private void verifyBlockGroup(LocatedStripedBlock blockGroup) throws Exception {
final LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup,
cellSize, dataBlkNum, parityBlkNum);
int blockNumExpected = Math.min(dataBlkNum,
(int) ((blockGroup.getBlockSize() - 1) / cellSize + 1)) + parityBlkNum;
if (blockGroup.getBlockIndices().length < blockNumExpected) {
throw new Exception("Block group is under-erasure-coded.");
}
long maxBlockLen = 0L;
DataChecksum checksum = null;
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
LocatedBlock block = indexedBlocks[i];
if (block == null) {
blockReaders[i] = null;
continue;
}
if (block.getBlockSize() > maxBlockLen) {
maxBlockLen = block.getBlockSize();
}
BlockReader blockReader = createBlockReader(block.getBlock(),
block.getLocations()[0], block.getBlockToken());
if (checksum == null) {
checksum = blockReader.getDataChecksum();
} else {
assert checksum.equals(blockReader.getDataChecksum());
}
blockReaders[i] = blockReader;
}
assert checksum != null;
int bytesPerChecksum = checksum.getBytesPerChecksum();
int bufferSize = stripedReadBufferSize < bytesPerChecksum ? bytesPerChecksum :
stripedReadBufferSize - stripedReadBufferSize % bytesPerChecksum;
final ByteBuffer[] buffers = new ByteBuffer[dataBlkNum + parityBlkNum];
final ByteBuffer[] outputs = new ByteBuffer[parityBlkNum];
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
buffers[i] = ByteBuffer.allocate(bufferSize);
}
for (int i = 0; i < parityBlkNum; i++) {
outputs[i] = ByteBuffer.allocate(bufferSize);
}
long positionInBlock = 0L;
while (positionInBlock < maxBlockLen) {
final int toVerifyLen = (int) Math.min(bufferSize, maxBlockLen - positionInBlock);
List<Future<Integer>> futures = new ArrayList<>(dataBlkNum + parityBlkNum);
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
final int fi = i;
futures.add(this.readService.submit(() -> {
BlockReader blockReader = blockReaders[fi];
ByteBuffer buffer = buffers[fi];
buffer.clear();
buffer.limit(toVerifyLen);
int readLen = 0;
if (blockReader != null) {
int toRead = buffer.remaining();
while (readLen < toRead) {
int nread = blockReader.read(buffer);
if (nread <= 0) {
break;
}
readLen += nread;
}
}
while (buffer.hasRemaining()) {
buffer.put((byte) 0);
}
buffer.flip();
return readLen;
}));
}
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
futures.get(i).get(1, TimeUnit.MINUTES);
}
ByteBuffer[] inputs = new ByteBuffer[dataBlkNum];
System.arraycopy(buffers, 0, inputs, 0, dataBlkNum);
for (int i = 0; i < parityBlkNum; i++) {
outputs[i].clear();
outputs[i].limit(toVerifyLen);
}
this.encoder.encode(inputs, outputs);
for (int i = 0; i < parityBlkNum; i++) {
if (!buffers[dataBlkNum + i].equals(outputs[i])) {
throw new Exception("EC compute result not match.");
}
}
positionInBlock += toVerifyLen;
}
}
private BlockReader createBlockReader(ExtendedBlock block, DatanodeInfo dnInfo,
Token<BlockTokenIdentifier> token) throws IOException {
InetSocketAddress dnAddress = NetUtils.createSocketAddr(dnInfo.getXferAddr(useDNHostname));
Peer peer = client.newConnectedPeer(dnAddress, token, dnInfo);
return BlockReaderRemote.newBlockReader(
"dummy", block, token, 0,
block.getNumBytes(), true, "", peer, dnInfo,
null, cachingStrategy, -1, getConf());
}
private void closeBlockReaders() {
for (int i = 0; i < blockReaders.length; i++) {
if (blockReaders[i] != null) {
IOUtils.closeStream(blockReaders[i]);
blockReaders[i] = null;
}
}
}
}
/** /**
* The command for getting help about other commands. * The command for getting help about other commands.
*/ */

View File

@ -708,6 +708,16 @@ Usage: `hdfs debug recoverLease -path <path> [-retries <num-retries>]`
Recover the lease on the specified path. The path must reside on an HDFS file system. The default number of retries is 1. Recover the lease on the specified path. The path must reside on an HDFS file system. The default number of retries is 1.
### `verifyEC`
Usage: `hdfs debug verifyEC -file <file>`
| COMMAND\_OPTION | Description |
|:---- |:---- |
| [`-file` *EC-file*] | HDFS EC file to be verified. |
Verify the correctness of erasure coding on an erasure coded file.
dfsadmin with ViewFsOverloadScheme dfsadmin with ViewFsOverloadScheme
---------------------------------- ----------------------------------

View File

@ -17,15 +17,22 @@
*/ */
package org.apache.hadoop.hdfs.tools; package org.apache.hadoop.hdfs.tools;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -34,6 +41,8 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.List;
import java.util.Random;
import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil.*; import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil.*;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -44,23 +53,16 @@ public class TestDebugAdmin {
static private final String TEST_ROOT_DIR = static private final String TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp"), new File(System.getProperty("test.build.data", "/tmp"),
TestDebugAdmin.class.getSimpleName()).getAbsolutePath(); TestDebugAdmin.class.getSimpleName()).getAbsolutePath();
private Configuration conf = new Configuration();
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DebugAdmin admin; private DebugAdmin admin;
private DataNode datanode;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
final File testRoot = new File(TEST_ROOT_DIR); final File testRoot = new File(TEST_ROOT_DIR);
testRoot.delete(); testRoot.delete();
testRoot.mkdirs(); testRoot.mkdirs();
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
admin = new DebugAdmin(conf); admin = new DebugAdmin(conf);
datanode = cluster.getDataNodes().get(0);
} }
@After @After
@ -92,8 +94,11 @@ public class TestDebugAdmin {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testRecoverLease() throws Exception { public void testRecoverLease() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
assertEquals("ret: 1, You must supply a -path argument to recoverLease.", assertEquals("ret: 1, You must supply a -path argument to recoverLease.",
runCmd(new String[]{"recoverLease", "-retries", "1"})); runCmd(new String[]{"recoverLease", "-retries", "1"}));
DistributedFileSystem fs = cluster.getFileSystem();
FSDataOutputStream out = fs.create(new Path("/foo")); FSDataOutputStream out = fs.create(new Path("/foo"));
out.write(123); out.write(123);
out.close(); out.close();
@ -103,6 +108,10 @@ public class TestDebugAdmin {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testVerifyMetaCommand() throws Exception { public void testVerifyMetaCommand() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
DataNode datanode = cluster.getDataNodes().get(0);
DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef); DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
FsDatasetSpi<?> fsd = datanode.getFSDataset(); FsDatasetSpi<?> fsd = datanode.getFSDataset();
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar")); ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
@ -128,6 +137,10 @@ public class TestDebugAdmin {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testComputeMetaCommand() throws Exception { public void testComputeMetaCommand() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
DataNode datanode = cluster.getDataNodes().get(0);
DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef); DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
FsDatasetSpi<?> fsd = datanode.getFSDataset(); FsDatasetSpi<?> fsd = datanode.getFSDataset();
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar")); ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
@ -166,8 +179,97 @@ public class TestDebugAdmin {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testRecoverLeaseforFileNotFound() throws Exception { public void testRecoverLeaseforFileNotFound() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
assertTrue(runCmd(new String[] { assertTrue(runCmd(new String[] {
"recoverLease", "-path", "/foo", "-retries", "2" }).contains( "recoverLease", "-path", "/foo", "-retries", "2" }).contains(
"Giving up on recoverLease for /foo after 1 try")); "Giving up on recoverLease for /foo after 1 try"));
} }
@Test(timeout = 60000)
public void testVerifyECCommand() throws Exception {
final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByID(
SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
cluster = DFSTestUtil.setupCluster(conf, 6, 5, 0);
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
assertEquals("ret: 1, verifyEC -file <file> Verify HDFS erasure coding on " +
"all block groups of the file.", runCmd(new String[]{"verifyEC"}));
assertEquals("ret: 1, File /bar does not exist.",
runCmd(new String[]{"verifyEC", "-file", "/bar"}));
fs.create(new Path("/bar")).close();
assertEquals("ret: 1, File /bar is not erasure coded.",
runCmd(new String[]{"verifyEC", "-file", "/bar"}));
final Path ecDir = new Path("/ec");
fs.mkdir(ecDir, FsPermission.getDirDefault());
fs.enableErasureCodingPolicy(ecPolicy.getName());
fs.setErasureCodingPolicy(ecDir, ecPolicy.getName());
assertEquals("ret: 1, File /ec is not a regular file.",
runCmd(new String[]{"verifyEC", "-file", "/ec"}));
fs.create(new Path(ecDir, "foo"));
assertEquals("ret: 1, File /ec/foo is not closed.",
runCmd(new String[]{"verifyEC", "-file", "/ec/foo"}));
final short repl = 1;
final long k = 1024;
final long m = k * k;
final long seed = 0x1234567L;
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_65535"), 65535, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_65535"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_256k"), 256 * k, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_256k"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_1m"), m, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_1m"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_2m"), 2 * m, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_2m"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_3m"), 3 * m, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_3m"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_5m"), 5 * m, repl, seed);
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_5m"})
.contains("All EC block group status: OK"));
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_6m"), (int) k, 6 * m, m, repl, seed);
assertEquals("ret: 0, Checking EC block group: blk_x;Status: OK" +
"Checking EC block group: blk_x;Status: OK" +
"All EC block group status: OK",
runCmd(new String[]{"verifyEC", "-file", "/ec/foo_6m"})
.replaceAll("blk_-[0-9]+", "blk_x;"));
Path corruptFile = new Path(ecDir, "foo_corrupt");
DFSTestUtil.createFile(fs, corruptFile, 5841961, repl, seed);
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs, corruptFile);
assertEquals(1, blocks.size());
LocatedStripedBlock blockGroup = (LocatedStripedBlock) blocks.get(0);
LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup,
ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
// Try corrupt block 0 in block group.
LocatedBlock toCorruptLocatedBlock = indexedBlocks[0];
ExtendedBlock toCorruptBlock = toCorruptLocatedBlock.getBlock();
DataNode datanode = cluster.getDataNode(toCorruptLocatedBlock.getLocations()[0].getIpcPort());
File blockFile = getBlockFile(datanode.getFSDataset(),
toCorruptBlock.getBlockPoolId(), toCorruptBlock.getLocalBlock());
File metaFile = getMetaFile(datanode.getFSDataset(),
toCorruptBlock.getBlockPoolId(), toCorruptBlock.getLocalBlock());
// Write error bytes to block file and re-generate meta checksum.
byte[] errorBytes = new byte[2097152];
new Random(seed).nextBytes(errorBytes);
FileUtils.writeByteArrayToFile(blockFile, errorBytes);
metaFile.delete();
runCmd(new String[]{"computeMeta", "-block", blockFile.getAbsolutePath(),
"-out", metaFile.getAbsolutePath()});
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_corrupt"})
.contains("Status: ERROR, message: EC compute result not match."));
}
} }