HDFS-8517. Fix a decoding issue in stripped block recovering in client side. Contributed by Kai Zheng.

Jing Zhao 2015-06-02 15:35:49 -07:00
parent 2d847e7d62
commit 71329e817b
8 changed files with 304 additions and 187 deletions

CHANGES-HDFS-EC-7285.txt

@@ -271,3 +271,6 @@
 
     HDFS-8444. Erasure Coding: fix cannot rename a zone dir
     (Walter Su via vinayakumarb)
+
+    HDFS-8517. Fix a decoding issue in stripped block recovering in client side.
+    (Kai Zheng via jing9)

DFSStripedInputStream.java

@@ -597,9 +597,10 @@ public class DFSStripedInputStream extends DFSInputStream {
       }
       if (alignedStripe.missingChunksNum > 0) {
-        finalizeDecodeInputs(decodeInputs, alignedStripe);
-        decodeAndFillBuffer(decodeInputs, buf, alignedStripe, parityBlkNum,
-            decoder);
+        finalizeDecodeInputs(decodeInputs, dataBlkNum, parityBlkNum,
+            alignedStripe);
+        decodeAndFillBuffer(decodeInputs, buf, alignedStripe, dataBlkNum,
+            parityBlkNum, decoder);
       }
     }
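
The decoding issue here is an ordering mismatch: the read path filled and consumed the decode-input array in block-group order (data blocks first, as the NameNode indexes them), while the RawErasureDecoder expects parity buffers first. Threading dataBlkNum and parityBlkNum through both helpers lets them remap indices via convertIndex4Decode, added in StripedBlockUtil below. A minimal standalone sketch of that mapping, assuming the default RS(6,3) layout:

    // Sketch (ours, not from the patch): same arithmetic as the
    // convertIndex4Decode added below, shown for an assumed RS(6,3) schema.
    public class DecodeOrderDemo {
      static int convertIndex4Decode(int index, int dataBlkNum, int parityBlkNum) {
        return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
      }

      public static void main(String[] args) {
        final int dataBlkNum = 6, parityBlkNum = 3;
        // Block-group order d0..d5, p0..p2 maps to decode order p0..p2, d0..d5:
        for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
          System.out.println("block-group index " + i + " -> decode index "
              + convertIndex4Decode(i, dataBlkNum, parityBlkNum));
        }
        // prints 0->3 ... 5->8, then 6->0, 7->1, 8->2
      }
    }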

StripedBlockUtil.java

@@ -31,7 +31,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
 import java.util.*;
@@ -257,7 +256,8 @@ public class StripedBlockUtil {
         new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
       if (alignedStripe.chunks[i] == null) {
-        alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
+        final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+        alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
         alignedStripe.chunks[i].offsetsInBuf.add(0);
         alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock());
       }
@@ -273,35 +273,57 @@ public class StripedBlockUtil {
    * finalize decode input buffers.
    */
   public static void finalizeDecodeInputs(final byte[][] decodeInputs,
-      AlignedStripe alignedStripe) {
+      int dataBlkNum, int parityBlkNum, AlignedStripe alignedStripe) {
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
-      StripingChunk chunk = alignedStripe.chunks[i];
+      final StripingChunk chunk = alignedStripe.chunks[i];
+      final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
       if (chunk.state == StripingChunk.FETCHED) {
         int posInBuf = 0;
         for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
           System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j),
-              decodeInputs[i], posInBuf, chunk.lengthsInBuf.get(j));
+              decodeInputs[decodeIndex], posInBuf, chunk.lengthsInBuf.get(j));
           posInBuf += chunk.lengthsInBuf.get(j);
         }
       } else if (chunk.state == StripingChunk.ALLZERO) {
-        Arrays.fill(decodeInputs[i], (byte)0);
+        Arrays.fill(decodeInputs[decodeIndex], (byte) 0);
       } else {
-        decodeInputs[i] = null;
+        decodeInputs[decodeIndex] = null;
       }
     }
   }
 
+  /**
+   * Currently decoding requires parity chunks are before data chunks.
+   * The indices are opposite to what we store in NN. In future we may
+   * improve the decoding to make the indices order the same as in NN.
+   *
+   * @param index The index to convert
+   * @param dataBlkNum The number of data blocks
+   * @param parityBlkNum The number of parity blocks
+   * @return converted index
+   */
+  public static int convertIndex4Decode(int index, int dataBlkNum,
+      int parityBlkNum) {
+    return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
+  }
+
+  public static int convertDecodeIndexBack(int index, int dataBlkNum,
+      int parityBlkNum) {
+    return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum;
+  }
+
   /**
    * Decode based on the given input buffers and schema.
    */
   public static void decodeAndFillBuffer(final byte[][] decodeInputs,
-      byte[] buf, AlignedStripe alignedStripe, int parityBlkNum,
+      byte[] buf, AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
       RawErasureDecoder decoder) {
     // Step 1: prepare indices and output buffers for missing data units
     int[] decodeIndices = new int[parityBlkNum];
     int pos = 0;
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
       if (alignedStripe.chunks[i].state == StripingChunk.MISSING){
-        decodeIndices[pos++] = i;
+        decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
       }
     }
     decodeIndices = Arrays.copyOf(decodeIndices, pos);
@@ -313,13 +335,14 @@ public class StripedBlockUtil {
     // Step 3: fill original application buffer with decoded data
     for (int i = 0; i < decodeIndices.length; i++) {
-      int missingBlkIdx = decodeIndices[i];
+      int missingBlkIdx = convertDecodeIndexBack(decodeIndices[i],
+          dataBlkNum, parityBlkNum);
       StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
       if (chunk.state == StripingChunk.MISSING) {
         int srcPos = 0;
         for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
-          System.arraycopy(decodeOutputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
-              chunk.lengthsInBuf.get(j));
+          System.arraycopy(decodeOutputs[i], srcPos, buf,
+              chunk.offsetsInBuf.get(j), chunk.lengthsInBuf.get(j));
           srcPos += chunk.lengthsInBuf.get(j);
         }
       }
@@ -330,7 +353,7 @@ public class StripedBlockUtil {
    * This method divides a requested byte range into an array of inclusive
    * {@link AlignedStripe}.
    * @param ecSchema The codec schema for the file, which carries the numbers
-   *                 of data / parity blocks, as well as cell size
+   *                 of data / parity blocks
    * @param cellSize Cell size of stripe
    * @param blockGroup The striped block group
    * @param rangeStartInBlockGroup The byte range's start offset in block group
@@ -345,7 +368,6 @@ public class StripedBlockUtil {
       int cellSize, LocatedStripedBlock blockGroup,
       long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
       int offsetInBuf) {
-    // TODO: change ECSchema naming to use cell size instead of chunk size
 
     // Step 0: analyze range and calculate basic parameters
     int dataBlkNum = ecSchema.getNumDataUnits();
@@ -362,8 +384,7 @@ public class StripedBlockUtil {
     AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
 
     // Step 4: calculate each chunk's position in destination buffer
-    calcualteChunkPositionsInBuf(ecSchema, cellSize, stripes, cells, buf,
-        offsetInBuf);
+    calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf);
 
     // Step 5: prepare ALLZERO blocks
     prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum);
@@ -508,8 +529,8 @@ public class StripedBlockUtil {
     return stripes.toArray(new AlignedStripe[stripes.size()]);
   }
 
-  private static void calcualteChunkPositionsInBuf(ECSchema ecSchema,
-      int cellSize, AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
+  private static void calcualteChunkPositionsInBuf(int cellSize,
+      AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
       int offsetInBuf) {
     /**
      * | <--------------- AlignedStripe --------------->|
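
convertDecodeIndexBack is the inverse of convertIndex4Decode, so a quick standalone check (ours, not part of the patch) can confirm the round trip over a whole block group:

    // Self-contained round-trip check; the two methods are copied verbatim
    // from the patch above, and RS(6,3) is an assumed schema.
    public class ConvertIndexRoundTrip {
      static int convertIndex4Decode(int index, int dataBlkNum, int parityBlkNum) {
        return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
      }

      static int convertDecodeIndexBack(int index, int dataBlkNum, int parityBlkNum) {
        return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum;
      }

      public static void main(String[] args) {
        final int dataBlkNum = 6, parityBlkNum = 3;
        for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
          int decodeIdx = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
          if (convertDecodeIndexBack(decodeIdx, dataBlkNum, parityBlkNum) != i) {
            throw new AssertionError("round trip failed at index " + i);
          }
        }
        System.out.println("round trip holds for all block-group indices");
      }
    }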

DFSTestUtil.java

@@ -790,15 +790,21 @@ public class DFSTestUtil {
     return os.toByteArray();
   }
 
-  /* Write the given string to the given file */
-  public static void writeFile(FileSystem fs, Path p, String s)
+  /* Write the given bytes to the given file */
+  public static void writeFile(FileSystem fs, Path p, byte[] bytes)
       throws IOException {
     if (fs.exists(p)) {
       fs.delete(p, true);
     }
-    InputStream is = new ByteArrayInputStream(s.getBytes());
+    InputStream is = new ByteArrayInputStream(bytes);
     FSDataOutputStream os = fs.create(p);
-    IOUtils.copyBytes(is, os, s.length(), true);
+    IOUtils.copyBytes(is, os, bytes.length, true);
+  }
+
+  /* Write the given string to the given file */
+  public static void writeFile(FileSystem fs, Path p, String s)
+      throws IOException {
+    writeFile(fs, p, s.getBytes());
   }
 
   /* Append the given string to the given file */
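
The byte[] overload matters for the striped tests: routing raw bytes through the String overload decodes and re-encodes them with the platform charset, which can silently alter values that are not valid in that charset. A usage sketch (the path and method name here are hypothetical; the demo sits in the same package because the test helpers are package-private):

    package org.apache.hadoop.hdfs;

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WriteFileDemo {
      // fs: any initialized FileSystem, e.g. from a MiniDFSCluster in these tests.
      static void writePattern(FileSystem fs) throws IOException {
        byte[] pattern = StripedFileTestUtil.generateBytes(1024);
        // The byte[] overload puts exactly these bytes on disk; no charset
        // round trip as with writeFile(fs, p, new String(pattern)).
        DFSTestUtil.writeFile(fs, new Path("/demo"), pattern);
      }
    }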

StripedFileTestUtil.java (new file)

@@ -0,0 +1,59 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

import java.io.IOException;
import java.util.Random;

public class StripedFileTestUtil {
  static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
  static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;

  static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
  static final int stripesPerBlock = 4;
  static final int blockSize = cellSize * stripesPerBlock;
  static final int numDNs = dataBlocks + parityBlocks + 2;

  static final Random r = new Random();

  static byte[] generateBytes(int cnt) {
    byte[] bytes = new byte[cnt];
    for (int i = 0; i < cnt; i++) {
      bytes[i] = getByte(i);
    }
    return bytes;
  }

  static int readAll(FSDataInputStream in, byte[] buf) throws IOException {
    int readLen = 0;
    int ret;
    while ((ret = in.read(buf, readLen, buf.length - readLen)) >= 0 &&
        readLen <= buf.length) {
      readLen += ret;
    }
    return readLen;
  }

  static byte getByte(long pos) {
    final int mod = 29;
    return (byte) (pos % mod + 1);
  }
}
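
Because getByte derives the expected value purely from the file position (pos % 29 + 1), any slice of a test file can be verified in isolation; the DN-failure test below relies on this when it preads from a nonzero offset. A small sketch of the idiom (names here are illustrative):

    // Check a slice read from file offset `pos` against the position-based
    // pattern, without materializing the whole expected file.
    static void verifySlice(byte[] buf, int readLen, long pos) {
      for (int i = 0; i < readLen; i++) {
        byte expected = (byte) ((pos + i) % 29 + 1); // same formula as getByte
        if (buf[i] != expected) {
          throw new AssertionError("mismatch at file offset " + (pos + i));
        }
      }
    }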

TestDFSStripedInputStream.java

@@ -208,17 +208,18 @@ public class TestDFSStripedInputStream {
     // Update the expected content for decoded data
     for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
       byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
-      int[] missingBlkIdx = new int[]{failedDNIdx, DATA_BLK_NUM+1, DATA_BLK_NUM+2};
+      int[] missingBlkIdx = new int[]{failedDNIdx + PARITY_BLK_NUM, 1, 2};
       byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE];
       for (int j = 0; j < DATA_BLK_NUM; j++) {
         int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
         if (j != failedDNIdx) {
-          System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
+          System.arraycopy(expected, posInBuf, decodeInputs[j + PARITY_BLK_NUM],
+              0, CELLSIZE);
         }
       }
       for (int k = 0; k < CELLSIZE; k++) {
         int posInBlk = i * CELLSIZE + k;
-        decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte(
+        decodeInputs[0][k] = SimulatedFSDataset.simulatedByte(
             new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
       }
       for (int m : missingBlkIdx) {

TestReadStripedFileWithDecoding.java (new file)

@@ -0,0 +1,108 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;

public class TestReadStripedFileWithDecoding {
  private MiniDFSCluster cluster;
  private FileSystem fs;

  @Before
  public void setup() throws IOException {
    Configuration conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
    cluster.getFileSystem().getClient().createErasureCodingZone("/",
        null, cellSize);
    fs = cluster.getFileSystem();
  }

  @After
  public void tearDown() throws IOException {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  @Test
  public void testWritePreadWithDNFailure1() throws IOException {
    testWritePreadWithDNFailure("/foo", 0);
  }

  @Test
  public void testWritePreadWithDNFailure2() throws IOException {
    testWritePreadWithDNFailure("/foo", cellSize * 5);
  }

  private void testWritePreadWithDNFailure(String file, int startOffsetInFile)
      throws IOException {
    final int failedDNIdx = 2;
    final int length = cellSize * (dataBlocks + 2);
    Path testPath = new Path(file);
    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
    DFSTestUtil.writeFile(fs, testPath, bytes);

    // shut down the DN that holds the last internal data block
    BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
        cellSize);
    String name = (locs[0].getNames())[failedDNIdx];
    for (DataNode dn : cluster.getDataNodes()) {
      int port = dn.getXferPort();
      if (name.contains(Integer.toString(port))) {
        dn.shutdown();
        break;
      }
    }

    // pread
    try (FSDataInputStream fsdis = fs.open(testPath)) {
      byte[] buf = new byte[length];
      int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
      Assert.assertEquals("The length of file should be the same to write size",
          length - startOffsetInFile, readLen);

      byte[] expected = new byte[readLen];
      for (int i = startOffsetInFile; i < length; i++) {
        expected[i - startOffsetInFile] = StripedFileTestUtil.getByte(i);
      }

      for (int i = startOffsetInFile; i < length; i++) {
        Assert.assertEquals("Byte at " + i + " should be the same",
            expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
      }
    }
  }
}
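
On the test geometry (our arithmetic, not from the patch): under the default RS(6,3) schema, length = cellSize * (dataBlocks + 2) is eight cells laid out round-robin across the data blocks, so offset cellSize * 5, where the test looks up block locations, falls in cell 5 on the last data block of the first stripe:

    // Illustration of round-robin cell placement for an 8-cell file, RS(6,3).
    public class StripeLayoutDemo {
      public static void main(String[] args) {
        final int dataBlocks = 6;
        final int cells = dataBlocks + 2; // length = cellSize * (dataBlocks + 2)
        for (int c = 0; c < cells; c++) {
          System.out.println("cell " + c + " -> stripe " + (c / dataBlocks)
              + ", internal data block " + (c % dataBlocks));
        }
        // cell 5 -> stripe 0, data block 5: the region at offset cellSize * 5.
      }
    }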

TestWriteReadStripedFile.java

@@ -18,17 +18,13 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -37,34 +33,30 @@ import org.junit.Test;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Random;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.stripesPerBlock;
 
 public class TestWriteReadStripedFile {
-  private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
-  private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
-
-  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-  private final static int stripesPerBlock = 4;
-  static int blockSize = cellSize * stripesPerBlock;
-  static int numDNs = dataBlocks + parityBlocks + 2;
-
   private static MiniDFSCluster cluster;
-  private static Configuration conf;
   private static FileSystem fs;
-  private static Random r= new Random();
+  private static Configuration conf;
 
   @BeforeClass
   public static void setup() throws IOException {
-    conf = new Configuration();
+    conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient().createErasureCodingZone("/", null, cellSize);
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
     fs = cluster.getFileSystem();
   }
 
   @AfterClass
-  public static void tearDown() {
+  public static void tearDown() throws IOException {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -152,47 +144,21 @@ public class TestWriteReadStripedFile {
         + cellSize + 123);
   }
 
-  private byte[] generateBytes(int cnt) {
-    byte[] bytes = new byte[cnt];
-    for (int i = 0; i < cnt; i++) {
-      bytes[i] = getByte(i);
-    }
-    return bytes;
-  }
-
-  private int readAll(FSDataInputStream in, byte[] buf) throws IOException {
-    int readLen = 0;
-    int ret;
-    do {
-      ret = in.read(buf, readLen, buf.length - readLen);
-      if (ret > 0) {
-        readLen += ret;
-      }
-    } while (ret >= 0 && readLen < buf.length);
-    return readLen;
-  }
-
-  private byte getByte(long pos) {
-    final int mod = 29;
-    return (byte) (pos % mod + 1);
-  }
-
   private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
       int writeBytes) throws IOException {
     fsdis.seek(pos);
     byte[] buf = new byte[writeBytes];
-    int readLen = readAll(fsdis, buf);
+    int readLen = StripedFileTestUtil.readAll(fsdis, buf);
     Assert.assertEquals(readLen, writeBytes - pos);
     for (int i = 0; i < readLen; i++) {
       Assert.assertEquals("Byte at " + i + " should be the same",
-          getByte(pos + i), buf[i]);
+          StripedFileTestUtil.getByte(pos + i), buf[i]);
     }
   }
 
   private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
       throws IOException {
-    final byte[] expected = generateBytes(fileLength);
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
     Path srcPath = new Path(src);
     DFSTestUtil.writeFile(fs, srcPath, new String(expected));
@@ -215,7 +181,7 @@ public class TestWriteReadStripedFile {
   public void testWriteReadUsingWebHdfs() throws Exception {
     int fileLength = blockSize * dataBlocks + cellSize + 123;
 
-    final byte[] expected = generateBytes(fileLength);
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
     FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
         WebHdfsConstants.WEBHDFS_SCHEME);
     Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe");
@@ -231,7 +197,6 @@ public class TestWriteReadStripedFile {
     verifySeek(fs, srcPath, fileLength);
     verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
     //webhdfs doesn't support bytebuffer read
-
   }
 
   void verifyLength(FileSystem fs, Path srcPath, int fileLength)
@@ -243,152 +208,105 @@ public class TestWriteReadStripedFile {
   void verifyPread(FileSystem fs, Path srcPath, int fileLength,
       byte[] expected, byte[] buf) throws IOException {
-    FSDataInputStream in = fs.open(srcPath);
-    int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
-        cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
-        cellSize * dataBlocks, fileLength - 102, fileLength - 1};
-    for (int startOffset : startOffsets) {
-      startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
-      int remaining = fileLength - startOffset;
-      in.readFully(startOffset, buf, 0, remaining);
-      for (int i = 0; i < remaining; i++) {
-        Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
-                "same",
-            expected[startOffset + i], buf[i]);
-      }
-    }
-    in.close();
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
+          cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
+          cellSize * dataBlocks, fileLength - 102, fileLength - 1};
+      for (int startOffset : startOffsets) {
+        startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
+        int remaining = fileLength - startOffset;
+        in.readFully(startOffset, buf, 0, remaining);
+        for (int i = 0; i < remaining; i++) {
+          Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
+              "same", expected[startOffset + i], buf[i]);
+        }
+      }
+    }
   }
 
   void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
       byte[] expected, byte[] buf) throws IOException {
-    FSDataInputStream in = fs.open(srcPath);
-    final byte[] result = new byte[fileLength];
-    int readLen = 0;
-    int ret;
-    do {
-      ret = in.read(buf, 0, buf.length);
-      if (ret > 0) {
-        System.arraycopy(buf, 0, result, readLen, ret);
-        readLen += ret;
-      }
-    } while (ret >= 0);
-    Assert.assertEquals("The length of file should be the same to write size",
-        fileLength, readLen);
-    Assert.assertArrayEquals(expected, result);
-    in.close();
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      final byte[] result = new byte[fileLength];
+      int readLen = 0;
+      int ret;
+      while ((ret = in.read(buf, 0, buf.length)) >= 0) {
+        System.arraycopy(buf, 0, result, readLen, ret);
+        readLen += ret;
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          fileLength, readLen);
+      Assert.assertArrayEquals(expected, result);
+    }
   }
 
   void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
       byte[] expected, ByteBuffer buf) throws IOException {
-    FSDataInputStream in = fs.open(srcPath);
-    ByteBuffer result = ByteBuffer.allocate(fileLength);
-    int readLen = 0;
-    int ret;
-    do {
-      ret = in.read(buf);
-      if (ret > 0) {
-        readLen += ret;
-        buf.flip();
-        result.put(buf);
-        buf.clear();
-      }
-    } while (ret >= 0);
-    readLen = readLen >= 0 ? readLen : 0;
-    Assert.assertEquals("The length of file should be the same to write size",
-        fileLength, readLen);
-    Assert.assertArrayEquals(expected, result.array());
-    in.close();
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      ByteBuffer result = ByteBuffer.allocate(fileLength);
+      int readLen = 0;
+      int ret;
+      while ((ret = in.read(buf)) >= 0) {
+        readLen += ret;
+        buf.flip();
+        result.put(buf);
+        buf.clear();
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          fileLength, readLen);
+      Assert.assertArrayEquals(expected, result.array());
+    }
   }
 
   void verifySeek(FileSystem fs, Path srcPath, int fileLength)
       throws IOException {
-    FSDataInputStream in = fs.open(srcPath);
-    // seek to 1/2 of content
-    int pos = fileLength / 2;
-    assertSeekAndRead(in, pos, fileLength);
-
-    // seek to 1/3 of content
-    pos = fileLength / 3;
-    assertSeekAndRead(in, pos, fileLength);
-
-    // seek to 0 pos
-    pos = 0;
-    assertSeekAndRead(in, pos, fileLength);
-
-    if (fileLength > cellSize) {
-      // seek to cellSize boundary
-      pos = cellSize - 1;
-      assertSeekAndRead(in, pos, fileLength);
-    }
-
-    if (fileLength > cellSize * dataBlocks) {
-      // seek to striped cell group boundary
-      pos = cellSize * dataBlocks - 1;
-      assertSeekAndRead(in, pos, fileLength);
-    }
-
-    if (fileLength > blockSize * dataBlocks) {
-      // seek to striped block group boundary
-      pos = blockSize * dataBlocks - 1;
-      assertSeekAndRead(in, pos, fileLength);
-    }
-
-    if(!(in.getWrappedStream() instanceof ByteRangeInputStream)){
-      try {
-        in.seek(-1);
-        Assert.fail("Should be failed if seek to negative offset");
-      } catch (EOFException e) {
-        // expected
-      }
-
-      try {
-        in.seek(fileLength + 1);
-        Assert.fail("Should be failed if seek after EOF");
-      } catch (EOFException e) {
-        // expected
-      }
-    }
-    in.close();
-  }
-
-  @Test
-  public void testWritePreadWithDNFailure() throws IOException {
-    final int failedDNIdx = 2;
-    final int length = cellSize * (dataBlocks + 2);
-    Path testPath = new Path("/foo");
-    final byte[] bytes = generateBytes(length);
-    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
-
-    // shut down the DN that holds the last internal data block
-    BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
-        cellSize);
-    String name = (locs[0].getNames())[failedDNIdx];
-    for (DataNode dn : cluster.getDataNodes()) {
-      int port = dn.getXferPort();
-      if (name.contains(Integer.toString(port))) {
-        dn.shutdown();
-        break;
-      }
-    }
-
-    // pread
-    int startOffsetInFile = cellSize * 5;
-    try (FSDataInputStream fsdis = fs.open(testPath)) {
-      byte[] buf = new byte[length];
-      int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
-      Assert.assertEquals("The length of file should be the same to write size",
-          length - startOffsetInFile, readLen);
-      byte[] expected = new byte[readLen];
-      for (int i = startOffsetInFile; i < length; i++) {
-        expected[i - startOffsetInFile] = getByte(i);
-      }
-      for (int i = startOffsetInFile; i < length; i++) {
-        Assert.assertEquals("Byte at " + i + " should be the same",
-            expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
-      }
-    }
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      // seek to 1/2 of content
+      int pos = fileLength / 2;
+      assertSeekAndRead(in, pos, fileLength);
+
+      // seek to 1/3 of content
+      pos = fileLength / 3;
+      assertSeekAndRead(in, pos, fileLength);
+
+      // seek to 0 pos
+      pos = 0;
+      assertSeekAndRead(in, pos, fileLength);
+
+      if (fileLength > cellSize) {
+        // seek to cellSize boundary
+        pos = cellSize - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (fileLength > cellSize * dataBlocks) {
+        // seek to striped cell group boundary
+        pos = cellSize * dataBlocks - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (fileLength > blockSize * dataBlocks) {
+        // seek to striped block group boundary
+        pos = blockSize * dataBlocks - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) {
+        try {
+          in.seek(-1);
+          Assert.fail("Should be failed if seek to negative offset");
+        } catch (EOFException e) {
+          // expected
+        }
+
+        try {
+          in.seek(fileLength + 1);
+          Assert.fail("Should be failed if seek after EOF");
+        } catch (EOFException e) {
+          // expected
+        }
+      }
+    }
   }
 }