HDFS-8166. DFSStripedOutputStream should not create empty blocks. Contributed by Jing Zhao.

Authored by Jing Zhao on 2015-04-17 17:55:19 -07:00, committed by Zhe Zhang
parent 909632dd90
commit cd458c38a0
4 changed files with 237 additions and 179 deletions

DFSStripedOutputStream.java

@@ -22,10 +22,14 @@ import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -59,12 +63,12 @@ public class DFSStripedOutputStream extends DFSOutputStream {
    */
   private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private ByteBuffer[] cellBuffers;
-  private final short blockGroupBlocks = HdfsConstants.NUM_DATA_BLOCKS
+  private final short numAllBlocks = HdfsConstants.NUM_DATA_BLOCKS
       + HdfsConstants.NUM_PARITY_BLOCKS;
-  private final short blockGroupDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short numDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
   private int curIdx = 0;
   /* bytes written in current block group */
-  private long currentBlockGroupBytes = 0;
+  //private long currentBlockGroupBytes = 0;
   //TODO: Use ErasureCoder interface (HDFS-7781)
   private RawErasureEncoder encoder;
@@ -73,10 +77,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     return streamers.get(0);
   }
 
-  private long getBlockGroupSize() {
-    return blockSize * HdfsConstants.NUM_DATA_BLOCKS;
-  }
-
   /** Construct a new output stream for creating a file. */
   DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
       EnumSet<CreateFlag> flag, Progressable progress,
@@ -84,15 +84,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       throws IOException {
     super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
     DFSClient.LOG.info("Creating striped output stream");
-    if (blockGroupBlocks <= 1) {
-      throw new IOException("The block group must contain more than one block.");
-    }
+    checkConfiguration();
 
-    cellBuffers = new ByteBuffer[blockGroupBlocks];
+    cellBuffers = new ByteBuffer[numAllBlocks];
     List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
-    for (int i = 0; i < blockGroupBlocks; i++) {
-      stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(blockGroupBlocks));
+    for (int i = 0; i < numAllBlocks; i++) {
+      stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(numAllBlocks));
       try {
         cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
       } catch (InterruptedException ie) {
@@ -103,29 +101,38 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       }
     }
     encoder = new RSRawEncoder();
-    encoder.initialize(blockGroupDataBlocks,
-        blockGroupBlocks - blockGroupDataBlocks, cellSize);
+    encoder.initialize(numDataBlocks,
+        numAllBlocks - numDataBlocks, cellSize);
 
-    streamers = new ArrayList<>(blockGroupBlocks);
-    for (short i = 0; i < blockGroupBlocks; i++) {
+    List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
+    for (short i = 0; i < numAllBlocks; i++) {
       StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
           dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
           i, stripeBlocks);
       if (favoredNodes != null && favoredNodes.length != 0) {
         streamer.setFavoredNodes(favoredNodes);
       }
-      streamers.add(streamer);
+      s.add(streamer);
     }
+    streamers = Collections.unmodifiableList(s);
 
     refreshStreamer();
   }
 
+  private void checkConfiguration() {
+    if (cellSize % bytesPerChecksum != 0) {
+      throw new HadoopIllegalArgumentException("Invalid values: "
+          + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+          + ") must divide cell size (=" + cellSize + ").");
+    }
+  }
+
   private void refreshStreamer() {
     streamer = streamers.get(curIdx);
   }
 
   private void moveToNextStreamer() {
-    curIdx = (curIdx + 1) % blockGroupBlocks;
+    curIdx = (curIdx + 1) % numAllBlocks;
     refreshStreamer();
   }
@@ -136,20 +143,21 @@ public class DFSStripedOutputStream extends DFSOutputStream {
    * @param buffers data buffers + parity buffers
    */
   private void encode(ByteBuffer[] buffers) {
-    ByteBuffer[] dataBuffers = new ByteBuffer[blockGroupDataBlocks];
-    ByteBuffer[] parityBuffers = new ByteBuffer[blockGroupBlocks - blockGroupDataBlocks];
-    for (int i = 0; i < blockGroupBlocks; i++) {
-      if (i < blockGroupDataBlocks) {
+    ByteBuffer[] dataBuffers = new ByteBuffer[numDataBlocks];
+    ByteBuffer[] parityBuffers = new ByteBuffer[numAllBlocks - numDataBlocks];
+    for (int i = 0; i < numAllBlocks; i++) {
+      if (i < numDataBlocks) {
         dataBuffers[i] = buffers[i];
       } else {
-        parityBuffers[i - blockGroupDataBlocks] = buffers[i];
+        parityBuffers[i - numDataBlocks] = buffers[i];
       }
     }
     encoder.encode(dataBuffers, parityBuffers);
   }
 
   /**
-   * Generate packets from a given buffer
+   * Generate packets from a given buffer. This is only used for streamers
+   * writing parity blocks.
    *
    * @param byteBuffer the given buffer to generate packets
    * @return packets generated
@@ -185,7 +193,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       throw new IOException(msg);
     }
 
-
     // If current packet has not been enqueued for transmission,
     // but the cell buffer is full, we need to enqueue the packet
     if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
@@ -213,13 +220,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       //When all data cells in a stripe are ready, we need to encode
       //them and generate some parity cells. These cells will be
       //converted to packets and put to their DataStreamer's queue.
-      if (curIdx == blockGroupDataBlocks) {
+      if (curIdx == numDataBlocks) {
         //encode the data cells
-        for (int k = 0; k < blockGroupDataBlocks; k++) {
+        for (int k = 0; k < numDataBlocks; k++) {
           cellBuffers[k].flip();
         }
         encode(cellBuffers);
-        for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+        for (int i = numDataBlocks; i < numAllBlocks; i++) {
           ByteBuffer parityBuffer = cellBuffers[i];
           List<DFSPacket> packets = generatePackets(parityBuffer);
           for (DFSPacket p : packets) {
@@ -245,13 +252,24 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   }
 
   private void clearCellBuffers() {
-    for (int i = 0; i< blockGroupBlocks; i++) {
+    for (int i = 0; i< numAllBlocks; i++) {
       cellBuffers[i].clear();
+      if (i >= numDataBlocks) {
+        Arrays.fill(cellBuffers[i].array(), (byte) 0);
+      }
     }
   }
 
   private int stripeDataSize() {
-    return blockGroupDataBlocks * cellSize;
+    return numDataBlocks * cellSize;
+  }
+
+  private long getCurrentBlockGroupBytes() {
+    long sum = 0;
+    for (int i = 0; i < numDataBlocks; i++) {
+      sum += streamers.get(i).getBytesCurBlock();
+    }
+    return sum;
   }
 
   private void notSupported(String headMsg)
@@ -270,7 +288,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     notSupported("hsync");
   }
 
-
   @Override
   protected synchronized void start() {
     for (StripedDataStreamer streamer : streamers) {
@@ -302,15 +319,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   // interrupt datastreamer if force is true
   @Override
   protected void closeThreads(boolean force) throws IOException {
-    StripedDataStreamer leadingStreamer = null;
     for (StripedDataStreamer streamer : streamers) {
       try {
         streamer.close(force);
         streamer.join();
         streamer.closeSocket();
-        if (streamer.isLeadingStreamer()) {
-          leadingStreamer = streamer;
-        }
       } catch (InterruptedException e) {
         throw new IOException("Failed to shutdown streamer");
       } finally {
@@ -318,40 +331,26 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         setClosed();
       }
     }
-    assert leadingStreamer != null : "One streamer should be leader";
-    leadingStreamer.countTailingBlockGroupBytes();
   }
 
-  @Override
-  public synchronized void write(int b) throws IOException {
-    super.write(b);
-    currentBlockGroupBytes = (currentBlockGroupBytes + 1) % getBlockGroupSize();
-  }
-
-  @Override
-  public synchronized void write(byte b[], int off, int len)
-      throws IOException {
-    super.write(b, off, len);
-    currentBlockGroupBytes = (currentBlockGroupBytes + len) % getBlockGroupSize();
-  }
-
-  private void writeParityCellsForLastStripe() throws IOException{
+  private void writeParityCellsForLastStripe() throws IOException {
+    final long currentBlockGroupBytes = getCurrentBlockGroupBytes();
     long parityBlkSize = StripedBlockUtil.getInternalBlockLength(
-        currentBlockGroupBytes, cellSize, blockGroupDataBlocks,
-        blockGroupDataBlocks + 1);
+        currentBlockGroupBytes, cellSize, numDataBlocks,
+        numDataBlocks + 1);
     if (parityBlkSize == 0 || currentBlockGroupBytes % stripeDataSize() == 0) {
       return;
     }
     int parityCellSize = parityBlkSize % cellSize == 0 ? cellSize :
        (int) (parityBlkSize % cellSize);
 
-    for (int i = 0; i < blockGroupBlocks; i++) {
+    for (int i = 0; i < numAllBlocks; i++) {
       long internalBlkLen = StripedBlockUtil.getInternalBlockLength(
-          currentBlockGroupBytes, cellSize, blockGroupDataBlocks, i);
+          currentBlockGroupBytes, cellSize, numDataBlocks, i);
       // Pad zero bytes to make all cells exactly the size of parityCellSize
       // If internal block is smaller than parity block, pad zero bytes.
       // Also pad zero bytes to all parity cells
-      if (internalBlkLen < parityBlkSize || i >= blockGroupDataBlocks) {
+      if (internalBlkLen < parityBlkSize || i >= numDataBlocks) {
         int position = cellBuffers[i].position();
         assert position <= parityCellSize : "If an internal block is smaller" +
            " than parity block, then its last cell should be small than last" +
@@ -365,9 +364,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     encode(cellBuffers);
 
     //write parity cells
-    curIdx = blockGroupDataBlocks;
+    curIdx = numDataBlocks;
     refreshStreamer();
-    for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+    for (int i = numDataBlocks; i < numAllBlocks; i++) {
       ByteBuffer parityBuffer = cellBuffers[i];
       List<DFSPacket> packets = generatePackets(parityBuffer);
       for (DFSPacket p : packets) {
@@ -385,7 +384,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   @Override
   void setClosed() {
     super.setClosed();
-    for (int i = 0; i < blockGroupBlocks; i++) {
+    for (int i = 0; i < numAllBlocks; i++) {
       byteArrayManager.release(cellBuffers[i].array());
       streamers.get(i).release();
     }
@@ -395,10 +394,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
       IOException e = getLeadingStreamer().getLastException().getAndSet(null);
-      if (e == null)
-        return;
-      else
+      if (e != null) {
         throw e;
+      } else {
+        return;
+      }
     }
 
     try {
@@ -408,14 +408,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         streamer.waitAndQueuePacket(currentPacket);
         currentPacket = null;
       }
-      //if the last stripe is incomplete, generate and write parity cells
+      // if the last stripe is incomplete, generate and write parity cells
       writeParityCellsForLastStripe();
 
-      for (int i = 0; i < blockGroupBlocks; i++) {
+      for (int i = 0; i < numAllBlocks; i++) {
        curIdx = i;
        refreshStreamer();
-        if (streamer.getBytesCurBlock()!= 0 ||
-            currentBlockGroupBytes < getBlockGroupSize()) {
+        if (streamer.getBytesCurBlock() > 0) {
          // send an empty packet to mark the end of the block
          currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
              streamer.getAndIncCurrentSeqno(), true);
@@ -425,9 +424,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
        flushInternal();
       }
 
-      // get last block before destroying the streamer
-      ExtendedBlock lastBlock = streamers.get(0).getBlock();
       closeThreads(false);
+      final ExtendedBlock lastBlock = getCommittedBlock();
       TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
       try {
        completeFile(lastBlock);
@@ -435,10 +433,35 @@ public class DFSStripedOutputStream extends DFSOutputStream {
        scope.close();
       }
       dfsClient.endFileLease(fileId);
-    } catch (ClosedChannelException e) {
+    } catch (ClosedChannelException ignored) {
     } finally {
       setClosed();
     }
   }
+
+  /**
+   * Generate the block which is reported and will be committed in NameNode.
+   * Need to go through all the streamers writing data blocks and add their
+   * bytesCurBlock together. Note that at this time all streamers have been
+   * closed. Also this calculation can cover streamers with writing failures.
+   *
+   * @return An ExtendedBlock with size of the whole block group.
+   */
+  ExtendedBlock getCommittedBlock() throws IOException {
+    ExtendedBlock b = getLeadingStreamer().getBlock();
+    if (b == null) {
+      return null;
+    }
+    final ExtendedBlock block = new ExtendedBlock(b);
+    final boolean atBlockGroupBoundary =
+        getLeadingStreamer().getBytesCurBlock() == 0 &&
+        getLeadingStreamer().getBlock() != null &&
+        getLeadingStreamer().getBlock().getNumBytes() > 0;
+    for (int i = 1; i < numDataBlocks; i++) {
+      block.setNumBytes(block.getNumBytes() +
+          (atBlockGroupBoundary ? streamers.get(i).getBlock().getNumBytes() :
+              streamers.get(i).getBytesCurBlock()));
+    }
+    return block;
+  }
 }
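For readers skimming the diff, the new getCommittedBlock() boils down to summing the per-streamer byte counts of the data blocks (the same sum that getCurrentBlockGroupBytes() computes while the stream is still open); parity bytes never count toward the logical block group length. Below is a minimal, self-contained sketch of that calculation using plain longs instead of the real StripedDataStreamer and ExtendedBlock types; the class and method names are hypothetical, for illustration only.

// Illustrative only: mirrors the size calculation in getCommittedBlock()
// above, with hypothetical names and plain longs instead of HDFS types.
public class BlockGroupSizeSketch {
  // Sum the bytes written by each data streamer to get the block group size
  // reported to the NameNode. Parity streamers are excluded because parity
  // bytes do not count toward the logical block group length.
  static long committedBlockGroupSize(long[] bytesCurBlockPerDataStreamer) {
    long sum = 0;
    for (long bytes : bytesCurBlockPerDataStreamer) {
      sum += bytes;
    }
    return sum;
  }

  public static void main(String[] args) {
    // 6 data streamers with 64 KB cells (assumed for the example): a partial
    // last stripe leaves some streamers with 0 bytes in the current block;
    // they contribute 0, and no empty block needs to be created for them.
    long[] bytes = {65536, 65536, 30000, 0, 0, 0};
    System.out.println(committedBlockGroupSize(bytes)); // 161072
  }
}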

StripedDataStreamer.java

@@ -22,7 +22,6 @@ import java.util.List;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -37,6 +36,10 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
+
 /****************************************************************************
  * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
  * There are two kinds of StripedDataStreamer, leading streamer and ordinary
@@ -48,8 +51,6 @@ import java.util.concurrent.atomic.AtomicReference;
 public class StripedDataStreamer extends DataStreamer {
   private final short index;
   private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
-  private static short blockGroupSize = HdfsConstants.NUM_DATA_BLOCKS
-      + HdfsConstants.NUM_PARITY_BLOCKS;
   private boolean hasCommittedBlock = false;
 
   StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
@@ -88,35 +89,38 @@ public class StripedDataStreamer extends DataStreamer {
   }
 
   private boolean isParityStreamer() {
-    return index >= HdfsConstants.NUM_DATA_BLOCKS;
+    return index >= NUM_DATA_BLOCKS;
   }
 
   @Override
   protected void endBlock() {
     if (!isLeadingStreamer() && !isParityStreamer()) {
-      //before retrieving a new block, transfer the finished block to
-      //leading streamer
+      // before retrieving a new block, transfer the finished block to
+      // leading streamer
       LocatedBlock finishedBlock = new LocatedBlock(
           new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
-              block.getNumBytes(),block.getGenerationStamp()), null);
-      try{
+              block.getNumBytes(), block.getGenerationStamp()), null);
+      try {
         boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
            TimeUnit.SECONDS);
-      }catch (InterruptedException ie) {
-        //TODO: Handle InterruptedException (HDFS-7786)
+      } catch (InterruptedException ie) {
+        // TODO: Handle InterruptedException (HDFS-7786)
      }
     }
     super.endBlock();
   }
 
-  /**
-   * This function is called after the streamer is closed.
-   */
-  void countTailingBlockGroupBytes () throws IOException {
+  @Override
+  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+      throws IOException {
+    LocatedBlock lb = null;
     if (isLeadingStreamer()) {
-      //when committing a block group, leading streamer has to adjust
-      // {@link block} including the size of block group
-      for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
+      if (hasCommittedBlock) {
+        /**
+         * when committing a block group, leading streamer has to adjust
+         * {@link block} to include the size of block group
+         */
+        for (int i = 1; i < NUM_DATA_BLOCKS; i++) {
          try {
            LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
                TimeUnit.SECONDS);
@@ -129,37 +133,6 @@ public class StripedDataStreamer extends DataStreamer {
            if (block != null) {
              block.setNumBytes(block.getNumBytes() + bytes);
            }
-        } catch (InterruptedException ie) {
-          DFSClient.LOG.info("InterruptedException received when " +
-              "putting a block to stripeBlocks, ie = " + ie);
-        }
-      }
-    }
-  }
-
-  @Override
-  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
-      throws IOException {
-    LocatedBlock lb = null;
-    if (isLeadingStreamer()) {
-      if(hasCommittedBlock) {
-        /**
-         * when committing a block group, leading streamer has to adjust
-         * {@link block} to include the size of block group
-         */
-        for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
-          try {
-            LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
-                TimeUnit.SECONDS);
-            if (finishedLocatedBlock == null) {
-              throw new IOException("Fail to get finished LocatedBlock " +
-                  "from streamer, i=" + i);
-            }
-            ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
-            long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
-            if(block != null) {
-              block.setNumBytes(block.getNumBytes() + bytes);
-            }
          } catch (InterruptedException ie) {
            DFSClient.LOG.info("InterruptedException received when putting" +
                " a block to stripeBlocks, ie = " + ie);
@@ -171,14 +144,13 @@ public class StripedDataStreamer extends DataStreamer {
       hasCommittedBlock = true;
       assert lb instanceof LocatedStripedBlock;
       DFSClient.LOG.debug("Leading streamer obtained bg " + lb);
-      LocatedBlock[] blocks = StripedBlockUtil.
-          parseStripedBlockGroup((LocatedStripedBlock) lb,
-              HdfsConstants.BLOCK_STRIPED_CELL_SIZE, HdfsConstants.NUM_DATA_BLOCKS,
-              HdfsConstants.NUM_PARITY_BLOCKS
-          );
-      assert blocks.length == blockGroupSize :
+      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+          (LocatedStripedBlock) lb, BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS,
+          NUM_PARITY_BLOCKS);
+      assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) :
           "Fail to get block group from namenode: blockGroupSize: " +
-          blockGroupSize + ", blocks.length: " + blocks.length;
+          (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " +
+          blocks.length;
       lb = blocks[0];
       for (int i = 1; i < blocks.length; i++) {
         try {
@@ -199,7 +171,7 @@ public class StripedDataStreamer extends DataStreamer {
       }
     } else {
       try {
-        //wait 90 seconds to get a block from the queue
+        // wait 90 seconds to get a block from the queue
         lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS);
       } catch (InterruptedException ie) {
        DFSClient.LOG.info("InterruptedException received when retrieving " +
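The hand-off between endBlock() and locateFollowingBlock() above can be sketched in isolation: ordinary data streamers offer each finished block into the leading streamer's queue, and the leading streamer polls one entry per data block and folds the lengths into the block group size before committing. A runnable toy version follows, using a BlockingQueue of plain longs rather than LocatedBlocks and a 3-data-block schema; all names and numbers are hypothetical, not taken from this diff.

// Illustrative only: the queue hand-off from ordinary streamers to the
// leading streamer, reduced to block lengths. In the real code the payload
// is a LocatedBlock and the leader's queue is stripedBlocks.get(0).
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LeaderQueueSketch {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Long> toLeader = new LinkedBlockingQueue<>();
    // Ordinary data streamers: on endBlock(), offer the finished block's
    // length to the leader (the real code uses a 30-second timeout too).
    toLeader.offer(65536L, 30, TimeUnit.SECONDS);
    toLeader.offer(30000L, 30, TimeUnit.SECONDS);
    // Leading streamer: before committing the block group, poll one entry
    // per other data streamer and fold the lengths into the group size.
    long groupBytes = 65536; // the leader's own block
    for (int i = 1; i < 3; i++) { // 3 data blocks assumed for this sketch
      Long bytes = toLeader.poll(30, TimeUnit.SECONDS);
      groupBytes += bytes == null ? 0 : bytes;
    }
    System.out.println(groupBytes); // 161072
  }
}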

BlockManager.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -598,8 +599,20 @@ public class BlockManager {
   }
 
   public short getMinStorageNum(BlockInfo block) {
-    return block.isStriped() ?
-        ((BlockInfoStriped) block).getDataBlockNum() : minReplication;
+    if (block.isStriped()) {
+      final BlockInfoStriped sblock = (BlockInfoStriped) block;
+      short dataBlockNum = sblock.getDataBlockNum();
+      if (sblock.isComplete() ||
+          sblock.getBlockUCState() == BlockUCState.COMMITTED) {
+        // if the sblock is committed/completed and its length is less than a
+        // full stripe, the minimum storage number needs to be adjusted
+        dataBlockNum = (short) Math.min(dataBlockNum,
+            (sblock.getNumBytes() - 1) / HdfsConstants.BLOCK_STRIPED_CELL_SIZE + 1);
+      }
+      return dataBlockNum;
+    } else {
+      return minReplication;
+    }
   }
 
   public boolean hasMinStorage(BlockInfo block) {
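The adjustment in getMinStorageNum() is ceiling division: a committed or complete striped block of numBytes spans ceil(numBytes / cellSize) cells, so at most that many internal data blocks can be non-empty, and requiring more replicas than that would block completion. A standalone sketch with a worked example; the 6-data-block schema and 64 KB cell size are assumptions for illustration, not values read from this diff.

// Illustrative only: the ceiling-division adjustment in getMinStorageNum(),
// extracted into a standalone method with hypothetical names.
public class MinStorageSketch {
  static short minStorage(long numBytes, short dataBlockNum, int cellSize) {
    // (numBytes - 1) / cellSize + 1 is ceil(numBytes / cellSize): the number
    // of cells, hence internal data blocks, a block of this length spans.
    return (short) Math.min(dataBlockNum, (numBytes - 1) / cellSize + 1);
  }

  public static void main(String[] args) {
    int cellSize = 64 * 1024; // assumed cell size for the example
    // A committed 100 KB striped block spans ceil(100K / 64K) = 2 cells, so
    // only 2 internal blocks can be non-empty; min storage drops to 2.
    System.out.println(minStorage(100 * 1024, (short) 6, cellSize)); // 2
    // A full stripe (or more) keeps the unadjusted data block count.
    System.out.println(minStorage(6L * 64 * 1024, (short) 6, cellSize)); // 6
  }
}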

TestDFSStripedOutputStream.java

@@ -1,5 +1,6 @@
 package org.apache.hadoop.hdfs;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -20,6 +21,8 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.junit.After;
@@ -42,8 +45,8 @@ public class TestDFSStripedOutputStream {
   private DistributedFileSystem fs;
   private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private final int stripesPerBlock = 4;
-  int blockSize = cellSize * stripesPerBlock;
-  private int mod = 29;
+  private final int blockSize = cellSize * stripesPerBlock;
+  private final RawErasureEncoder encoder = new RSRawEncoder();
 
   @Before
   public void setup() throws IOException {
@@ -53,6 +56,7 @@ public class TestDFSStripedOutputStream {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
     fs = cluster.getFileSystem();
+    encoder.initialize(dataBlocks, parityBlocks, cellSize);
   }
 
   @After
@@ -144,60 +148,27 @@ public class TestDFSStripedOutputStream {
   }
 
   private byte getByte(long pos) {
+    int mod = 29;
     return (byte) (pos % mod + 1);
   }
 
-  private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
-      throws IOException {
-    Path TestPath = new Path(src);
-    byte[] bytes = generateBytes(writeBytes);
-    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
-
-    //check file length
-    FileStatus status = fs.getFileStatus(TestPath);
-    long fileLength = status.getLen();
-    if (fileLength != writeBytes) {
-      Assert.fail("File Length error: expect=" + writeBytes
-          + ", actual=" + fileLength);
-    }
-
-    DFSStripedInputStream dis = new DFSStripedInputStream(
-        fs.getClient(), src, true);
-    byte[] buf = new byte[writeBytes + 100];
-    int readLen = dis.read(0, buf, 0, buf.length);
-    readLen = readLen >= 0 ? readLen : 0;
-    if (readLen != writeBytes) {
-      Assert.fail("The length of file is not correct.");
-    }
-    for (int i = 0; i < writeBytes; i++) {
-      if (getByte(i) != buf[i]) {
-        Assert.fail("Byte at i = " + i + " is wrongly written.");
-      }
-    }
-  }
-
   private void testOneFile(String src, int writeBytes)
       throws IOException {
-    Path TestPath = new Path(src);
+    Path testPath = new Path(src);
 
-    int allBlocks = dataBlocks + parityBlocks;
     byte[] bytes = generateBytes(writeBytes);
-    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
 
-    //check file length
-    FileStatus status = fs.getFileStatus(TestPath);
+    // check file length
+    FileStatus status = fs.getFileStatus(testPath);
     long fileLength = status.getLen();
-    if (fileLength != writeBytes) {
-      Assert.fail("File Length error: expect=" + writeBytes
-          + ", actual=" + fileLength);
-    }
+    Assert.assertEquals(writeBytes, fileLength);
 
     List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
     LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
     for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
-      assert firstBlock instanceof LocatedStripedBlock;
+      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
       LocatedBlock[] blocks = StripedBlockUtil.
           parseStripedBlockGroup((LocatedStripedBlock) firstBlock,
               cellSize, dataBlocks, parityBlocks);
@@ -205,15 +176,14 @@ public class TestDFSStripedOutputStream {
       blockGroupList.add(oneGroup);
     }
 
-    //test each block group
+    // test each block group
     for (int group = 0; group < blockGroupList.size(); group++) {
       //get the data of this block
       List<LocatedBlock> blockList = blockGroupList.get(group);
       byte[][] dataBlockBytes = new byte[dataBlocks][];
-      byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][];
-
-      //for each block, use BlockReader to read data
+      byte[][] parityBlockBytes = new byte[parityBlocks][];
+      // for each block, use BlockReader to read data
       for (int i = 0; i < blockList.size(); i++) {
         LocatedBlock lblock = blockList.get(i);
         if (lblock == null) {
@@ -269,19 +239,20 @@ public class TestDFSStripedOutputStream {
           }
         }).build();
 
-        blockReader.readAll(blockBytes, 0, (int)block.getNumBytes());
+        blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
         blockReader.close();
       }
 
-      //check if we write the data correctly
-      for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length; blkIdxInGroup++) {
-        byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
+      // check if we write the data correctly
+      for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length;
+           blkIdxInGroup++) {
+        final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
         if (actualBlkBytes == null) {
           continue;
         }
         for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
           byte expected;
-          //calculate the postion of this byte in the file
+          // calculate the position of this byte in the file
           long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
               dataBlocks, posInBlk, blkIdxInGroup) +
               group * blockSize * dataBlocks;
@@ -291,15 +262,94 @@ public class TestDFSStripedOutputStream {
             expected = getByte(posInFile);
           }
 
-          if (expected != actualBlkBytes[posInBlk]) {
-            Assert.fail("Unexpected byte " + actualBlkBytes[posInBlk]
-                + ", expect " + expected
-                + ". Block group index is " + group +
-                ", stripe index is " + posInBlk / cellSize +
-                ", cell index is " + blkIdxInGroup + ", byte index is " + posInBlk % cellSize);
-          }
+          String s = "Unexpected byte " + actualBlkBytes[posInBlk]
+              + ", expect " + expected
+              + ". Block group index is " + group
+              + ", stripe index is " + posInBlk / cellSize
+              + ", cell index is " + blkIdxInGroup
+              + ", byte index is " + posInBlk % cellSize;
+          Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]);
         }
       }
+
+      // verify the parity blocks
+      final ByteBuffer[] parityBuffers = new ByteBuffer[parityBlocks];
+      final long groupSize = lbs.getLocatedBlocks().get(group).getBlockSize();
+      int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(groupSize,
+          cellSize, dataBlocks, dataBlocks);
+      for (int i = 0; i < parityBlocks; i++) {
+        parityBuffers[i] = ByteBuffer.allocate(parityBlkSize);
+      }
+      final int numStripes = (int) (groupSize - 1) / stripeDataSize() + 1;
+      for (int i = 0; i < numStripes; i++) {
+        final int parityCellSize = i < numStripes - 1 || parityBlkSize % cellSize == 0
+            ? cellSize : parityBlkSize % cellSize;
+        ByteBuffer[] stripeBuf = new ByteBuffer[dataBlocks];
+        for (int k = 0; k < stripeBuf.length; k++) {
+          stripeBuf[k] = ByteBuffer.allocate(cellSize);
+        }
+        for (int j = 0; j < dataBlocks; j++) {
+          if (dataBlockBytes[j] != null) {
+            int length = Math.min(cellSize,
+                dataBlockBytes[j].length - cellSize * i);
+            if (length > 0) {
+              stripeBuf[j].put(dataBlockBytes[j], cellSize * i, length);
+            }
+          }
+          final long pos = stripeBuf[j].position();
+          for (int k = 0; k < parityCellSize - pos; k++) {
+            stripeBuf[j].put((byte) 0);
+          }
+          stripeBuf[j].flip();
+        }
+        ByteBuffer[] parityBuf = new ByteBuffer[parityBlocks];
+        for (int j = 0; j < parityBlocks; j++) {
+          parityBuf[j] = ByteBuffer.allocate(cellSize);
+          for (int k = 0; k < parityCellSize; k++) {
+            parityBuf[j].put((byte) 0);
+          }
+          parityBuf[j].flip();
+        }
+        encoder.encode(stripeBuf, parityBuf);
+        for (int j = 0; j < parityBlocks; j++) {
+          parityBuffers[j].put(parityBuf[j]);
+        }
+      }
+
+      for (int i = 0; i < parityBlocks; i++) {
+        Assert.assertArrayEquals(parityBuffers[i].array(), parityBlockBytes[i]);
+      }
     }
   }
+
+  private void testReadWriteOneFile(String src, int writeBytes)
+      throws IOException {
+    Path TestPath = new Path(src);
+    byte[] bytes = generateBytes(writeBytes);
+    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+    //check file length
+    FileStatus status = fs.getFileStatus(TestPath);
+    long fileLength = status.getLen();
+    if (fileLength != writeBytes) {
+      Assert.fail("File Length error: expect=" + writeBytes
+          + ", actual=" + fileLength);
+    }
+
+    DFSStripedInputStream dis = new DFSStripedInputStream(
+        fs.getClient(), src, true);
+    byte[] buf = new byte[writeBytes + 100];
+    int readLen = dis.read(0, buf, 0, buf.length);
+    readLen = readLen >= 0 ? readLen : 0;
+    if (readLen != writeBytes) {
+      Assert.fail("The length of file is not correct.");
+    }
+    for (int i = 0; i < writeBytes; i++) {
+      if (getByte(i) != buf[i]) {
+        Assert.fail("Byte at i = " + i + " is wrongly written.");
+      }
+    }
+  }
 }