diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 6c5d7ce86c4..9b4bf24a725 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -139,4 +139,7 @@
     commands from standbynode if any (vinayakumarb)
 
     HDFS-8189. ClientProtocol#createErasureCodingZone API was wrongly annotated
-    as Idempotent (vinayakumarb)
\ No newline at end of file
+    as Idempotent (vinayakumarb)
+
+    HDFS-8235. Erasure Coding: Create DFSStripedInputStream in DFSClient#open.
+    (Kai Sasaki via jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 0a67cf7550e..722bb39695d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1193,7 +1193,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     // Get block info from namenode
     TraceScope scope = getPathTraceScope("newDFSInputStream", src);
     try {
-      return new DFSInputStream(this, src, verifyChecksum);
+      ECInfo info = getErasureCodingInfo(src);
+      if (info != null) {
+        return new DFSStripedInputStream(this, src, verifyChecksum, info);
+      } else {
+        return new DFSInputStream(this, src, verifyChecksum);
+      }
     } finally {
       scope.close();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index fe9e101c26f..f6f7ed21bdb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -134,11 +134,12 @@ public class DFSStripedInputStream extends DFSInputStream {
   private final short parityBlkNum;
   private final ECInfo ecInfo;
 
-  DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
+  DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, ECInfo info)
       throws IOException {
     super(dfsClient, src, verifyChecksum);
     // ECInfo is restored from NN just before reading striped file.
-    ecInfo = dfsClient.getErasureCodingInfo(src);
+    assert info != null;
+    ecInfo = info;
     cellSize = ecInfo.getSchema().getChunkSize();
     dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
     parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index cf109818c33..bcfc74b3d5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -167,10 +168,9 @@ public class TestDFSStripedInputStream {
         writeBytes, fileLength);
 
     // pread
-    try (DFSStripedInputStream dis =
-             new DFSStripedInputStream(fs.getClient(), src, true)) {
+    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
       byte[] buf = new byte[writeBytes + 100];
-      int readLen = dis.read(0, buf, 0, buf.length);
+      int readLen = fsdis.read(0, buf, 0, buf.length);
       readLen = readLen >= 0 ? readLen : 0;
       Assert.assertEquals("The length of file should be the same to write size",
           writeBytes, readLen);
@@ -180,13 +180,12 @@ public class TestDFSStripedInputStream {
     }
 
     // stateful read with byte array
-    try (DFSStripedInputStream dis =
-             new DFSStripedInputStream(fs.getClient(), src, true)) {
+    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
       byte[] buf = new byte[writeBytes + 100];
       int readLen = 0;
       int ret;
       do {
-        ret = dis.read(buf, readLen, buf.length - readLen);
+        ret = fsdis.read(buf, readLen, buf.length - readLen);
         if (ret > 0) {
           readLen += ret;
         }
@@ -201,13 +200,12 @@ public class TestDFSStripedInputStream {
     }
 
     // stateful read with ByteBuffer
-    try (DFSStripedInputStream dis =
-             new DFSStripedInputStream(fs.getClient(), src, true)) {
+    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
       ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
       int readLen = 0;
       int ret;
       do {
-        ret = dis.read(buf);
+        ret = fsdis.read(buf);
         if (ret > 0) {
           readLen += ret;
         }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
index d980bd6959a..1ad480e7f29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -33,6 +34,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.junit.After;
 import org.junit.Before;
@@ -52,6 +54,8 @@ public class TestReadStripedFile {
   private DistributedFileSystem fs;
   private final Path dirPath = new Path("/striped");
   private Path filePath = new Path(dirPath, "file");
+  private ECInfo info = new ECInfo(filePath.toString(),
+      ECSchemaManager.getSystemDefaultSchema());
   private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
   private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
   private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
@@ -89,7 +93,7 @@ public class TestReadStripedFile {
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
         filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
     final DFSStripedInputStream in =
-        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, info);
 
     List<LocatedBlock> lbList = lbs.getLocatedBlocks();
     for (LocatedBlock aLbList : lbList) {
@@ -124,7 +128,8 @@ public class TestReadStripedFile {
           bg.getBlock().getBlockPoolId());
     }
     DFSStripedInputStream in =
-        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
+        new DFSStripedInputStream(fs.getClient(),
+            filePath.toString(), false, info);
     int readSize = BLOCK_GROUP_SIZE;
     byte[] readBuffer = new byte[readSize];
     int ret = in.read(0, readBuffer, 0, readSize);
@@ -170,7 +175,7 @@ public class TestReadStripedFile {
 
     DFSStripedInputStream in =
         new DFSStripedInputStream(fs.getClient(), filePath.toString(),
-            false);
+            false, info);
 
     byte[] expected = new byte[fileSize];