HDFS-7782. Erasure coding: pread from files in striped layout. Contributed by Zhe Zhang and Jing Zhao

This commit is contained in:
Zhe Zhang 2015-04-07 11:20:13 -07:00 committed by Zhe Zhang
parent 99502cbbe2
commit 91c741a2a1
10 changed files with 896 additions and 112 deletions

View File

@ -203,4 +203,8 @@ public class LocatedBlock {
+ "; locs=" + Arrays.asList(locs)
+ "}";
}
public boolean isStriped() {
return false;
}
}

View File

@ -238,6 +238,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
new DFSHedgedReadMetrics();
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final Sampler<?> traceSampler;
private final int smallBufferSize;
@ -380,6 +381,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());
}
numThreads = conf.getInt(
DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE,
DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
if (numThreads <= 0) {
LOG.warn("The value of "
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE
+ " must be greater than 0. The current setting is " + numThreads
+ ". Reset it to the default value "
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
numThreads =
DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE;
}
this.initThreadsNumForStripedReads(numThreads);
this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
@ -3193,11 +3207,52 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
LOG.debug("Using hedged reads; pool threads=" + num);
}
}
/**
* Create thread pool for parallel reading in striped layout,
* STRIPED_READ_THREAD_POOL, if it does not already exist.
* @param num Number of threads for striped reads thread pool.
*/
private void initThreadsNumForStripedReads(int num) {
assert num > 0;
if (STRIPED_READ_THREAD_POOL != null) {
return;
}
synchronized (DFSClient.class) {
if (STRIPED_READ_THREAD_POOL == null) {
STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName("stripedRead-" + threadIndex.getAndIncrement());
return t;
}
}, new ThreadPoolExecutor.CallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
LOG.info("Execution for striped reading rejected, "
+ "Executing in current thread");
// will run in the current thread
super.rejectedExecution(runnable, e);
}
});
STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
}
}
}
ThreadPoolExecutor getHedgedReadsThreadPool() {
return HEDGED_READ_THREAD_POOL;
}
ThreadPoolExecutor getStripedReadsThreadPool() {
return STRIPED_READ_THREAD_POOL;
}
boolean isHedgedReadsEnabled() {
return (HEDGED_READ_THREAD_POOL != null) &&
HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;

View File

@ -670,7 +670,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.reject-unresolved-dn-topology-mapping";
public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT =
false;
public static final String DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE =
"dfs.client.striped.read.threadpool.size";
// With default 3+2 schema, each normal read could span 3 DNs. So this
// default value accommodates 6 read streams
public static final int DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE = 18;
// Slow io warning log threshold settings for dfsclient and datanode.
public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.datanode.slow.io.warning.threshold.ms";

View File

@ -44,6 +44,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
@ -94,7 +95,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@VisibleForTesting
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
private final DFSClient dfsClient;
protected final DFSClient dfsClient;
private AtomicBoolean closed = new AtomicBoolean(false);
private final String src;
private final boolean verifyChecksum;
@ -441,7 +442,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return located block
* @throws IOException
*/
private LocatedBlock getBlockAt(long offset) throws IOException {
protected LocatedBlock getBlockAt(long offset) throws IOException {
synchronized(infoLock) {
assert (locatedBlocks != null) : "locatedBlocks is null";
@ -713,7 +714,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Wraps different possible read implementations so that readBuffer can be
* strategy-agnostic.
*/
private interface ReaderStrategy {
interface ReaderStrategy {
public int doRead(BlockReader blockReader, int off, int len)
throws ChecksumException, IOException;
}
@ -1058,7 +1059,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return errMsgr.toString();
}
private void fetchBlockByteRange(long blockStartOffset, long start, long end,
protected void fetchBlockByteRange(long blockStartOffset, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
@ -1100,13 +1101,42 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
};
}
/**
* Used when reading contiguous blocks
*/
private void actualGetFromOneDataNode(final DNAddrPair datanode,
long blockStartOffset, final long start, final long end, byte[] buf,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final int length = (int) (end - start + 1);
actualGetFromOneDataNode(datanode, block, start, end, buf,
new int[]{offset}, new int[]{length}, corruptedBlockMap);
}
/**
* Read data from one DataNode.
* @param datanode the datanode from which to read data
* @param block the block to read
* @param startInBlk the startInBlk offset of the block
* @param endInBlk the endInBlk offset of the block
* @param buf the given byte array into which the data is read
* @param offsets the data may be read into multiple segments of the buf
* (when reading a striped block). this array indicates the
* offset of each buf segment.
* @param lengths the length of each buf segment
* @param corruptedBlockMap map recording list of datanodes with corrupted
* block replica
*/
void actualGetFromOneDataNode(final DNAddrPair datanode,
LocatedBlock block, final long startInBlk, final long endInBlk,
byte[] buf, int[] offsets, int[] lengths,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
final int len = (int) (endInBlk - startInBlk + 1);
checkReadPortions(offsets, lengths, len);
while (true) {
// cached block locations may have been updated by chooseDataNode()
@ -1116,15 +1146,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
BlockReader reader = null;
try {
DFSClientFaultInjector.get().fetchFromDatanodeException();
int len = (int) (end - start + 1);
reader = getBlockReader(block, start, len, datanode.addr,
datanode.storageType, datanode.info);
int nread = reader.readAll(buf, offset, len);
updateReadStatistics(readStatistics, nread, reader);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + len + ", got " + nread);
for (int i = 0; i < offsets.length; i++) {
int nread = reader.readAll(buf, offsets[i], lengths[i]);
updateReadStatistics(readStatistics, nread, reader);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + len + ", got " + nread);
}
}
DFSClientFaultInjector.get().readFromDatanodeDelay();
return;
@ -1169,7 +1199,26 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
* Like {@link #fetchBlockByteRange} except we start up a second, parallel,
* This method verifies that the read portions are valid and do not overlap
* with each other.
*/
private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
Preconditions.checkArgument(offsets.length == lengths.length &&
offsets.length > 0);
int sum = 0;
for (int i = 0; i < lengths.length; i++) {
if (i > 0) {
int gap = offsets[i] - offsets[i - 1];
// make sure read portions do not overlap with each other
Preconditions.checkArgument(gap >= lengths[i - 1]);
}
sum += lengths[i];
}
Preconditions.checkArgument(sum == totalLen);
}
/**
* Like {@link #fetchBlockByteRange}except we start up a second, parallel,
* 'hedged' read if the first read is taking longer than configured amount of
* time. We then wait on which ever read returns first.
*/
@ -1388,10 +1437,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
if (dfsClient.isHedgedReadsEnabled()) {
if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
targetStart + bytesToRead - 1, buffer, offset,
corruptedBlockMap);
targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
} else {
fetchBlockByteRange(blk.getStartOffset(), targetStart,
targetStart + bytesToRead - 1, buffer, offset,
@ -1587,7 +1635,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Utility class to encapsulate data node info and its address. */
private static final class DNAddrPair {
static final class DNAddrPair {
final DatanodeInfo info;
final InetSocketAddress addr;
final StorageType storageType;

View File

@ -0,0 +1,367 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.net.NetUtils;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/******************************************************************************
* DFSStripedInputStream reads from striped block groups, illustrated below:
*
* | <- Striped Block Group -> |
* blk_0 blk_1 blk_2 <- A striped block group has
* | | | {@link #groupSize} blocks
* v v v
* +------+ +------+ +------+
* |cell_0| |cell_1| |cell_2| <- The logical read order should be
* +------+ +------+ +------+ cell_0, cell_1, ...
* |cell_3| |cell_4| |cell_5|
* +------+ +------+ +------+
* |cell_6| |cell_7| |cell_8|
* +------+ +------+ +------+
* |cell_9|
* +------+ <- A cell contains {@link #cellSize} bytes of data
*
* Three styles of read will eventually be supported:
* 1. Stateful read: TODO: HDFS-8033
* 2. pread without decode support
* This is implemented by calculating the portion of read from each block and
* issuing requests to each DataNode in parallel.
* 3. pread with decode support: TODO: will be supported after HDFS-7678
*****************************************************************************/
public class DFSStripedInputStream extends DFSInputStream {
/**
* This method plans the read portion from each block in the stripe
* @param groupSize The size / width of the striping group
* @param cellSize The size of each striping cell
* @param startInBlk Starting offset in the striped block
* @param len Length of the read request
* @param bufOffset Initial offset in the result buffer
* @return array of {@link ReadPortion}, each representing the portion of I/O
* for an individual block in the group
*/
@VisibleForTesting
static ReadPortion[] planReadPortions(final int groupSize,
final int cellSize, final long startInBlk, final int len, int bufOffset) {
ReadPortion[] results = new ReadPortion[groupSize];
for (int i = 0; i < groupSize; i++) {
results[i] = new ReadPortion();
}
// cellIdxInBlk is the index of the cell in the block
// E.g., cell_3 is the 2nd cell in blk_0
int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize));
// blkIdxInGroup is the index of the block in the striped block group
// E.g., blk_2 is the 3rd block in the group
final int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize);
results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
startInBlk % cellSize;
boolean crossStripe = false;
for (int i = 1; i < groupSize; i++) {
if (blkIdxInGroup + i >= groupSize && !crossStripe) {
cellIdxInBlk++;
crossStripe = true;
}
results[(blkIdxInGroup + i) % groupSize].startOffsetInBlock =
cellSize * cellIdxInBlk;
}
int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
results[blkIdxInGroup].lengths.add(firstCellLen);
results[blkIdxInGroup].readLength += firstCellLen;
int i = (blkIdxInGroup + 1) % groupSize;
for (int done = firstCellLen; done < len; done += cellSize) {
ReadPortion rp = results[i];
rp.offsetsInBuf.add(done + bufOffset);
final int readLen = Math.min(len - done, cellSize);
rp.lengths.add(readLen);
rp.readLength += readLen;
i = (i + 1) % groupSize;
}
return results;
}
/**
* This method parses a striped block group into individual blocks.
*
* @param bg The striped block group
* @param dataBlkNum the number of data blocks
* @return An array containing the blocks in the group
*/
@VisibleForTesting
static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
int dataBlkNum, int cellSize) {
int locatedBGSize = bg.getBlockIndices().length;
// TODO not considering missing blocks for now, only identify data blocks
LocatedBlock[] lbs = new LocatedBlock[dataBlkNum];
for (short i = 0; i < locatedBGSize; i++) {
final int idx = bg.getBlockIndices()[i];
if (idx < dataBlkNum && lbs[idx] == null) {
lbs[idx] = constructInternalBlock(bg, i, cellSize, idx);
}
}
return lbs;
}
private static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
int idxInReturnedLocs, int cellSize, int idxInBlockGroup) {
final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
// TODO: fix the numBytes computation
return new LocatedBlock(blk,
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
null);
}
private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
throws IOException {
super(dfsClient, src, verifyChecksum);
DFSClient.LOG.debug("Creating an striped input stream for file " + src);
}
@Override
public synchronized int read(final ByteBuffer buf) throws IOException {
throw new UnsupportedActionException("Stateful read is not supported");
}
@Override
public synchronized int read(final byte buf[], int off, int len)
throws IOException {
throw new UnsupportedActionException("Stateful read is not supported");
}
/**
* | <--------- LocatedStripedBlock (ID = 0) ---------> |
* LocatedBlock (0) | LocatedBlock (1) | LocatedBlock (2)
* ^
* offset
* On a striped file, the super method {@link DFSInputStream#getBlockAt}
* treats a striped block group as a single {@link LocatedBlock} object,
* which includes target in its range. This method adds the logic of:
* 1. Analyzing the index of required block based on offset
* 2. Parsing the block group to obtain the block location on that index
*/
@Override
protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
LocatedBlock lb = super.getBlockAt(blkStartOffset);
assert lb instanceof LocatedStripedBlock : "NameNode should return a " +
"LocatedStripedBlock for a striped file";
int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
% groupSize);
// If indexing information is returned, iterate through the index array
// to find the entry for position idx in the group
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
int i = 0;
for (; i < lsb.getBlockIndices().length; i++) {
if (lsb.getBlockIndices()[i] == idx) {
break;
}
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
+ blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
}
return constructInternalBlock(lsb, i, cellSize, idx);
}
private LocatedBlock getBlockGroupAt(long offset) throws IOException {
return super.getBlockAt(offset);
}
/**
* Real implementation of pread.
*/
@Override
protected void fetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
Map<Future<Void>, Integer> futures = new HashMap<>();
CompletionService<Void> stripedReadsService =
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
int len = (int) (end - start + 1);
// Refresh the striped block group
block = getBlockGroupAt(block.getStartOffset());
assert block instanceof LocatedStripedBlock : "NameNode" +
" should return a LocatedStripedBlock for a striped file";
LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
// Planning the portion of I/O for each shard
ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, start,
len, offset);
// Parse group to get chosen DN location
LocatedBlock[] blks = parseStripedBlockGroup(blockGroup, groupSize, cellSize);
for (short i = 0; i < groupSize; i++) {
ReadPortion rp = readPortions[i];
if (rp.readLength <= 0) {
continue;
}
DatanodeInfo loc = blks[i].getLocations()[0];
StorageType type = blks[i].getStorageTypes()[0];
DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type);
Callable<Void> readCallable = getFromOneDataNode(dnAddr, blks[i],
rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf,
rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
futures.put(getFromDNRequest, (int) i);
}
while (!futures.isEmpty()) {
try {
waitNextCompletion(stripedReadsService, futures);
} catch (InterruptedException ie) {
// Ignore and retry
}
}
}
private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
final LocatedBlock block, final long start, final long end,
final byte[] buf, final int[] offsets, final int[] lengths,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final int hedgedReadId) {
final Span parentSpan = Trace.currentSpan();
return new Callable<Void>() {
@Override
public Void call() throws Exception {
TraceScope scope =
Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
try {
actualGetFromOneDataNode(datanode, block, start,
end, buf, offsets, lengths, corruptedBlockMap);
} finally {
scope.close();
}
return null;
}
};
}
private void waitNextCompletion(CompletionService<Void> stripedReadsService,
Map<Future<Void>, Integer> futures) throws InterruptedException {
if (futures.isEmpty()) {
throw new InterruptedException("Futures already empty");
}
Future<Void> future = null;
try {
future = stripedReadsService.take();
future.get();
futures.remove(future);
} catch (ExecutionException | CancellationException e) {
// already logged in the Callable
futures.remove(future);
}
throw new InterruptedException("let's retry");
}
public void setCellSize(int cellSize) {
this.cellSize = cellSize;
}
/**
* This class represents the portion of I/O associated with each block in the
* striped block group.
*/
static class ReadPortion {
/**
* startOffsetInBlock
* |
* v
* |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->|
* +------------------+------------------+----------------+
* | cell_0 | cell_3 | cell_6 | <- blk_0
* +------------------+------------------+----------------+
* _/ \_______________________
* | |
* v offsetsInBuf[0] v offsetsInBuf[1]
* +------------------------------------------------------+
* | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf
* | (partial) | (from blk_1 and blk_2) | |
* +------------------------------------------------------+
*/
private long startOffsetInBlock = 0;
private long readLength = 0;
private final List<Integer> offsetsInBuf = new ArrayList<>();
private final List<Integer> lengths = new ArrayList<>();
int[] getOffsets() {
int[] offsets = new int[offsetsInBuf.size()];
for (int i = 0; i < offsets.length; i++) {
offsets[i] = offsetsInBuf.get(i);
}
return offsets;
}
int[] getLengths() {
int[] lens = new int[this.lengths.size()];
for (int i = 0; i < lens.length; i++) {
lens[i] = this.lengths.get(i);
}
return lens;
}
long getReadLength() {
return readLength;
}
long getStartOffsetInBlock() {
return startOffsetInBlock;
}
}
}

View File

@ -65,4 +65,9 @@ public class LocatedStripedBlock extends LocatedBlock {
public int[] getBlockIndices() {
return this.blockIndices;
}
@Override
public boolean isStriped() {
return true;
}
}

View File

@ -23,7 +23,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import java.io.DataOutput;
import java.io.IOException;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
/**
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
@ -203,8 +203,8 @@ public class BlockInfoStriped extends BlockInfo {
// In case striped blocks, total usage by this striped blocks should
// be the total of data blocks and parity blocks because
// `getNumBytes` is the total of actual data block size.
return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CHUNK_SIZE) + 1)
* BLOCK_STRIPED_CHUNK_SIZE * parityBlockNum + getNumBytes();
return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CELL_SIZE) + 1)
* BLOCK_STRIPED_CELL_SIZE * parityBlockNum + getNumBytes();
}
@Override

View File

@ -66,6 +66,12 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@ -102,6 +108,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -124,8 +131,10 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
@ -134,7 +143,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
@ -156,12 +164,8 @@ import org.junit.Assume;
import org.mockito.internal.util.reflection.Whitebox;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
/** Utilities for HDFS tests */
public class DFSTestUtil {
@ -1846,4 +1850,77 @@ public class DFSTestUtil {
reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
return reports;
}
public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
int numBlocks, int numStripesPerBlk) throws Exception {
DistributedFileSystem dfs = cluster.getFileSystem();
dfs.mkdirs(dir);
dfs.getClient().createErasureCodingZone(dir.toString());
FSDataOutputStream out = null;
try {
out = dfs.create(file, (short) 1); // create an empty file
FSNamesystem ns = cluster.getNamesystem();
FSDirectory fsdir = ns.getFSDirectory();
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
ExtendedBlock previous = null;
for (int i = 0; i < numBlocks; i++) {
Block newBlock = createBlock(cluster.getDataNodes(), dfs, ns,
file.toString(), fileNode, dfs.getClient().getClientName(),
previous, numStripesPerBlk);
previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
}
dfs.getClient().namenode.complete(file.toString(),
dfs.getClient().getClientName(), previous, fileNode.getId());
} finally {
IOUtils.cleanup(null, out);
}
}
static Block createBlock(List<DataNode> dataNodes, DistributedFileSystem fs,
FSNamesystem ns, String file, INodeFile fileNode, String clientName,
ExtendedBlock previous, int numStripes) throws Exception {
fs.getClient().namenode.addBlock(file, clientName, previous, null,
fileNode.getId(), null);
final BlockInfo lastBlock = fileNode.getLastBlock();
final int groupSize = fileNode.getBlockReplication();
// 1. RECEIVING_BLOCK IBR
int i = 0;
for (DataNode dn : dataNodes) {
if (i < groupSize) {
final Block block = new Block(lastBlock.getBlockId() + i++, 0,
lastBlock.getGenerationStamp());
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
.makeReportForReceivedBlock(block,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
for (StorageReceivedDeletedBlocks report : reports) {
ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
}
}
}
// 2. RECEIVED_BLOCK IBR
i = 0;
for (DataNode dn : dataNodes) {
if (i < groupSize) {
final Block block = new Block(lastBlock.getBlockId() + i++,
numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp());
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
.makeReportForReceivedBlock(block,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
for (StorageReceivedDeletedBlocks report : reports) {
ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
}
}
}
lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS);
return lastBlock;
}
}

View File

@ -0,0 +1,304 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class TestReadStripedFile {
public static final Log LOG = LogFactory.getLog(TestReadStripedFile.class);
private MiniDFSCluster cluster;
private Configuration conf = new Configuration();
private DistributedFileSystem fs;
private final Path dirPath = new Path("/striped");
private Path filePath = new Path(dirPath, "file");
private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS;
private final short TOTAL_SIZE = HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final int NUM_STRIPE_PER_BLOCK = 2;
private final int BLOCKSIZE = 2 * GROUP_SIZE * CELLSIZE;
@Before
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
SimulatedFSDataset.setFactory(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(TOTAL_SIZE)
.build();
cluster.waitActive();
fs = cluster.getFileSystem();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
private void testPlanReadPortions(int startInBlk, int length,
int bufferOffset, int[] readLengths, int[] offsetsInBlock,
int[][] bufferOffsets, int[][] bufferLengths) {
ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
CELLSIZE, startInBlk, length, bufferOffset);
assertEquals(GROUP_SIZE, results.length);
for (int i = 0; i < GROUP_SIZE; i++) {
assertEquals(readLengths[i], results[i].getReadLength());
assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
final int[] bOffsets = results[i].getOffsets();
assertArrayEquals(bufferOffsets[i], bOffsets);
final int[] bLengths = results[i].getLengths();
assertArrayEquals(bufferLengths[i], bLengths);
}
}
/**
* Test {@link DFSStripedInputStream#planReadPortions}
*/
@Test
public void testPlanReadPortions() {
/**
* start block offset is 0, read cellSize - 10
*/
testPlanReadPortions(0, CELLSIZE - 10, 0,
new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
new int[][]{new int[]{0}, new int[]{}, new int[]{}},
new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
/**
* start block offset is 0, read 3 * cellSize
*/
testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
/**
* start block offset is 0, read cellSize + 10
*/
testPlanReadPortions(0, CELLSIZE + 10, 0,
new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
/**
* start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
*/
testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
new int[][]{new int[]{CELLSIZE, CELLSIZE},
new int[]{CELLSIZE, CELLSIZE},
new int[]{CELLSIZE, 10}});
/**
* start block offset is 2, read 3 * cellSize
*/
testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
new int[]{2, 0, 0},
new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
new int[]{100 + CELLSIZE - 2},
new int[]{100 + CELLSIZE * 2 - 2}},
new int[][]{new int[]{CELLSIZE - 2, 2},
new int[]{CELLSIZE},
new int[]{CELLSIZE}});
/**
* start block offset is 2, read 3 * cellSize + 10
*/
testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
new int[]{2, 0, 0},
new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
new int[]{CELLSIZE - 2},
new int[]{CELLSIZE * 2 - 2}},
new int[][]{new int[]{CELLSIZE - 2, 12},
new int[]{CELLSIZE},
new int[]{CELLSIZE}});
/**
* start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
*/
testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
new int[]{CELLSIZE, CELLSIZE - 1, 0},
new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
new int[]{1, 3 * CELLSIZE + 1}},
new int[][]{new int[]{CELLSIZE, CELLSIZE},
new int[]{1, CELLSIZE, 9},
new int[]{CELLSIZE, CELLSIZE}});
/**
* start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
*/
testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
new int[]{CELLSIZE, CELLSIZE, 9},
new int[]{1, CELLSIZE, CELLSIZE}});
}
private LocatedStripedBlock createDummyLocatedBlock() {
final long blockGroupID = -1048576;
DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE];
String[] storageIDs = new String[TOTAL_SIZE];
StorageType[] storageTypes = new StorageType[TOTAL_SIZE];
int[] indices = new int[TOTAL_SIZE];
for (int i = 0; i < TOTAL_SIZE; i++) {
locs[i] = new DatanodeInfo(cluster.getDataNodes().get(i).getDatanodeId());
storageIDs[i] = cluster.getDataNodes().get(i).getDatanodeUuid();
storageTypes[i] = StorageType.DISK;
indices[i] = (i + 2) % GROUP_SIZE;
}
return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
locs, storageIDs, storageTypes, indices, 0, false, null);
}
@Test
public void testParseDummyStripedBlock() {
LocatedStripedBlock lsb = createDummyLocatedBlock();
LocatedBlock[] blocks = DFSStripedInputStream.parseStripedBlockGroup(
lsb, GROUP_SIZE, CELLSIZE);
assertEquals(GROUP_SIZE, blocks.length);
for (int j = 0; j < GROUP_SIZE; j++) {
assertFalse(blocks[j].isStriped());
assertEquals(j,
BlockIdManager.getBlockIndex(blocks[j].getBlock().getLocalBlock()));
assertEquals(j * CELLSIZE, blocks[j].getStartOffset());
}
}
@Test
public void testParseStripedBlock() throws Exception {
final int numBlocks = 4;
DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
NUM_STRIPE_PER_BLOCK);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCKSIZE * numBlocks);
assertEquals(4, lbs.locatedBlockCount());
List<LocatedBlock> lbList = lbs.getLocatedBlocks();
for (LocatedBlock lb : lbList) {
assertTrue(lb.isStriped());
}
for (int i = 0; i < numBlocks; i++) {
LocatedStripedBlock lsb = (LocatedStripedBlock) (lbs.get(i));
LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
GROUP_SIZE, CELLSIZE);
assertEquals(GROUP_SIZE, blks.length);
for (int j = 0; j < GROUP_SIZE; j++) {
assertFalse(blks[j].isStriped());
assertEquals(j,
BlockIdManager.getBlockIndex(blks[j].getBlock().getLocalBlock()));
assertEquals(i * BLOCKSIZE + j * CELLSIZE, blks[j].getStartOffset());
}
}
}
/**
* Test {@link DFSStripedInputStream#getBlockAt(long)}
*/
@Test
public void testGetBlock() throws Exception {
final int numBlocks = 4;
DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
NUM_STRIPE_PER_BLOCK);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCKSIZE * numBlocks);
final DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
List<LocatedBlock> lbList = lbs.getLocatedBlocks();
for (LocatedBlock aLbList : lbList) {
LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
GROUP_SIZE, CELLSIZE);
for (int j = 0; j < GROUP_SIZE; j++) {
LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
assertEquals(blks[j].getBlock(), refreshed.getBlock());
assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
}
}
}
@Test
public void testPread() throws Exception {
final int numBlocks = 4;
DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
NUM_STRIPE_PER_BLOCK);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCKSIZE);
assert lbs.get(0) instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
for (int i = 0; i < GROUP_SIZE; i++) {
Block blk = new Block(bg.getBlock().getBlockId() + i, BLOCKSIZE,
bg.getBlock().getGenerationStamp());
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
cluster.injectBlocks(i, Arrays.asList(blk),
bg.getBlock().getBlockPoolId());
}
DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
in.setCellSize(CELLSIZE);
int readSize = BLOCKSIZE;
byte[] readBuffer = new byte[readSize];
int ret = in.read(0, readBuffer, 0, readSize);
assertEquals(readSize, ret);
// TODO: verify read results with patterned data from HDFS-8117
}
}

View File

@ -18,15 +18,11 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@ -36,19 +32,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -84,83 +75,10 @@ public class TestRecoverStripedBlocks {
}
}
public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
int numBlocks) throws Exception {
DistributedFileSystem dfs = cluster.getFileSystem();
dfs.mkdirs(dir);
dfs.getClient().getNamenode().createErasureCodingZone(dir.toString());
FSDataOutputStream out = null;
try {
out = dfs.create(file, (short) 1); // create an empty file
FSNamesystem ns = cluster.getNamesystem();
FSDirectory fsdir = ns.getFSDirectory();
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
ExtendedBlock previous = null;
for (int i = 0; i < numBlocks; i++) {
Block newBlock = createBlock(cluster.getDataNodes(), ns,
file.toString(), fileNode, dfs.getClient().getClientName(),
previous);
previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
}
ns.completeFile(file.toString(), dfs.getClient().getClientName(),
previous, fileNode.getId());
} finally {
IOUtils.cleanup(null, out);
}
}
static Block createBlock(List<DataNode> dataNodes, FSNamesystem ns,
String file, INodeFile fileNode, String clientName,
ExtendedBlock previous) throws Exception {
ns.getAdditionalBlock(file, fileNode.getId(), clientName, previous, null,
null);
final BlockInfo lastBlock = fileNode.getLastBlock();
final int groupSize = fileNode.getBlockReplication();
// 1. RECEIVING_BLOCK IBR
int i = 0;
for (DataNode dn : dataNodes) {
if (i < groupSize) {
final Block block = new Block(lastBlock.getBlockId() + i++, 0,
lastBlock.getGenerationStamp());
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
.makeReportForReceivedBlock(block,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
for (StorageReceivedDeletedBlocks report : reports) {
ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
}
}
}
// 2. RECEIVED_BLOCK IBR
i = 0;
for (DataNode dn : dataNodes) {
if (i < groupSize) {
final Block block = new Block(lastBlock.getBlockId() + i++,
BLOCK_STRIPED_CHUNK_SIZE, lastBlock.getGenerationStamp());
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
.makeReportForReceivedBlock(block,
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
for (StorageReceivedDeletedBlocks report : reports) {
ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
}
}
}
lastBlock.setNumBytes(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS);
return lastBlock;
}
@Test
public void testMissingStripedBlock() throws Exception {
final int numBlocks = 4;
createECFile(cluster, filePath, dirPath, numBlocks);
DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, 1);
// make sure the file is complete in NN
final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
@ -172,7 +90,7 @@ public class TestRecoverStripedBlocks {
for (BlockInfo blk : blocks) {
assertTrue(blk.isStriped());
assertTrue(blk.isComplete());
assertEquals(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
final BlockInfoStriped sb = (BlockInfoStriped) blk;
assertEquals(GROUP_SIZE, sb.numNodes());
}