HDFS-7889 Subclass DFSOutputStream to support writing striping layout files. Contributed by Li Bo

This commit is contained in:
Kai Zheng 2015-04-11 01:03:37 +08:00 committed by Zhe Zhang
parent 4d0bc724f2
commit 4970f2a2ab
7 changed files with 1030 additions and 14 deletions

View File

@ -56,4 +56,6 @@
HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)

View File

@ -266,8 +266,14 @@ public class DFSOutputStream extends FSOutputSummer
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
final DFSOutputStream out;
if(stat.getReplication() == 0) {
out = new DFSStripedOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
} else {
out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
}
out.start();
return out;
} finally {
@ -347,6 +353,9 @@ public class DFSOutputStream extends FSOutputSummer
String[] favoredNodes) throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("newStreamForAppend", src);
if(stat.getReplication() == 0) {
throw new IOException("Not support appending to a striping layout file yet.");
}
try {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
progress, lastBlock, stat, checksum, favoredNodes);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
@ -113,6 +114,19 @@ class DFSPacket {
dataPos += len;
}
synchronized void writeData(ByteBuffer inBuffer, int len)
throws ClosedChannelException {
checkBuffer();
len = len > inBuffer.remaining() ? inBuffer.remaining() : len;
if (dataPos + len > buf.length) {
throw new BufferOverflowException();
}
for (int i = 0; i < len; i++) {
buf[dataPos + i] = inBuffer.get();
}
dataPos += len;
}
/**
* Write checksums to this packet
*
@ -222,7 +236,7 @@ class DFSPacket {
*
* @return true if the packet is the last packet
*/
boolean isLastPacketInBlock(){
boolean isLastPacketInBlock() {
return lastPacketInBlock;
}
@ -231,7 +245,7 @@ class DFSPacket {
*
* @return the sequence number of this packet
*/
long getSeqno(){
long getSeqno() {
return seqno;
}
@ -240,14 +254,14 @@ class DFSPacket {
*
* @return the number of chunks in this packet
*/
synchronized int getNumChunks(){
synchronized int getNumChunks() {
return numChunks;
}
/**
* increase the number of chunks by one
*/
synchronized void incNumChunks(){
synchronized void incNumChunks() {
numChunks++;
}
@ -256,7 +270,7 @@ class DFSPacket {
*
* @return the maximum number of packets
*/
int getMaxChunks(){
int getMaxChunks() {
return maxChunks;
}
@ -265,7 +279,7 @@ class DFSPacket {
*
* @param syncBlock if to sync block
*/
synchronized void setSyncBlock(boolean syncBlock){
synchronized void setSyncBlock(boolean syncBlock) {
this.syncBlock = syncBlock;
}

View File

@ -0,0 +1,439 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
/****************************************************************
* The DFSStripedOutputStream class supports writing files in striped
* layout. Each stripe contains a sequence of cells and multiple
* {@link StripedDataStreamer}s in DFSStripedOutputStream are responsible
* for writing the cells to different datanodes.
*
****************************************************************/
@InterfaceAudience.Private
public class DFSStripedOutputStream extends DFSOutputStream {
private final List<StripedDataStreamer> streamers;
/**
* Size of each striping cell, must be a multiple of bytesPerChecksum
*/
private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private ByteBuffer[] cellBuffers;
private final short blockGroupBlocks = HdfsConstants.NUM_DATA_BLOCKS
+ HdfsConstants.NUM_PARITY_BLOCKS;
private final short blockGroupDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private int curIdx = 0;
/* bytes written in current block group */
private long currentBlockGroupBytes = 0;
//TODO: Use ErasureCoder interface (HDFS-7781)
private RawErasureEncoder encoder;
private StripedDataStreamer getLeadingStreamer() {
return streamers.get(0);
}
private long getBlockGroupSize() {
return blockSize * HdfsConstants.NUM_DATA_BLOCKS;
}
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes)
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
DFSClient.LOG.info("Creating striped output stream");
if (blockGroupBlocks <= 1) {
throw new IOException("The block group must contain more than one block.");
}
cellBuffers = new ByteBuffer[blockGroupBlocks];
List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
for (int i = 0; i < blockGroupBlocks; i++) {
stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(blockGroupBlocks));
try {
cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
} catch (InterruptedException ie) {
final InterruptedIOException iioe = new InterruptedIOException(
"create cell buffers");
iioe.initCause(ie);
throw iioe;
}
}
encoder = new RSRawEncoder();
encoder.initialize(blockGroupDataBlocks,
blockGroupBlocks - blockGroupDataBlocks, cellSize);
streamers = new ArrayList<>(blockGroupBlocks);
for (short i = 0; i < blockGroupBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
i, stripeBlocks);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
streamers.add(streamer);
}
refreshStreamer();
}
private void refreshStreamer() {
streamer = streamers.get(curIdx);
}
private void moveToNextStreamer() {
curIdx = (curIdx + 1) % blockGroupBlocks;
refreshStreamer();
}
/**
* encode the buffers.
* After encoding, flip each buffer.
*
* @param buffers data buffers + parity buffers
*/
private void encode(ByteBuffer[] buffers) {
ByteBuffer[] dataBuffers = new ByteBuffer[blockGroupDataBlocks];
ByteBuffer[] parityBuffers = new ByteBuffer[blockGroupBlocks - blockGroupDataBlocks];
for (int i = 0; i < blockGroupBlocks; i++) {
if (i < blockGroupDataBlocks) {
dataBuffers[i] = buffers[i];
} else {
parityBuffers[i - blockGroupDataBlocks] = buffers[i];
}
}
encoder.encode(dataBuffers, parityBuffers);
}
/**
* Generate packets from a given buffer
*
* @param byteBuffer the given buffer to generate packets
* @return packets generated
* @throws IOException
*/
private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
throws IOException{
List<DFSPacket> packets = new ArrayList<>();
while (byteBuffer.remaining() > 0) {
DFSPacket p = createPacket(packetSize, chunksPerPacket,
streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), false);
int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
maxBytesToPacket: byteBuffer.remaining();
p.writeData(byteBuffer, toWrite);
streamer.incBytesCurBlock(toWrite);
packets.add(p);
}
return packets;
}
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
super.writeChunk(b, offset, len, checksum, ckoff, cklen);
if (getSizeOfCellnBuffer(curIdx) <= cellSize) {
addToCellBuffer(b, offset, len);
} else {
String msg = "Writing a chunk should not overflow the cell buffer.";
DFSClient.LOG.info(msg);
throw new IOException(msg);
}
// If current packet has not been enqueued for transmission,
// but the cell buffer is full, we need to enqueue the packet
if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk cell buffer full seqno=" +
currentPacket.getSeqno() +
", curIdx=" + curIdx +
", src=" + src +
", bytesCurBlock=" + streamer.getBytesCurBlock() +
", blockSize=" + blockSize +
", appendChunk=" + streamer.getAppendChunk());
}
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
adjustChunkBoundary();
endBlock();
}
// Two extra steps are needed when a striping cell is full:
// 1. Forward the current index pointer
// 2. Generate parity packets if a full stripe of data cells are present
if (getSizeOfCellnBuffer(curIdx) == cellSize) {
//move curIdx to next cell
moveToNextStreamer();
//When all data cells in a stripe are ready, we need to encode
//them and generate some parity cells. These cells will be
//converted to packets and put to their DataStreamer's queue.
if (curIdx == blockGroupDataBlocks) {
//encode the data cells
for (int k = 0; k < blockGroupDataBlocks; k++) {
cellBuffers[k].flip();
}
encode(cellBuffers);
for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
ByteBuffer parityBuffer = cellBuffers[i];
List<DFSPacket> packets = generatePackets(parityBuffer);
for (DFSPacket p : packets) {
currentPacket = p;
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
}
endBlock();
moveToNextStreamer();
}
//read next stripe to cellBuffers
clearCellBuffers();
}
}
}
private void addToCellBuffer(byte[] b, int off, int len) {
cellBuffers[curIdx].put(b, off, len);
}
private int getSizeOfCellnBuffer(int cellIndex) {
return cellBuffers[cellIndex].position();
}
private void clearCellBuffers() {
for (int i = 0; i< blockGroupBlocks; i++) {
cellBuffers[i].clear();
}
}
private int stripeDataSize() {
return blockGroupDataBlocks * cellSize;
}
private void notSupported(String headMsg)
throws IOException{
throw new IOException(
headMsg + " is now not supported for striping layout.");
}
@Override
public void hflush() throws IOException {
notSupported("hflush");
}
@Override
public void hsync() throws IOException {
notSupported("hsync");
}
@Override
protected synchronized void start() {
for (StripedDataStreamer streamer : streamers) {
streamer.start();
}
}
@Override
synchronized void abort() throws IOException {
if (isClosed()) {
return;
}
for (StripedDataStreamer streamer : streamers) {
streamer.setLastException(new IOException("Lease timeout of "
+ (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
}
closeThreads(true);
dfsClient.endFileLease(fileId);
}
//TODO: Handle slow writers (HDFS-7786)
//Cuurently only check if the leading streamer is terminated
boolean isClosed() {
return closed || getLeadingStreamer().streamerClosed();
}
// shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true
@Override
protected void closeThreads(boolean force) throws IOException {
StripedDataStreamer leadingStreamer = null;
for (StripedDataStreamer streamer : streamers) {
try {
streamer.close(force);
streamer.join();
streamer.closeSocket();
if (streamer.isLeadingStreamer()) {
leadingStreamer = streamer;
} else {
streamer.countTailingBlockGroupBytes();
}
} catch (InterruptedException e) {
throw new IOException("Failed to shutdown streamer");
} finally {
streamer.setSocketToNull();
setClosed();
}
}
leadingStreamer.countTailingBlockGroupBytes();
}
@Override
public synchronized void write(int b) throws IOException {
super.write(b);
currentBlockGroupBytes = (currentBlockGroupBytes + 1) % getBlockGroupSize();
}
@Override
public synchronized void write(byte b[], int off, int len)
throws IOException {
super.write(b, off, len);
currentBlockGroupBytes = (currentBlockGroupBytes + len) % getBlockGroupSize();
}
private void writeParityCellsForLastStripe() throws IOException{
if(currentBlockGroupBytes == 0 ||
currentBlockGroupBytes % stripeDataSize() == 0)
return;
int lastStripeLen =(int)(currentBlockGroupBytes % stripeDataSize());
// Size of parity cells should equal the size of the first cell, if it
// is not full.
int parityCellSize = cellSize;
int index = lastStripeLen / cellSize;
if (lastStripeLen < cellSize) {
parityCellSize = lastStripeLen;
index++;
}
for (int i = 0; i < blockGroupBlocks; i++) {
if (i >= index) {
int position = cellBuffers[i].position();
for (int j = 0; j < parityCellSize - position; j++) {
cellBuffers[i].put((byte)0);
}
}
cellBuffers[i].flip();
}
encode(cellBuffers);
//write parity cells
curIdx = blockGroupDataBlocks;
refreshStreamer();
for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
ByteBuffer parityBuffer = cellBuffers[i];
List<DFSPacket> packets = generatePackets(parityBuffer);
for (DFSPacket p : packets) {
currentPacket = p;
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
}
endBlock();
moveToNextStreamer();
}
clearCellBuffers();
}
@Override
void setClosed() {
super.setClosed();
for (int i = 0; i < blockGroupBlocks; i++) {
byteArrayManager.release(cellBuffers[i].array());
streamers.get(i).release();
}
}
@Override
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
IOException e = getLeadingStreamer().getLastException().getAndSet(null);
if (e == null)
return;
else
throw e;
}
try {
// flush from all upper layers
flushBuffer();
if (currentPacket != null) {
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
}
//if the last stripe is incomplete, generate and write parity cells
writeParityCellsForLastStripe();
for (int i = 0; i < blockGroupBlocks; i++) {
curIdx = i;
refreshStreamer();
if (streamer.getBytesCurBlock()!= 0 ||
currentBlockGroupBytes < getBlockGroupSize()) {
// send an empty packet to mark the end of the block
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
}
// flush all data to Datanode
flushInternal();
}
// get last block before destroying the streamer
ExtendedBlock lastBlock = streamers.get(0).getBlock();
closeThreads(false);
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try {
completeFile(lastBlock);
} finally {
scope.close();
}
dfsClient.endFileLease(fileId);
} catch (ClosedChannelException e) {
} finally {
setClosed();
}
}
}

View File

@ -336,7 +336,7 @@ class DataStreamer extends Daemon {
}
private volatile boolean streamerClosed = false;
private ExtendedBlock block; // its length is number of bytes acked
protected ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
@ -367,12 +367,12 @@ class DataStreamer extends Daemon {
private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
private Socket s;
private final DFSClient dfsClient;
private final String src;
protected final DFSClient dfsClient;
protected final String src;
/** Only for DataTransferProtocol.writeBlock(..) */
private final DataChecksum checksum4WriteBlock;
private final Progressable progress;
private final HdfsFileStatus stat;
protected final HdfsFileStatus stat;
// appending to existing partial block
private volatile boolean appendChunk = false;
// both dataQueue and ackQueue are protected by dataQueue lock
@ -489,7 +489,7 @@ class DataStreamer extends Daemon {
stage = BlockConstructionStage.DATA_STREAMING;
}
private void endBlock() {
protected void endBlock() {
if(LOG.isDebugEnabled()) {
LOG.debug("Closing old block " + block);
}

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.util.List;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/****************************************************************************
* The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
* There are two kinds of StripedDataStreamer, leading streamer and ordinary
* stream. Leading streamer requests a block group from NameNode, unwraps
* it to located blocks and transfers each located block to its corresponding
* ordinary streamer via a blocking queue.
*
****************************************************************************/
public class StripedDataStreamer extends DataStreamer {
private final short index;
private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
private static short blockGroupSize = HdfsConstants.NUM_DATA_BLOCKS
+ HdfsConstants.NUM_PARITY_BLOCKS;
private boolean hasCommittedBlock = false;
StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
DFSClient dfsClient, String src,
Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage, short index,
List<BlockingQueue<LocatedBlock>> stripedBlocks) {
super(stat,block, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage);
this.index = index;
this.stripedBlocks = stripedBlocks;
}
/**
* Construct a data streamer for appending to the last partial block
* @param lastBlock last block of the file to be appended
* @param stat status of the file to be appended
* @throws IOException if error occurs
*/
StripedDataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
DFSClient dfsClient, String src,
Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage, short index,
List<BlockingQueue<LocatedBlock>> stripedBlocks)
throws IOException {
super(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage);
this.index = index;
this.stripedBlocks = stripedBlocks;
}
public boolean isLeadingStreamer () {
return index == 0;
}
private boolean isParityStreamer() {
return index >= HdfsConstants.NUM_DATA_BLOCKS;
}
@Override
protected void endBlock() {
if (!isLeadingStreamer() && !isParityStreamer()) {
//before retrieving a new block, transfer the finished block to
//leading streamer
LocatedBlock finishedBlock = new LocatedBlock(
new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
block.getNumBytes(),block.getGenerationStamp()), null);
try{
boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
TimeUnit.SECONDS);
}catch (InterruptedException ie) {
//TODO: Handle InterruptedException (HDFS-7786)
}
}
super.endBlock();
}
/**
* This function is called after the streamer is closed.
*/
void countTailingBlockGroupBytes () throws IOException {
if (isLeadingStreamer()) {
//when committing a block group, leading streamer has to adjust
// {@link block} including the size of block group
for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
try {
LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
TimeUnit.SECONDS);
if (finishedLocatedBlock == null) {
throw new IOException("Fail to get finished LocatedBlock " +
"from streamer, i=" + i);
}
ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
if (block != null) {
block.setNumBytes(block.getNumBytes() + bytes);
}
} catch (InterruptedException ie) {
DFSClient.LOG.info("InterruptedException received when " +
"putting a block to stripeBlocks, ie = " + ie);
}
}
} else if (!isParityStreamer()) {
if (block == null || block.getNumBytes() == 0) {
LocatedBlock finishedBlock = new LocatedBlock(null, null);
try {
boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
TimeUnit.SECONDS);
} catch (InterruptedException ie) {
//TODO: Handle InterruptedException (HDFS-7786)
ie.printStackTrace();
}
}
}
}
@Override
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
LocatedBlock lb = null;
if (isLeadingStreamer()) {
if(hasCommittedBlock) {
//when committing a block group, leading streamer has to adjust
// {@link block} including the size of block group
for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
try {
LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
TimeUnit.SECONDS);
if (finishedLocatedBlock == null) {
throw new IOException("Fail to get finished LocatedBlock " +
"from streamer, i=" + i);
}
ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
if(block != null) {
block.setNumBytes(block.getNumBytes() + bytes);
}
} catch (InterruptedException ie) {
DFSClient.LOG.info("InterruptedException received when putting" +
" a block to stripeBlocks, ie = " + ie);
}
}
}
lb = super.locateFollowingBlock(excludedNodes);
hasCommittedBlock = true;
LocatedBlock[] blocks = unwrapBlockGroup(lb);
assert blocks.length == blockGroupSize :
"Fail to get block group from namenode: blockGroupSize: " +
blockGroupSize + ", blocks.length: " + blocks.length;
lb = blocks[0];
for (int i = 1; i < blocks.length; i++) {
try {
boolean offSuccess = stripedBlocks.get(i).offer(blocks[i],
90, TimeUnit.SECONDS);
if(!offSuccess){
String msg = "Fail to put block to stripeBlocks. i = " + i;
DFSClient.LOG.info(msg);
throw new IOException(msg);
} else {
DFSClient.LOG.info("Allocate a new block to a streamer. i = " + i
+ ", block: " + blocks[i]);
}
} catch (InterruptedException ie) {
DFSClient.LOG.info("InterruptedException received when putting" +
" a block to stripeBlocks, ie = " + ie);
}
}
} else {
try {
//wait 90 seconds to get a block from the queue
lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
DFSClient.LOG.info("InterruptedException received when retrieving " +
"a block from stripeBlocks, ie = " + ie);
}
}
return lb;
}
/**
* Generate other blocks in a block group according to the first one.
*
* @param firstBlockInGroup the first block in a block group
* @return other blocks in this group
*/
public static LocatedBlock[] unwrapBlockGroup(
final LocatedBlock firstBlockInGroup) {
ExtendedBlock eb = firstBlockInGroup.getBlock();
DatanodeInfo[] locs = firstBlockInGroup.getLocations();
String[] storageIDs = firstBlockInGroup.getStorageIDs();
StorageType[] storageTypes = firstBlockInGroup.getStorageTypes();
Token<BlockTokenIdentifier> blockToken = firstBlockInGroup.getBlockToken();
LocatedBlock[] blocksInGroup = new LocatedBlock[locs.length];
for (int i = 0; i < blocksInGroup.length; i++) {
//each block in a group has the same number of bytes and timestamp
ExtendedBlock extendedBlock = new ExtendedBlock(eb.getBlockPoolId(),
eb.getBlockId() + i, eb.getNumBytes(), eb.getGenerationStamp());
blocksInGroup[i] = new LocatedBlock(extendedBlock,
new DatanodeInfo[] {locs[i]}, new String[]{storageIDs[i]},
new StorageType[] {storageTypes[i]});
blocksInGroup[i].setBlockToken(blockToken);
}
return blocksInGroup;
}
}

View File

@ -0,0 +1,311 @@
package org.apache.hadoop.hdfs;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class TestDFSStripedOutputStream {
private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
private MiniDFSCluster cluster;
private Configuration conf = new Configuration();
private DistributedFileSystem fs;
int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
int blockSize = 8 * 1024 * 1024;
int cellsInBlock = blockSize / cellSize;
private int mod = 29;
@Before
public void setup() throws IOException {
int numDNs = dataBlocks + parityBlocks + 2;
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/");
fs = cluster.getFileSystem();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void TestFileEmpty() throws IOException {
testOneFile("/EmptyFile", 0);
}
@Test
public void TestFileSmallerThanOneCell1() throws IOException {
testOneFile("/SmallerThanOneCell", 1);
}
@Test
public void TestFileSmallerThanOneCell2() throws IOException {
testOneFile("/SmallerThanOneCell", cellSize - 1);
}
@Test
public void TestFileEqualsWithOneCell() throws IOException {
testOneFile("/EqualsWithOneCell", cellSize);
}
@Test
public void TestFileSmallerThanOneStripe1() throws IOException {
testOneFile("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
}
@Test
public void TestFileSmallerThanOneStripe2() throws IOException {
testOneFile("/SmallerThanOneStripe", cellSize + 123);
}
@Test
public void TestFileEqualsWithOneStripe() throws IOException {
testOneFile("/EqualsWithOneStripe", cellSize * dataBlocks);
}
@Test
public void TestFileMoreThanOneStripe1() throws IOException {
testOneFile("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
}
@Test
public void TestFileMoreThanOneStripe2() throws IOException {
testOneFile("/MoreThanOneStripe2",
cellSize * dataBlocks * (cellsInBlock >= 2 ? cellsInBlock / 2 : 1)
+ cellSize * dataBlocks + 123);
}
@Test
public void TestFileFullBlockGroup() throws IOException {
testOneFile("/FullBlockGroup", blockSize * dataBlocks);
}
//TODO: The following tests will pass after HDFS-8121 fixed
// @Test
public void TestFileMoreThanABlockGroup1() throws IOException {
testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
}
// @Test
public void TestFileMoreThanABlockGroup2() throws IOException {
testOneFile("/MoreThanABlockGroup2",
blockSize * dataBlocks * 3
+ (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) * cellSize * dataBlocks
+ 123);
}
private int stripeDataSize() {
return cellSize * dataBlocks;
}
private byte[] generateBytes(int cnt) {
byte[] bytes = new byte[cnt];
for (int i = 0; i < cnt; i++) {
bytes[i] = getByte(i);
}
return bytes;
}
private byte getByte(long pos) {
return (byte) (pos % mod + 1);
}
private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
throws IOException {
Path TestPath = new Path(src);
byte[] bytes = generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
//check file length
FileStatus status = fs.getFileStatus(TestPath);
long fileLength = status.getLen();
if (fileLength != writeBytes) {
Assert.fail("File Length error: expect=" + writeBytes
+ ", actual=" + fileLength);
}
DFSStripedInputStream dis = new DFSStripedInputStream(
fs.getClient(), src, true);
byte[] buf = new byte[writeBytes + 100];
int readLen = dis.read(0, buf, 0, buf.length);
readLen = readLen >= 0 ? readLen : 0;
if (readLen != writeBytes) {
Assert.fail("The length of file is not correct.");
}
for (int i = 0; i < writeBytes; i++) {
if (getByte(i) != buf[i]) {
Assert.fail("Byte at i = " + i + " is wrongly written.");
}
}
}
private void testOneFile(String src, int writeBytes)
throws IOException {
Path TestPath = new Path(src);
int allBlocks = dataBlocks + parityBlocks;
byte[] bytes = generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
//check file length
FileStatus status = fs.getFileStatus(TestPath);
long fileLength = status.getLen();
if (fileLength != writeBytes) {
Assert.fail("File Length error: expect=" + writeBytes
+ ", actual=" + fileLength);
}
List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
LocatedBlock[] blocks = StripedDataStreamer.unwrapBlockGroup(firstBlock);
List<LocatedBlock> oneGroup = Arrays.asList(blocks);
blockGroupList.add(oneGroup);
}
//test each block group
for (int group = 0; group < blockGroupList.size(); group++) {
//get the data of this block
List<LocatedBlock> blockList = blockGroupList.get(group);
byte[][] dataBlockBytes = new byte[dataBlocks][];
byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][];
//calculate the size of this block group
int lenOfBlockGroup = group < blockGroupList.size() - 1 ?
blockSize * dataBlocks :
writeBytes - blockSize * (blockGroupList.size() - 1) * dataBlocks;
int intactStripes = lenOfBlockGroup / stripeDataSize();
int lastStripeLen = lenOfBlockGroup % stripeDataSize();
//for each block, use BlockReader to read data
for (int i = 0; i < blockList.size(); i++) {
LocatedBlock lblock = blockList.get(i);
if (lblock == null) {
continue;
}
DatanodeInfo[] nodes = lblock.getLocations();
ExtendedBlock block = lblock.getBlock();
InetSocketAddress targetAddr = NetUtils.createSocketAddr(
nodes[0].getXferAddr());
int lenOfCell = cellSize;
if (i == lastStripeLen / cellSize) {
lenOfCell = lastStripeLen % cellSize;
} else if (i > lastStripeLen / cellSize) {
lenOfCell = 0;
}
int lenOfBlock = cellSize * intactStripes + lenOfCell;
byte[] blockBytes = new byte[lenOfBlock];
if (i < dataBlocks) {
dataBlockBytes[i] = blockBytes;
} else {
parityBlockBytes[i - dataBlocks] = blockBytes;
}
if (lenOfBlock == 0) {
continue;
}
block.setNumBytes(lenOfBlock);
BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
setFileName(src).
setBlock(block).
setBlockToken(lblock.getBlockToken()).
setInetSocketAddress(targetAddr).
setStartOffset(0).
setLength(block.getNumBytes()).
setVerifyChecksum(true).
setClientName("TestStripeLayoutWrite").
setDatanodeInfo(nodes[0]).
setCachingStrategy(CachingStrategy.newDefaultStrategy()).
setClientCacheContext(ClientContext.getFromConf(conf)).
setConfiguration(conf).
setRemotePeerFactory(new RemotePeerFactory() {
@Override
public Peer newConnectedPeer(InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken,
DatanodeID datanodeId)
throws IOException {
Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
try {
sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
peer = TcpPeerServer.peerFromSocket(sock);
} finally {
if (peer == null) {
IOUtils.closeSocket(sock);
}
}
return peer;
}
}).build();
blockReader.readAll(blockBytes, 0, lenOfBlock);
blockReader.close();
}
//check if we write the data correctly
for (int i = 0; i < dataBlockBytes.length; i++) {
byte[] cells = dataBlockBytes[i];
if (cells == null) {
continue;
}
for (int j = 0; j < cells.length; j++) {
byte expected;
//calculate the postion of this byte in the file
long pos = group * dataBlocks * blockSize
+ (i * cellSize + j / cellSize * cellSize * dataBlocks)
+ j % cellSize;
if (pos >= writeBytes) {
expected = 0;
} else {
expected = getByte(pos);
}
if (expected != cells[j]) {
Assert.fail("Unexpected byte " + cells[j] + ", expect " + expected
+ ". Block group index is " + group +
", stripe index is " + j / cellSize +
", cell index is " + i + ", byte index is " + j % cellSize);
}
}
}
}
}
}