HDFS-8033. Erasure coding: stateful (non-positional) read from files in striped layout. Contributed by Zhe Zhang.

Zhe Zhang authored 2015-04-24 22:36:15 -07:00, committed by Zhe Zhang
parent f5d4a95ef5
commit 89d3378578
5 changed files with 465 additions and 57 deletions
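For context, "stateful" read means the ordinary sequential read() calls that advance the stream position, as opposed to positional reads (pread) at an explicit offset, which the branch already supported for striped files. Below is a minimal client-side sketch of the two paths using the standard FileSystem API; the file path is hypothetical and assumed to sit in an erasure-coding zone.

// Hedged illustration (not part of this patch): how a client exercises the two read paths.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StripedReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/ecZone/file");   // hypothetical striped file

    byte[] buf = new byte[4096];
    try (FSDataInputStream in = fs.open(file)) {
      // Positional read (pread): explicit offset, does not move the stream position.
      int n = in.read(0L, buf, 0, buf.length);

      // Stateful read: sequential reads that advance the stream position --
      // the path this patch adds for striped (erasure-coded) files.
      int total = 0;
      int ret;
      while ((ret = in.read(buf, 0, buf.length)) > 0) {
        total += ret;
      }
      System.out.println("pread=" + n + ", stateful total=" + total);
    }
  }
}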

View File

@ -131,3 +131,6 @@
HDFS-8228. Erasure Coding: SequentialBlockGroupIdGenerator#nextValue may cause
block id conflicts (Jing Zhao via Zhe Zhang)
HDFS-8033. Erasure coding: stateful (non-positional) read from files in
striped layout (Zhe Zhang)

View File

@ -96,34 +96,34 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
protected final DFSClient dfsClient;
private AtomicBoolean closed = new AtomicBoolean(false);
private final String src;
private final boolean verifyChecksum;
protected AtomicBoolean closed = new AtomicBoolean(false);
protected final String src;
protected final boolean verifyChecksum;
// state used by stateful read only:
// (protected by lock on this)
////
private DatanodeInfo currentNode = null;
private LocatedBlock currentLocatedBlock = null;
private long pos = 0;
private long blockEnd = -1;
protected LocatedBlock currentLocatedBlock = null;
protected long pos = 0;
protected long blockEnd = -1;
private BlockReader blockReader = null;
////
// state shared by stateful and positional read:
// (protected by lock on infoLock)
////
private LocatedBlocks locatedBlocks = null;
protected LocatedBlocks locatedBlocks = null;
private long lastBlockBeingWrittenLength = 0;
private FileEncryptionInfo fileEncryptionInfo = null;
private CachingStrategy cachingStrategy;
protected CachingStrategy cachingStrategy;
////
private final ReadStatistics readStatistics = new ReadStatistics();
protected final ReadStatistics readStatistics = new ReadStatistics();
// lock for state shared between read and pread
// Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
// (it's OK to acquire this lock when the lock on <this> is held)
private final Object infoLock = new Object();
protected final Object infoLock = new Object();
/**
* Track the ByteBuffers that we have handed out to readers.
@ -240,7 +240,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* back to the namenode to get a new list of block locations, and is
* capped at maxBlockAcquireFailures
*/
private int failures = 0;
protected int failures = 0;
/* XXX Use of ConcurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through preads) properly */
@ -477,7 +477,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Fetch a block from namenode and cache it */
private void fetchBlockAt(long offset) throws IOException {
protected void fetchBlockAt(long offset) throws IOException {
synchronized(infoLock) {
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
@ -580,7 +580,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
// Will be getting a new BlockReader.
closeCurrentBlockReader();
closeCurrentBlockReaders();
//
// Connect to best DataNode for desired Block, with potential offset
@ -621,7 +621,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return chosenNode;
} catch (IOException ex) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + ex);
// The encryption key used is invalid.
@ -697,7 +697,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
"unreleased ByteBuffers allocated by read(). " +
"Please release " + builder.toString() + ".");
}
closeCurrentBlockReader();
closeCurrentBlockReaders();
super.close();
}
@ -719,7 +719,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
throws ChecksumException, IOException;
}
private void updateReadStatistics(ReadStatistics readStatistics,
protected void updateReadStatistics(ReadStatistics readStatistics,
int nRead, BlockReader blockReader) {
if (nRead <= 0) return;
synchronized(infoLock) {
@ -755,7 +755,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
/**
* Used to read bytes into a user-supplied ByteBuffer
*/
private class ByteBufferStrategy implements ReaderStrategy {
protected class ByteBufferStrategy implements ReaderStrategy {
final ByteBuffer buf;
ByteBufferStrategy(ByteBuffer buf) {
this.buf = buf;
@ -771,6 +771,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
int ret = blockReader.read(buf);
success = true;
updateReadStatistics(readStatistics, ret, blockReader);
if (ret == 0) {
DFSClient.LOG.warn("Zero bytes read from blockReader for " + src);
}
return ret;
} finally {
if (!success) {
@ -838,7 +841,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
dfsClient.checkOpen();
if (closed.get()) {
throw new IOException("Stream closed");
@ -927,7 +930,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
/**
* Add corrupted block replica into map.
*/
private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
Set<DatanodeInfo> dnSet = null;
if((corruptedBlockMap.containsKey(blk))) {
@ -999,7 +1002,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @param ignoredNodes Do not choose nodes in this array (may be null)
* @return The DNAddrPair of the best node. Null if no node can be chosen.
*/
private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) {
DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
@ -1368,7 +1371,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return true if block access token has expired or invalid and it should be
* refetched
*/
private static boolean tokenRefetchNeeded(IOException ex,
protected static boolean tokenRefetchNeeded(IOException ex,
InetSocketAddress targetAddr) {
/*
* Get a new access token and retry. Retry is needed in 2 cases. 1)
@ -1475,7 +1478,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @param corruptedBlockMap map of corrupted blocks
* @param dataNodeCount number of data nodes who contains the block replicas
*/
private void reportCheckSumFailure(
protected void reportCheckSumFailure(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
int dataNodeCount) {
if (corruptedBlockMap.isEmpty()) {
@ -1672,7 +1675,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
private void closeCurrentBlockReader() {
protected void closeCurrentBlockReaders() {
if (blockReader == null) return;
// Close the current block reader so that the new caching settings can
// take effect immediately.
@ -1692,7 +1695,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
}
closeCurrentBlockReader();
closeCurrentBlockReaders();
}
@Override
@ -1702,7 +1705,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
}
closeCurrentBlockReader();
closeCurrentBlockReaders();
}
/**
@ -1860,6 +1863,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@Override
public synchronized void unbuffer() {
closeCurrentBlockReader();
closeCurrentBlockReaders();
}
}

View File

@ -18,20 +18,21 @@
package org.apache.hadoop.hdfs;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -125,6 +126,9 @@ public class DFSStripedInputStream extends DFSInputStream {
return results;
}
private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
private BlockReader[] blockReaders = null;
private DatanodeInfo[] currentNodes = null;
private final int cellSize;
private final short dataBlkNum;
private final short parityBlkNum;
@ -143,13 +147,285 @@ public class DFSStripedInputStream extends DFSInputStream {
@Override
public synchronized int read(final ByteBuffer buf) throws IOException {
throw new UnsupportedActionException("Stateful read is not supported");
ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
TraceScope scope =
dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src);
try {
return readWithStrategy(byteBufferReader, 0, buf.remaining());
} finally {
scope.close();
}
}
/**
* When seeking into a new block group, create a BlockReader for each internal
* block in the group.
*/
@VisibleForTesting
private synchronized DatanodeInfo[] blockSeekTo(long target)
throws IOException {
if (target >= getFileLength()) {
throw new IOException("Attempted to read past end of file");
}
// Will be getting a new BlockReader.
closeCurrentBlockReaders();
// Connect to best DataNode for desired Block, with potential offset
DatanodeInfo[] chosenNodes = new DatanodeInfo[groupSize];
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
// Compute desired striped block group
LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
// Update current position
this.pos = target;
this.blockEnd = targetBlockGroup.getStartOffset() +
targetBlockGroup.getBlockSize() - 1;
long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
// The purpose is to get start offset into each block
ReadPortion[] readPortions = planReadPortions(groupSize, cellSize,
offsetIntoBlockGroup, 0, 0);
while (true) {
int i = 0;
InetSocketAddress targetAddr = null;
try {
blockReaders = new BlockReader[groupSize];
for (i = 0; i < groupSize; i++) {
LocatedBlock targetBlock = targetBlocks[i];
if (targetBlock == null) {
continue;
}
long offsetIntoBlock = readPortions[i].startOffsetInBlock;
DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
chosenNodes[i] = retval.info;
targetAddr = retval.addr;
StorageType storageType = retval.storageType;
ExtendedBlock blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
CachingStrategy curCachingStrategy;
boolean shortCircuitForbidden;
synchronized(infoLock) {
curCachingStrategy = cachingStrategy;
shortCircuitForbidden = shortCircuitForbidden();
}
blockReaders[i] = new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNodes[i]).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetIntoBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(blk.getNumBytes() - offsetIntoBlock).
setCachingStrategy(curCachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
}
currentLocatedBlock = targetBlockGroup;
return chosenNodes;
} catch (IOException ex) {
// Retry in case of encryption key or token exceptions. Otherwise rethrow
// the IOException: since each internal block is singly replicated, there
// is no other replica to try.
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + targetAddr
+ " : " + ex);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
} else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
refetchToken--;
fetchBlockAt(target);
} else {
DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ ", add to deadNodes and continue. " + ex, ex);
// Put chosen node into dead list and throw exception
addToDeadNodes(chosenNodes[i]);
throw ex;
}
}
}
}
/**
* Extend the super method with the logic of switching between cells.
* When reaching the end of a cell, proceed to the next cell and read it
* with the next blockReader.
*/
@Override
protected void closeCurrentBlockReaders() {
if (blockReaders == null || blockReaders.length == 0) {
return;
}
for (int i = 0; i < groupSize; i++) {
if (blockReaders[i] == null) {
continue;
}
try {
blockReaders[i].close();
} catch (IOException e) {
DFSClient.LOG.error("error closing blockReader", e);
}
blockReaders[i] = null;
}
blockEnd = -1;
}
@Override
public synchronized int read(final byte buf[], int off, int len)
protected synchronized int readWithStrategy(ReaderStrategy strategy,
int off, int len) throws IOException {
dfsClient.checkOpen();
if (closed.get()) {
throw new IOException("Stream closed");
}
Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
= new HashMap<>();
failures = 0;
if (pos < getFileLength()) {
int retries = 2;
/** Index of the target block in a stripe to read from */
int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
while (retries > 0) {
try {
// currentNodes can be left as null if the previous read had a checksum
// error on the same block. See HDFS-3067
if (pos > blockEnd || currentNodes == null) {
currentNodes = blockSeekTo(pos);
}
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
synchronized(infoLock) {
if (locatedBlocks.isLastBlockComplete()) {
realLen = (int) Math.min(realLen,
locatedBlocks.getFileLength() - pos);
}
}
/** Number of bytes already read into buffer */
int result = 0;
while (result < realLen) {
/**
* Temporary position into the file; {@link pos} might not advance
* to this temporary position if an exception occurs.
*/
long tmpPos = pos + result;
/** Start and end offsets of a cell in the file */
long cellStart = (tmpPos / cellSize) * cellSize;
long cellEnd = cellStart + cellSize - 1;
/** Number of bytes to read from the current cell */
int realLenInCell = (int) Math.min(realLen - result,
cellEnd - tmpPos + 1L);
assert realLenInCell > 0 : "Temporary position shouldn't be " +
"after cellEnd";
// Read from one blockReader up to cell boundary
int cellRet = readBuffer(blockReaders[idxInGroup],
currentNodes[idxInGroup], strategy, off + result,
realLenInCell);
if (cellRet >= 0) {
result += cellRet;
if (cellRet < realLenInCell) {
// A short read indicates the current blockReader buffer is
// already drained, so the read call should return. Otherwise
// proceed to the next cell.
break;
}
} else {
// Got an EOS from the reader though we expect more data from it.
throw new IOException("Unexpected EOS from the reader");
}
idxInGroup = (idxInGroup + 1) % dataBlkNum;
}
pos += result;
if (dfsClient.stats != null) {
dfsClient.stats.incrementBytesRead(result);
}
return result;
} catch (ChecksumException ce) {
throw ce;
} catch (IOException e) {
if (retries == 1) {
DFSClient.LOG.warn("DFS Read", e);
}
blockEnd = -1;
if (currentNodes[idxInGroup] != null) {
addToDeadNodes(currentNodes[idxInGroup]);
}
if (--retries == 0) {
throw e;
}
} finally {
// Check whether block replica corruption needs to be reported, whether
// the read was successful or a ChecksumException occurred.
reportCheckSumFailure(corruptedBlockMap,
currentLocatedBlock.getLocations().length);
}
}
}
return -1;
}
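For reference, the loop above picks the reader with idxInGroup = (pos / cellSize) % dataBlkNum and reads at most up to the next cell boundary. A minimal standalone sketch of that offset arithmetic follows (illustration only, not the actual StripedBlockUtil API; the cell size and data-block count are hypothetical values):

// Minimal sketch: map a logical offset inside one block group to
// (internal data block index, offset within that internal block).
public class StripedOffsetMath {
  // Hypothetical parameters; real values come from the file's EC schema.
  static final int CELL_SIZE = 64 * 1024;
  static final int DATA_BLK_NUM = 6;

  static int blockIndex(long offsetInGroup) {
    long cellIdx = offsetInGroup / CELL_SIZE;       // which cell in the group
    return (int) (cellIdx % DATA_BLK_NUM);          // cells rotate across data blocks
  }

  static long offsetInInternalBlock(long offsetInGroup) {
    long cellIdx = offsetInGroup / CELL_SIZE;
    long stripeIdx = cellIdx / DATA_BLK_NUM;        // full stripes before this cell
    return stripeIdx * CELL_SIZE + (offsetInGroup % CELL_SIZE);
  }

  public static void main(String[] args) {
    long pos = 3L * CELL_SIZE + 100;                // 100 bytes into the 4th cell
    System.out.println(blockIndex(pos));            // -> 3
    System.out.println(offsetInInternalBlock(pos)); // -> 100
  }
}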
private synchronized int readBuffer(BlockReader blockReader,
DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len)
throws IOException {
throw new UnsupportedActionException("Stateful read is not supported");
IOException ioe;
while (true) {
try {
return readerStrategy.doRead(blockReader, off, len);
} catch ( ChecksumException ce ) {
DFSClient.LOG.warn("Found Checksum error for "
+ getCurrentBlock() + " from " + currentNode
+ " at " + ce.getPos());
// If current block group is corrupt, it's meaningless to retry.
// TODO: this should trigger decoding logic (HDFS-7678)
throw ce;
} catch ( IOException e ) {
ioe = e;
}
boolean sourceFound = seekToBlockSource(pos);
if (!sourceFound) {
throw ioe;
}
}
}
private boolean seekToBlockSource(long targetPos)
throws IOException {
currentNodes = blockSeekTo(targetPos);
return true;
}
protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
ByteBufferStrategy(ByteBuffer buf) {
super(buf);
}
@Override
public int doRead(BlockReader blockReader, int off, int len)
throws ChecksumException, IOException {
int oldlimit = buf.limit();
if (buf.remaining() > len) {
buf.limit(buf.position() + len);
}
int ret = super.doRead(blockReader, off, len);
buf.limit(oldlimit);
return ret;
}
}
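The override above temporarily caps the buffer's limit so the underlying reader cannot fill past the requested cell-bounded length, then restores it. A self-contained sketch of the same limit save/restore pattern (hypothetical class, standard java.nio only):

import java.nio.ByteBuffer;

// Sketch of the limit save/restore pattern: cap how much a producer may
// write into a ByteBuffer, then restore the limit for later use.
public class LimitCapExample {
  static int fillUpTo(ByteBuffer buf, int len, byte value) {
    int oldLimit = buf.limit();
    if (buf.remaining() > len) {
      buf.limit(buf.position() + len);   // producer now sees at most len bytes of room
    }
    int written = 0;
    while (buf.hasRemaining()) {         // stand-in for blockReader.read(buf)
      buf.put(value);
      written++;
    }
    buf.limit(oldLimit);                 // restore so the rest of the buffer stays usable
    return written;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(1024);
    System.out.println(fillUpTo(buf, 100, (byte) 1)); // -> 100
    System.out.println(buf.position());               // -> 100; 924 bytes still free
  }
}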
/**
@ -188,8 +464,11 @@ public class DFSStripedInputStream extends DFSInputStream {
dataBlkNum, idx);
}
private LocatedBlock getBlockGroupAt(long offset) throws IOException {
return super.getBlockAt(offset);
private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
LocatedBlock lb = super.getBlockAt(offset);
assert lb instanceof LocatedStripedBlock : "NameNode" +
" should return a LocatedStripedBlock for a striped file";
return (LocatedStripedBlock)lb;
}
/**
@ -206,10 +485,8 @@ public class DFSStripedInputStream extends DFSInputStream {
int len = (int) (end - start + 1);
// Refresh the striped block group
LocatedBlock block = getBlockGroupAt(blockStartOffset);
assert block instanceof LocatedStripedBlock : "NameNode" +
" should return a LocatedStripedBlock for a striped file";
LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
// Planning the portion of I/O for each shard
ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
@ -308,7 +585,7 @@ public class DFSStripedInputStream extends DFSInputStream {
* +------------------------------------------------------+
*/
private long startOffsetInBlock = 0;
private long readLength = 0;
private int readLength = 0;
private final List<Integer> offsetsInBuf = new ArrayList<>();
private final List<Integer> lengths = new ArrayList<>();
@ -328,7 +605,7 @@ public class DFSStripedInputStream extends DFSInputStream {
return lens;
}
long getReadLength() {
int getReadLength() {
return readLength;
}

View File

@ -28,6 +28,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
public class TestDFSStripedInputStream {
private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
@ -165,6 +166,7 @@ public class TestDFSStripedInputStream {
Assert.assertEquals("File length should be the same",
writeBytes, fileLength);
// pread
try (DFSStripedInputStream dis =
new DFSStripedInputStream(fs.getClient(), src, true)) {
byte[] buf = new byte[writeBytes + 100];
@ -176,5 +178,46 @@ public class TestDFSStripedInputStream {
Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
}
}
// stateful read with byte array
try (DFSStripedInputStream dis =
new DFSStripedInputStream(fs.getClient(), src, true)) {
byte[] buf = new byte[writeBytes + 100];
int readLen = 0;
int ret;
do {
ret = dis.read(buf, readLen, buf.length - readLen);
if (ret > 0) {
readLen += ret;
}
} while (ret >= 0);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
}
}
// stateful read with ByteBuffer
try (DFSStripedInputStream dis =
new DFSStripedInputStream(fs.getClient(), src, true)) {
ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
int readLen = 0;
int ret;
do {
ret = dis.read(buf);
if (ret > 0) {
readLen += ret;
}
} while (ret >= 0);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at i should be the same", getByte(i), buf.array()[i]);
}
}
}
}

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -28,7 +30,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@ -38,6 +39,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
@ -52,19 +54,21 @@ public class TestReadStripedFile {
private Path filePath = new Path(dirPath, "file");
private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final int NUM_STRIPE_PER_BLOCK = 2;
private final int BLOCKSIZE = NUM_STRIPE_PER_BLOCK * DATA_BLK_NUM * CELLSIZE;
private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
private final int BLOCK_GROUP_SIZE = DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
@Before
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
SimulatedFSDataset.setFactory(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(BLK_GROUP_SIZE)
.build();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
DATA_BLK_NUM + PARITY_BLK_NUM).build();
cluster.waitActive();
fs = cluster.getFileSystem();
fs.mkdirs(dirPath);
fs.getClient().createErasureCodingZone(dirPath.toString(), null);
}
@After
@ -80,10 +84,10 @@ public class TestReadStripedFile {
@Test
public void testGetBlock() throws Exception {
final int numBlocks = 4;
DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
NUM_STRIPE_PER_BLOCK, true);
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
NUM_STRIPE_PER_BLOCK, false);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCKSIZE * numBlocks);
filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
final DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
@ -103,11 +107,11 @@ public class TestReadStripedFile {
@Test
public void testPread() throws Exception {
final int numBlocks = 4;
DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
NUM_STRIPE_PER_BLOCK, true);
final int numBlocks = 2;
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
NUM_STRIPE_PER_BLOCK, false);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCKSIZE);
filePath.toString(), 0, BLOCK_GROUP_SIZE);
assert lbs.get(0) instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
@ -121,11 +125,89 @@ public class TestReadStripedFile {
}
DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
int readSize = BLOCKSIZE;
int readSize = BLOCK_GROUP_SIZE;
byte[] readBuffer = new byte[readSize];
int ret = in.read(0, readBuffer, 0, readSize);
assertEquals(readSize, ret);
// TODO: verify read results with patterned data from HDFS-8117
}
@Test
public void testStatefulRead() throws Exception {
testStatefulRead(false, false);
testStatefulRead(true, false);
testStatefulRead(true, true);
}
private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
throws Exception {
final int numBlocks = 2;
final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
if (cellMisalignPacket) {
conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1);
tearDown();
setup();
}
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
NUM_STRIPE_PER_BLOCK, false);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, fileSize);
assert lbs.getLocatedBlocks().size() == numBlocks;
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
assert lb instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
for (int i = 0; i < DATA_BLK_NUM; i++) {
Block blk = new Block(bg.getBlock().getBlockId() + i,
NUM_STRIPE_PER_BLOCK * CELLSIZE,
bg.getBlock().getGenerationStamp());
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
cluster.injectBlocks(i, Arrays.asList(blk),
bg.getBlock().getBlockPoolId());
}
}
DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(),
false);
byte[] expected = new byte[fileSize];
for (LocatedBlock bg : lbs.getLocatedBlocks()) {
/** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
for (int j = 0; j < DATA_BLK_NUM; j++) {
for (int k = 0; k < CELLSIZE; k++) {
int posInBlk = i * CELLSIZE + k;
int posInFile = (int) bg.getStartOffset() +
i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
expected[posInFile] = SimulatedFSDataset.simulatedByte(
new Block(bg.getBlock().getBlockId() + j), posInBlk);
}
}
}
}
if (useByteBuffer) {
ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
int done = 0;
while (done < fileSize) {
int ret = in.read(readBuffer);
assertTrue(ret > 0);
done += ret;
}
assertArrayEquals(expected, readBuffer.array());
} else {
byte[] readBuffer = new byte[fileSize];
int done = 0;
while (done < fileSize) {
int ret = in.read(readBuffer, done, fileSize - done);
assertTrue(ret > 0);
done += ret;
}
assertArrayEquals(expected, readBuffer);
}
fs.delete(filePath, true);
}
}