diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index a12f361bcee..2c91dad62dc 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -314,3 +314,6 @@ HDFS-8543. Erasure Coding: processOverReplicatedBlock() handles striped block. (Walter Su via jing9) + + HDFS-8602. Erasure Coding: Client can't read(decode) the EC files which have + corrupt blocks. (jing9 and Kai Sasaki) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index a7339b7d169..878e5e184dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -45,7 +45,6 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import org.apache.hadoop.net.NetUtils; import org.apache.htrace.Span; @@ -340,7 +339,7 @@ private void readOneStripe( private Callable readCell(final BlockReader reader, final DatanodeInfo datanode, final long currentReaderOffset, final long targetReaderOffset, final ByteBufferStrategy strategy, - final int targetLength, + final int targetLength, final ExtendedBlock currentBlock, final Map> corruptedBlockMap) { return new Callable() { @Override @@ -359,7 +358,8 @@ public Void call() throws Exception { } int result = 0; while (result < targetLength) { - int ret = readToBuffer(reader, datanode, strategy, corruptedBlockMap); + int ret = readToBuffer(reader, datanode, strategy, currentBlock, + corruptedBlockMap); if (ret < 0) { throw new IOException("Unexpected EOS from the reader"); } @@ -373,21 +373,22 @@ public Void call() throws Exception { private int readToBuffer(BlockReader blockReader, DatanodeInfo currentNode, ByteBufferStrategy readerStrategy, + ExtendedBlock currentBlock, Map> corruptedBlockMap) throws IOException { try { return readerStrategy.doRead(blockReader, 0, 0); } catch (ChecksumException ce) { DFSClient.LOG.warn("Found Checksum error for " - + getCurrentBlock() + " from " + currentNode + + currentBlock + " from " + currentNode + " at " + ce.getPos()); // we want to remember which block replicas we have tried - addIntoCorruptedBlockMap(getCurrentBlock(), currentNode, + addIntoCorruptedBlockMap(currentBlock, currentNode, corruptedBlockMap); throw ce; } catch (IOException e) { DFSClient.LOG.warn("Exception while reading from " - + getCurrentBlock() + " of " + src + " from " + + currentBlock + " of " + src + " from " + currentNode, e); throw e; } @@ -768,7 +769,7 @@ void readChunk(final CompletionService service, Callable readCallable = readCell(blockReaders[chunkIndex], currentNodes[chunkIndex], blockReaderOffsets[chunkIndex], alignedStripe.getOffsetInBlock(), strategy, - chunk.byteBuffer.remaining(), corruptedBlockMap); + chunk.byteBuffer.remaining(), block.getBlock(), corruptedBlockMap); Future request = readingService.submit(readCallable); futures.put(request, chunkIndex); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java index 0201d071ab2..3125e2e8aa8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java @@ -17,35 +17,37 @@ */ package org.apache.hadoop.hdfs; -import org.apache.hadoop.conf.Configuration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; -import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize; -import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; -import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.*; public class TestReadStripedFileWithDecoding { + static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class); private MiniDFSCluster cluster; - private FileSystem fs; + private DistributedFileSystem fs; @Before public void setup() throws IOException { - Configuration conf = new HdfsConfiguration(); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()) + .numDataNodes(numDNs).build(); cluster.getFileSystem().getClient().createErasureCodingZone("/", null, cellSize); fs = cluster.getFileSystem(); @@ -73,6 +75,112 @@ public void testReadWithDNFailure3() throws IOException { testReadWithDNFailure("/foo", cellSize * dataBlocks, 0); } + /** + * Delete a data block before reading. Verify the decoding works correctly. + */ + @Test + public void testReadCorruptedData() throws IOException { + // create file + final Path file = new Path("/partially_deleted"); + final int length = cellSize * dataBlocks * 2; + final byte[] bytes = StripedFileTestUtil.generateBytes(length); + DFSTestUtil.writeFile(fs, file, bytes); + + // corrupt the first data block + // find the corresponding data node + int dnIndex = findFirstDataNode(file, cellSize * dataBlocks); + Assert.assertNotEquals(-1, dnIndex); + // find the target block + LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient() + .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0); + final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb, + cellSize, dataBlocks, parityBlocks); + // find the target block file + File storageDir = cluster.getInstanceStorageDir(dnIndex, 0); + File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock()); + Assert.assertTrue("Block file does not exist", blkFile.exists()); + // delete the block file + LOG.info("Deliberately removing file " + blkFile.getName()); + Assert.assertTrue("Cannot remove file", blkFile.delete()); + verifyRead(file, length, bytes); + } + + /** + * Corrupt the content of the data block before reading. + */ + @Test + public void testReadCorruptedData2() throws IOException { + // create file + final Path file = new Path("/partially_corrupted"); + final int length = cellSize * dataBlocks * 2; + final byte[] bytes = StripedFileTestUtil.generateBytes(length); + DFSTestUtil.writeFile(fs, file, bytes); + + // corrupt the first data block + // find the first data node + int dnIndex = findFirstDataNode(file, cellSize * dataBlocks); + Assert.assertNotEquals(-1, dnIndex); + // find the first data block + LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient() + .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0); + final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb, + cellSize, dataBlocks, parityBlocks); + // find the first block file + File storageDir = cluster.getInstanceStorageDir(dnIndex, 0); + File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock()); + Assert.assertTrue("Block file does not exist", blkFile.exists()); + // corrupt the block file + LOG.info("Deliberately corrupting file " + blkFile.getName()); + try (FileOutputStream out = new FileOutputStream(blkFile)) { + out.write("corruption".getBytes()); + } + + verifyRead(file, length, bytes); + } + + private int findFirstDataNode(Path file, long length) throws IOException { + BlockLocation[] locs = fs.getFileBlockLocations(file, 0, length); + String name = (locs[0].getNames())[0]; + int dnIndex = 0; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + return dnIndex; + } + dnIndex++; + } + return -1; + } + + private void verifyRead(Path file, int length, byte[] expected) + throws IOException { + // pread + try (FSDataInputStream fsdis = fs.open(file)) { + byte[] buf = new byte[length]; + int readLen = fsdis.read(0, buf, 0, buf.length); + Assert.assertEquals("The fileSize of file should be the same to write size", + length, readLen); + Assert.assertArrayEquals(expected, buf); + } + + // stateful read + ByteBuffer result = ByteBuffer.allocate(length); + ByteBuffer buf = ByteBuffer.allocate(1024); + int readLen = 0; + int ret; + try (FSDataInputStream in = fs.open(file)) { + while ((ret = in.read(buf)) >= 0) { + readLen += ret; + buf.flip(); + result.put(buf); + buf.clear(); + } + } + Assert.assertEquals("The length of file should be the same to write size", + length, readLen); + Assert.assertArrayEquals(expected, result.array()); + } + private void testReadWithDNFailure(String file, int fileSize, int startOffsetInFile) throws IOException { final int failedDNIdx = 2;