HDFS-8602. Erasure Coding: Client can't read(decode) the EC files which have corrupt blocks. Contributed by Jing Zhao and Kai Sasaki.

Jing Zhao 2015-06-19 14:07:38 -07:00
parent 448cb7df67
commit 8c423a8163
3 changed files with 129 additions and 17 deletions
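The fix threads the ExtendedBlock that is actually being read through readCell() into readToBuffer(). Before this change, readToBuffer() reported checksum errors against getCurrentBlock(), which for a striped reader does not pin down the internal block of the block group whose replica is corrupt, so a bad replica could be reported against the wrong block. The sketch below (illustrative only, not code from this commit) shows how an internal block relates to its block group, using the same StripedBlockUtil helper the new test relies on:

import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;

class InternalBlockLookup {
  // A LocatedStripedBlock describes a whole block group; each internal
  // (data or parity) block has its own ExtendedBlock and its own replica.
  static LocatedBlock internalBlock(LocatedStripedBlock blockGroup,
      int cellSize, int dataBlocks, int parityBlocks, int blkIndex) {
    LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
        blockGroup, cellSize, dataBlocks, parityBlocks);
    return blks[blkIndex]; // may be null if that internal block is absent
  }
}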

hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt

@ -314,3 +314,6 @@
HDFS-8543. Erasure Coding: processOverReplicatedBlock() handles striped block.
(Walter Su via jing9)
+HDFS-8602. Erasure Coding: Client can't read(decode) the EC files which have
+corrupt blocks. (jing9 and Kai Sasaki)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

@ -45,7 +45,6 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.net.NetUtils;
import org.apache.htrace.Span;
@ -340,7 +339,7 @@ public class DFSStripedInputStream extends DFSInputStream {
private Callable<Void> readCell(final BlockReader reader,
final DatanodeInfo datanode, final long currentReaderOffset,
final long targetReaderOffset, final ByteBufferStrategy strategy,
-final int targetLength,
+final int targetLength, final ExtendedBlock currentBlock,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
return new Callable<Void>() {
@Override
@ -359,7 +358,8 @@ public class DFSStripedInputStream extends DFSInputStream {
}
int result = 0;
while (result < targetLength) {
-int ret = readToBuffer(reader, datanode, strategy, corruptedBlockMap);
+int ret = readToBuffer(reader, datanode, strategy, currentBlock,
+corruptedBlockMap);
if (ret < 0) {
throw new IOException("Unexpected EOS from the reader");
}
@ -373,21 +373,22 @@ public class DFSStripedInputStream extends DFSInputStream {
private int readToBuffer(BlockReader blockReader,
DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
+ExtendedBlock currentBlock,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
try {
return readerStrategy.doRead(blockReader, 0, 0);
} catch (ChecksumException ce) {
DFSClient.LOG.warn("Found Checksum error for "
+ getCurrentBlock() + " from " + currentNode
+ currentBlock + " from " + currentNode
+ " at " + ce.getPos());
// we want to remember which block replicas we have tried
-addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+addIntoCorruptedBlockMap(currentBlock, currentNode,
corruptedBlockMap);
throw ce;
} catch (IOException e) {
DFSClient.LOG.warn("Exception while reading from "
+ getCurrentBlock() + " of " + src + " from "
+ currentBlock + " of " + src + " from "
+ currentNode, e);
throw e;
}
@ -768,7 +769,7 @@ public class DFSStripedInputStream extends DFSInputStream {
Callable<Void> readCallable = readCell(blockReaders[chunkIndex],
currentNodes[chunkIndex], blockReaderOffsets[chunkIndex],
alignedStripe.getOffsetInBlock(), strategy,
-chunk.byteBuffer.remaining(), corruptedBlockMap);
+chunk.byteBuffer.remaining(), block.getBlock(), corruptedBlockMap);
Future<Void> request = readingService.submit(readCallable);
futures.put(request, chunkIndex);
}
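Once a corrupt or missing internal block is detected, the striped reader reconstructs the affected cells with the raw erasure decoder (the RawErasureDecoder import kept above). A minimal sketch of that decode contract follows, with an assumed helper; it is not the client's actual recovery code:

import java.nio.ByteBuffer;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

class DecodeSketch {
  // inputs holds the surviving data and parity cells in position, with
  // null at each erased index; decode() fills the output buffer(s) for
  // those erased positions from the surviving units.
  static void recoverCell(RawErasureDecoder decoder, ByteBuffer[] inputs,
      int erasedIndex, ByteBuffer output) {
    decoder.decode(inputs, new int[] {erasedIndex}, new ByteBuffer[] {output});
  }
}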

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java

@ -17,35 +17,37 @@
*/
package org.apache.hadoop.hdfs;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.*;
public class TestReadStripedFileWithDecoding {
+static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
private MiniDFSCluster cluster;
-private FileSystem fs;
+private DistributedFileSystem fs;
@Before
public void setup() throws IOException {
-Configuration conf = new HdfsConfiguration();
-conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
+.numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/",
null, cellSize);
fs = cluster.getFileSystem();
@ -73,6 +75,112 @@ public class TestReadStripedFileWithDecoding {
testReadWithDNFailure("/foo", cellSize * dataBlocks, 0);
}
/**
* Delete a data block before reading. Verify the decoding works correctly.
*/
@Test
public void testReadCorruptedData() throws IOException {
// create file
final Path file = new Path("/partially_deleted");
final int length = cellSize * dataBlocks * 2;
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
DFSTestUtil.writeFile(fs, file, bytes);
// corrupt the first data block
// find the corresponding data node
int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
Assert.assertNotEquals(-1, dnIndex);
// find the target block
LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
.getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
cellSize, dataBlocks, parityBlocks);
// find the target block file
File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
Assert.assertTrue("Block file does not exist", blkFile.exists());
// delete the block file
LOG.info("Deliberately removing file " + blkFile.getName());
Assert.assertTrue("Cannot remove file", blkFile.delete());
verifyRead(file, length, bytes);
}
/**
* Corrupt the content of the data block before reading.
*/
@Test
public void testReadCorruptedData2() throws IOException {
// create file
final Path file = new Path("/partially_corrupted");
final int length = cellSize * dataBlocks * 2;
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
DFSTestUtil.writeFile(fs, file, bytes);
// corrupt the first data block
// find the first data node
int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
Assert.assertNotEquals(-1, dnIndex);
// find the first data block
LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
.getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
cellSize, dataBlocks, parityBlocks);
// find the first block file
File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
Assert.assertTrue("Block file does not exist", blkFile.exists());
// corrupt the block file
LOG.info("Deliberately corrupting file " + blkFile.getName());
try (FileOutputStream out = new FileOutputStream(blkFile)) {
out.write("corruption".getBytes());
}
verifyRead(file, length, bytes);
}
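The two tests cover different failure paths: deleting the block file makes the replica unreadable outright, while overwriting its contents leaves the per-chunk checksums stored alongside the block (in its .meta file) stale, so the read fails verification and takes the ChecksumException branch added to readToBuffer() above. A standalone illustration of that mismatch, using plain CRC32 (HDFS itself keeps per-chunk CRCs, CRC32C by default):

import java.util.zip.CRC32;

class ChecksumMismatchDemo {
  public static void main(String[] args) {
    byte[] original = "original block data".getBytes();
    CRC32 crc = new CRC32();
    crc.update(original);
    long stored = crc.getValue(); // analogous to the CRC in the .meta file

    byte[] tampered = "corrupted block data".getBytes(); // block overwritten
    crc.reset();
    crc.update(tampered);
    // This mismatch is what surfaces as a ChecksumException in the reader,
    // which then falls back to decoding the cell from the parity blocks.
    System.out.println(stored != crc.getValue()
        ? "checksum mismatch, decode from parity"
        : "checksums match");
  }
}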
private int findFirstDataNode(Path file, long length) throws IOException {
BlockLocation[] locs = fs.getFileBlockLocations(file, 0, length);
String name = (locs[0].getNames())[0];
int dnIndex = 0;
for (DataNode dn : cluster.getDataNodes()) {
int port = dn.getXferPort();
if (name.contains(Integer.toString(port))) {
return dnIndex;
}
dnIndex++;
}
return -1;
}
private void verifyRead(Path file, int length, byte[] expected)
throws IOException {
// pread
try (FSDataInputStream fsdis = fs.open(file)) {
byte[] buf = new byte[length];
int readLen = fsdis.read(0, buf, 0, buf.length);
Assert.assertEquals("The fileSize of file should be the same to write size",
length, readLen);
Assert.assertArrayEquals(expected, buf);
}
// stateful read
ByteBuffer result = ByteBuffer.allocate(length);
ByteBuffer buf = ByteBuffer.allocate(1024);
int readLen = 0;
int ret;
try (FSDataInputStream in = fs.open(file)) {
while ((ret = in.read(buf)) >= 0) {
readLen += ret;
buf.flip();
result.put(buf);
buf.clear();
}
}
Assert.assertEquals("The length of file should be the same to write size",
length, readLen);
Assert.assertArrayEquals(expected, result.array());
}
private void testReadWithDNFailure(String file, int fileSize,
int startOffsetInFile) throws IOException {
final int failedDNIdx = 2;