HDFS-16286. Add a debug tool to verify the correctness of erasure coding on file (#3593)
(cherry picked from commit a21895a5b3
)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java
This commit is contained in:
parent
b1801e99de
commit
29fd36e2f1
|
@ -25,15 +25,42 @@ import java.io.FileOutputStream;
|
|||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.hdfs.BlockReader;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.io.erasurecode.CodecUtil;
|
||||
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
|
||||
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -68,6 +95,7 @@ public class DebugAdmin extends Configured implements Tool {
|
|||
new VerifyMetaCommand(),
|
||||
new ComputeMetaCommand(),
|
||||
new RecoverLeaseCommand(),
|
||||
new VerifyECCommand(),
|
||||
new HelpCommand()
|
||||
};
|
||||
|
||||
|
@ -386,6 +414,209 @@ public class DebugAdmin extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The command for verifying the correctness of erasure coding on an erasure coded file.
|
||||
*/
|
||||
private class VerifyECCommand extends DebugCommand {
|
||||
private DFSClient client;
|
||||
private int dataBlkNum;
|
||||
private int parityBlkNum;
|
||||
private int cellSize;
|
||||
private boolean useDNHostname;
|
||||
private CachingStrategy cachingStrategy;
|
||||
private int stripedReadBufferSize;
|
||||
private CompletionService<Integer> readService;
|
||||
private RawErasureEncoder encoder;
|
||||
private BlockReader[] blockReaders;
|
||||
|
||||
|
||||
VerifyECCommand() {
|
||||
super("verifyEC",
|
||||
"verifyEC -file <file>",
|
||||
" Verify HDFS erasure coding on all block groups of the file.");
|
||||
}
|
||||
|
||||
int run(List<String> args) throws IOException {
|
||||
if (args.size() < 2) {
|
||||
System.out.println(usageText);
|
||||
System.out.println(helpText + System.lineSeparator());
|
||||
return 1;
|
||||
}
|
||||
String file = StringUtils.popOptionWithArgument("-file", args);
|
||||
Path path = new Path(file);
|
||||
DistributedFileSystem dfs = AdminHelper.getDFS(getConf());
|
||||
this.client = dfs.getClient();
|
||||
|
||||
FileStatus fileStatus;
|
||||
try {
|
||||
fileStatus = dfs.getFileStatus(path);
|
||||
} catch (FileNotFoundException e) {
|
||||
System.err.println("File " + file + " does not exist.");
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!fileStatus.isFile()) {
|
||||
System.err.println("File " + file + " is not a regular file.");
|
||||
return 1;
|
||||
}
|
||||
if (!dfs.isFileClosed(path)) {
|
||||
System.err.println("File " + file + " is not closed.");
|
||||
return 1;
|
||||
}
|
||||
this.useDNHostname = getConf().getBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
|
||||
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
|
||||
this.cachingStrategy = CachingStrategy.newDefaultStrategy();
|
||||
this.stripedReadBufferSize = getConf().getInt(
|
||||
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
|
||||
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT);
|
||||
|
||||
LocatedBlocks locatedBlocks = client.getLocatedBlocks(file, 0, fileStatus.getLen());
|
||||
if (locatedBlocks.getErasureCodingPolicy() == null) {
|
||||
System.err.println("File " + file + " is not erasure coded.");
|
||||
return 1;
|
||||
}
|
||||
ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy();
|
||||
this.dataBlkNum = ecPolicy.getNumDataUnits();
|
||||
this.parityBlkNum = ecPolicy.getNumParityUnits();
|
||||
this.cellSize = ecPolicy.getCellSize();
|
||||
this.encoder = CodecUtil.createRawEncoder(getConf(), ecPolicy.getCodecName(),
|
||||
new ErasureCoderOptions(
|
||||
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()));
|
||||
int blockNum = dataBlkNum + parityBlkNum;
|
||||
this.readService = new ExecutorCompletionService<>(
|
||||
DFSUtilClient.getThreadPoolExecutor(blockNum, blockNum, 60,
|
||||
new LinkedBlockingQueue<>(), "read-", false));
|
||||
this.blockReaders = new BlockReader[dataBlkNum + parityBlkNum];
|
||||
|
||||
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
|
||||
System.out.println("Checking EC block group: blk_" + locatedBlock.getBlock().getBlockId());
|
||||
LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
|
||||
|
||||
try {
|
||||
verifyBlockGroup(blockGroup);
|
||||
System.out.println("Status: OK");
|
||||
} catch (Exception e) {
|
||||
System.err.println("Status: ERROR, message: " + e.getMessage());
|
||||
return 1;
|
||||
} finally {
|
||||
closeBlockReaders();
|
||||
}
|
||||
}
|
||||
System.out.println("\nAll EC block group status: OK");
|
||||
return 0;
|
||||
}
|
||||
|
||||
private void verifyBlockGroup(LocatedStripedBlock blockGroup) throws Exception {
|
||||
final LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup,
|
||||
cellSize, dataBlkNum, parityBlkNum);
|
||||
|
||||
int blockNumExpected = Math.min(dataBlkNum,
|
||||
(int) ((blockGroup.getBlockSize() - 1) / cellSize + 1)) + parityBlkNum;
|
||||
if (blockGroup.getBlockIndices().length < blockNumExpected) {
|
||||
throw new Exception("Block group is under-erasure-coded.");
|
||||
}
|
||||
|
||||
long maxBlockLen = 0L;
|
||||
DataChecksum checksum = null;
|
||||
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
|
||||
LocatedBlock block = indexedBlocks[i];
|
||||
if (block == null) {
|
||||
blockReaders[i] = null;
|
||||
continue;
|
||||
}
|
||||
if (block.getBlockSize() > maxBlockLen) {
|
||||
maxBlockLen = block.getBlockSize();
|
||||
}
|
||||
BlockReader blockReader = createBlockReader(block.getBlock(),
|
||||
block.getLocations()[0], block.getBlockToken());
|
||||
if (checksum == null) {
|
||||
checksum = blockReader.getDataChecksum();
|
||||
} else {
|
||||
assert checksum.equals(blockReader.getDataChecksum());
|
||||
}
|
||||
blockReaders[i] = blockReader;
|
||||
}
|
||||
assert checksum != null;
|
||||
int bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||
int bufferSize = stripedReadBufferSize < bytesPerChecksum ? bytesPerChecksum :
|
||||
stripedReadBufferSize - stripedReadBufferSize % bytesPerChecksum;
|
||||
final ByteBuffer[] buffers = new ByteBuffer[dataBlkNum + parityBlkNum];
|
||||
final ByteBuffer[] outputs = new ByteBuffer[parityBlkNum];
|
||||
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
|
||||
buffers[i] = ByteBuffer.allocate(bufferSize);
|
||||
}
|
||||
for (int i = 0; i < parityBlkNum; i++) {
|
||||
outputs[i] = ByteBuffer.allocate(bufferSize);
|
||||
}
|
||||
long positionInBlock = 0L;
|
||||
while (positionInBlock < maxBlockLen) {
|
||||
final int toVerifyLen = (int) Math.min(bufferSize, maxBlockLen - positionInBlock);
|
||||
List<Future<Integer>> futures = new ArrayList<>(dataBlkNum + parityBlkNum);
|
||||
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
|
||||
final int fi = i;
|
||||
futures.add(this.readService.submit(() -> {
|
||||
BlockReader blockReader = blockReaders[fi];
|
||||
ByteBuffer buffer = buffers[fi];
|
||||
buffer.clear();
|
||||
buffer.limit(toVerifyLen);
|
||||
int readLen = 0;
|
||||
if (blockReader != null) {
|
||||
int toRead = buffer.remaining();
|
||||
while (readLen < toRead) {
|
||||
int nread = blockReader.read(buffer);
|
||||
if (nread <= 0) {
|
||||
break;
|
||||
}
|
||||
readLen += nread;
|
||||
}
|
||||
}
|
||||
while (buffer.hasRemaining()) {
|
||||
buffer.put((byte) 0);
|
||||
}
|
||||
buffer.flip();
|
||||
return readLen;
|
||||
}));
|
||||
}
|
||||
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
|
||||
futures.get(i).get(1, TimeUnit.MINUTES);
|
||||
}
|
||||
ByteBuffer[] inputs = new ByteBuffer[dataBlkNum];
|
||||
System.arraycopy(buffers, 0, inputs, 0, dataBlkNum);
|
||||
for (int i = 0; i < parityBlkNum; i++) {
|
||||
outputs[i].clear();
|
||||
outputs[i].limit(toVerifyLen);
|
||||
}
|
||||
this.encoder.encode(inputs, outputs);
|
||||
for (int i = 0; i < parityBlkNum; i++) {
|
||||
if (!buffers[dataBlkNum + i].equals(outputs[i])) {
|
||||
throw new Exception("EC compute result not match.");
|
||||
}
|
||||
}
|
||||
positionInBlock += toVerifyLen;
|
||||
}
|
||||
}
|
||||
|
||||
private BlockReader createBlockReader(ExtendedBlock block, DatanodeInfo dnInfo,
|
||||
Token<BlockTokenIdentifier> token) throws IOException {
|
||||
InetSocketAddress dnAddress = NetUtils.createSocketAddr(dnInfo.getXferAddr(useDNHostname));
|
||||
Peer peer = client.newConnectedPeer(dnAddress, token, dnInfo);
|
||||
return BlockReaderRemote.newBlockReader(
|
||||
"dummy", block, token, 0,
|
||||
block.getNumBytes(), true, "", peer, dnInfo,
|
||||
null, cachingStrategy, -1);
|
||||
}
|
||||
|
||||
private void closeBlockReaders() {
|
||||
for (int i = 0; i < blockReaders.length; i++) {
|
||||
if (blockReaders[i] != null) {
|
||||
IOUtils.closeStream(blockReaders[i]);
|
||||
blockReaders[i] = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The command for getting help about other commands.
|
||||
*/
|
||||
|
|
|
@ -676,6 +676,16 @@ Usage: `hdfs debug recoverLease -path <path> [-retries <num-retries>]`
|
|||
|
||||
Recover the lease on the specified path. The path must reside on an HDFS file system. The default number of retries is 1.
|
||||
|
||||
### `verifyEC`
|
||||
|
||||
Usage: `hdfs debug verifyEC -file <file>`
|
||||
|
||||
| COMMAND\_OPTION | Description |
|
||||
|:---- |:---- |
|
||||
| [`-file` *EC-file*] | HDFS EC file to be verified. |
|
||||
|
||||
Verify the correctness of erasure coding on an erasure coded file.
|
||||
|
||||
dfsadmin with ViewFsOverloadScheme
|
||||
----------------------------------
|
||||
|
||||
|
|
|
@ -17,15 +17,22 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.tools;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -34,6 +41,8 @@ import org.junit.Test;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.PrintStream;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -44,23 +53,16 @@ public class TestDebugAdmin {
|
|||
static private final String TEST_ROOT_DIR =
|
||||
new File(System.getProperty("test.build.data", "/tmp"),
|
||||
TestDebugAdmin.class.getSimpleName()).getAbsolutePath();
|
||||
|
||||
private Configuration conf = new Configuration();
|
||||
private MiniDFSCluster cluster;
|
||||
private DistributedFileSystem fs;
|
||||
private DebugAdmin admin;
|
||||
private DataNode datanode;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
final File testRoot = new File(TEST_ROOT_DIR);
|
||||
testRoot.delete();
|
||||
testRoot.mkdirs();
|
||||
Configuration conf = new Configuration();
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
admin = new DebugAdmin(conf);
|
||||
datanode = cluster.getDataNodes().get(0);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -92,8 +94,11 @@ public class TestDebugAdmin {
|
|||
|
||||
@Test(timeout = 60000)
|
||||
public void testRecoverLease() throws Exception {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
assertEquals("ret: 1, You must supply a -path argument to recoverLease.",
|
||||
runCmd(new String[]{"recoverLease", "-retries", "1"}));
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
FSDataOutputStream out = fs.create(new Path("/foo"));
|
||||
out.write(123);
|
||||
out.close();
|
||||
|
@ -103,6 +108,10 @@ public class TestDebugAdmin {
|
|||
|
||||
@Test(timeout = 60000)
|
||||
public void testVerifyMetaCommand() throws Exception {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
DataNode datanode = cluster.getDataNodes().get(0);
|
||||
DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
|
||||
FsDatasetSpi<?> fsd = datanode.getFSDataset();
|
||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
|
||||
|
@ -128,6 +137,10 @@ public class TestDebugAdmin {
|
|||
|
||||
@Test(timeout = 60000)
|
||||
public void testComputeMetaCommand() throws Exception {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
DataNode datanode = cluster.getDataNodes().get(0);
|
||||
DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
|
||||
FsDatasetSpi<?> fsd = datanode.getFSDataset();
|
||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
|
||||
|
@ -166,8 +179,97 @@ public class TestDebugAdmin {
|
|||
|
||||
@Test(timeout = 60000)
|
||||
public void testRecoverLeaseforFileNotFound() throws Exception {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
assertTrue(runCmd(new String[] {
|
||||
"recoverLease", "-path", "/foo", "-retries", "2" }).contains(
|
||||
"Giving up on recoverLease for /foo after 1 try"));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testVerifyECCommand() throws Exception {
|
||||
final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByID(
|
||||
SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
|
||||
cluster = DFSTestUtil.setupCluster(conf, 6, 5, 0);
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
|
||||
assertEquals("ret: 1, verifyEC -file <file> Verify HDFS erasure coding on " +
|
||||
"all block groups of the file.", runCmd(new String[]{"verifyEC"}));
|
||||
|
||||
assertEquals("ret: 1, File /bar does not exist.",
|
||||
runCmd(new String[]{"verifyEC", "-file", "/bar"}));
|
||||
|
||||
fs.create(new Path("/bar")).close();
|
||||
assertEquals("ret: 1, File /bar is not erasure coded.",
|
||||
runCmd(new String[]{"verifyEC", "-file", "/bar"}));
|
||||
|
||||
|
||||
final Path ecDir = new Path("/ec");
|
||||
fs.mkdir(ecDir, FsPermission.getDirDefault());
|
||||
fs.enableErasureCodingPolicy(ecPolicy.getName());
|
||||
fs.setErasureCodingPolicy(ecDir, ecPolicy.getName());
|
||||
|
||||
assertEquals("ret: 1, File /ec is not a regular file.",
|
||||
runCmd(new String[]{"verifyEC", "-file", "/ec"}));
|
||||
|
||||
fs.create(new Path(ecDir, "foo"));
|
||||
assertEquals("ret: 1, File /ec/foo is not closed.",
|
||||
runCmd(new String[]{"verifyEC", "-file", "/ec/foo"}));
|
||||
|
||||
final short repl = 1;
|
||||
final long k = 1024;
|
||||
final long m = k * k;
|
||||
final long seed = 0x1234567L;
|
||||
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_65535"), 65535, repl, seed);
|
||||
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_65535"})
|
||||
.contains("All EC block group status: OK"));
|
||||
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_256k"), 256 * k, repl, seed);
|
||||
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_256k"})
|
||||
.contains("All EC block group status: OK"));
|
||||
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_1m"), m, repl, seed);
|
||||
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_1m"})
|
||||
.contains("All EC block group status: OK"));
|
||||
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_2m"), 2 * m, repl, seed);
|
||||
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_2m"})
|
||||
.contains("All EC block group status: OK"));
|
||||
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_3m"), 3 * m, repl, seed);
|
||||
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_3m"})
|
||||
.contains("All EC block group status: OK"));
|
||||
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_5m"), 5 * m, repl, seed);
|
||||
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_5m"})
|
||||
.contains("All EC block group status: OK"));
|
||||
DFSTestUtil.createFile(fs, new Path(ecDir, "foo_6m"), (int) k, 6 * m, m, repl, seed);
|
||||
assertEquals("ret: 0, Checking EC block group: blk_x;Status: OK" +
|
||||
"Checking EC block group: blk_x;Status: OK" +
|
||||
"All EC block group status: OK",
|
||||
runCmd(new String[]{"verifyEC", "-file", "/ec/foo_6m"})
|
||||
.replaceAll("blk_-[0-9]+", "blk_x;"));
|
||||
|
||||
Path corruptFile = new Path(ecDir, "foo_corrupt");
|
||||
DFSTestUtil.createFile(fs, corruptFile, 5841961, repl, seed);
|
||||
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs, corruptFile);
|
||||
assertEquals(1, blocks.size());
|
||||
LocatedStripedBlock blockGroup = (LocatedStripedBlock) blocks.get(0);
|
||||
LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup,
|
||||
ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
|
||||
// Try corrupt block 0 in block group.
|
||||
LocatedBlock toCorruptLocatedBlock = indexedBlocks[0];
|
||||
ExtendedBlock toCorruptBlock = toCorruptLocatedBlock.getBlock();
|
||||
DataNode datanode = cluster.getDataNode(toCorruptLocatedBlock.getLocations()[0].getIpcPort());
|
||||
File blockFile = getBlockFile(datanode.getFSDataset(),
|
||||
toCorruptBlock.getBlockPoolId(), toCorruptBlock.getLocalBlock());
|
||||
File metaFile = getMetaFile(datanode.getFSDataset(),
|
||||
toCorruptBlock.getBlockPoolId(), toCorruptBlock.getLocalBlock());
|
||||
// Write error bytes to block file and re-generate meta checksum.
|
||||
byte[] errorBytes = new byte[2097152];
|
||||
new Random(seed).nextBytes(errorBytes);
|
||||
FileUtils.writeByteArrayToFile(blockFile, errorBytes);
|
||||
metaFile.delete();
|
||||
runCmd(new String[]{"computeMeta", "-block", blockFile.getAbsolutePath(),
|
||||
"-out", metaFile.getAbsolutePath()});
|
||||
assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_corrupt"})
|
||||
.contains("Status: ERROR, message: EC compute result not match."));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue