HDFS-7672. Handle write failure for striping blocks and refactor the existing code in DFSStripedOutputStream and StripedDataStreamer.

Author: Tsz-Wo Nicholas Sze, 2015-05-05 16:26:49 -07:00; committed by Zhe Zhang
parent e849be2d31
commit 220ca960bc
10 changed files with 769 additions and 339 deletions


@@ -172,3 +172,6 @@
HDFS-8324. Add trace info to DFSClient#getErasureCodingZoneInfo(..) (vinayakumarb via
umamahesh)
HDFS-7672. Handle write failure for striping blocks and refactor the
existing code in DFSStripedOutputStream and StripedDataStreamer. (szetszwo)


@@ -24,6 +24,8 @@ import java.nio.channels.ClosedChannelException;
import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
@@ -86,6 +88,8 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer
implements Syncable, CanSetDropBehind {
static final Log LOG = LogFactory.getLog(DFSOutputStream.class);
/**
* Number of times to retry creating a file when there are transient
* errors (typically related to encryption zones and KeyProvider operations).
@@ -419,32 +423,43 @@ public class DFSOutputStream extends FSOutputSummer
streamer.incBytesCurBlock(len);
// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
streamer.getBytesCurBlock() == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
currentPacket.getSeqno() +
", src=" + src +
", bytesCurBlock=" + streamer.getBytesCurBlock() +
", blockSize=" + blockSize +
", appendChunk=" + streamer.getAppendChunk());
}
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
adjustChunkBoundary();
endBlock();
enqueueCurrentPacketFull();
}
}
void enqueueCurrentPacket() throws IOException {
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
}
void enqueueCurrentPacketFull() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("enqueue full " + currentPacket + ", src=" + src
+ ", bytesCurBlock=" + streamer.getBytesCurBlock()
+ ", blockSize=" + blockSize
+ ", appendChunk=" + streamer.getAppendChunk()
+ ", " + streamer);
}
enqueueCurrentPacket();
adjustChunkBoundary();
endBlock();
}
/** create an empty packet to mark the end of the block */
void setCurrentPacket2Empty() throws InterruptedIOException {
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
}
/**
* If the reopened file did not end at a chunk boundary and the above
* write filled up its partial chunk, tell the summer to generate full
* CRC chunks from now on.
*/
protected void adjustChunkBoundary() {
private void adjustChunkBoundary() {
if (streamer.getAppendChunk() &&
streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
streamer.setAppendChunk(false);
@@ -466,11 +481,8 @@ public class DFSOutputStream extends FSOutputSummer
*/
protected void endBlock() throws IOException {
if (streamer.getBytesCurBlock() == blockSize) {
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
setCurrentPacket2Empty();
enqueueCurrentPacket();
streamer.setBytesCurBlock(0);
lastFlushOffset = 0;
}
@@ -592,8 +604,7 @@ public class DFSOutputStream extends FSOutputSummer
}
if (currentPacket != null) {
currentPacket.setSyncBlock(isSync);
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
enqueueCurrentPacket();
}
if (endBlock && streamer.getBytesCurBlock() > 0) {
// Need to end the current block, thus send an empty packet to
@@ -601,8 +612,7 @@ public class DFSOutputStream extends FSOutputSummer
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock || isSync);
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
enqueueCurrentPacket();
streamer.setBytesCurBlock(0);
lastFlushOffset = 0;
} else {
@@ -779,15 +789,11 @@ public class DFSOutputStream extends FSOutputSummer
flushBuffer(); // flush from all upper layers
if (currentPacket != null) {
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
enqueueCurrentPacket();
}
if (streamer.getBytesCurBlock() != 0) {
// send an empty packet to mark the end of the block
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
setCurrentPacket2Empty();
}
flushInternal(); // flush all data to Datanodes
@@ -901,4 +907,9 @@ public class DFSOutputStream extends FSOutputSummer
public long getFileId() {
return fileId;
}
@Override
public String toString() {
return getClass().getSimpleName() + ":" + streamer;
}
}
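
The refactoring above pulls the packet lifecycle into reusable helpers (enqueueCurrentPacket, enqueueCurrentPacketFull, setCurrentPacket2Empty) so that the striped subclass can drive the same steps per streamer. The following is a minimal, self-contained sketch of that lifecycle, not the HDFS API; the Packet class, packetSize, and queue type here are hypothetical stand-ins.

// Sketch: enqueue the current packet when it fills, and mark the end of a block
// with an empty "last" packet, mirroring the helpers extracted above.
import java.util.ArrayDeque;
import java.util.Queue;

class PacketLifecycleSketch {
  static class Packet {
    final int seqno; final boolean lastInBlock; int bytes;
    Packet(int seqno, boolean lastInBlock) { this.seqno = seqno; this.lastInBlock = lastInBlock; }
  }

  private final Queue<Packet> dataQueue = new ArrayDeque<>();
  private final int packetSize = 64 * 1024;   // assumed packet size for illustration
  private int seqno = 0;
  private Packet currentPacket;

  void write(int len) {
    if (currentPacket == null) {
      currentPacket = new Packet(seqno++, false);
    }
    currentPacket.bytes += len;
    if (currentPacket.bytes >= packetSize) {
      enqueueCurrentPacket();                 // same role as enqueueCurrentPacketFull()
    }
  }

  void endBlock() {
    currentPacket = new Packet(seqno++, true); // analogous to setCurrentPacket2Empty()
    enqueueCurrentPacket();
  }

  private void enqueueCurrentPacket() {
    dataQueue.offer(currentPacket);            // a streamer thread would drain this queue
    currentPacket = null;
  }
}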


@@ -28,14 +28,16 @@ import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
@@ -44,6 +46,8 @@ import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import com.google.common.base.Preconditions;
/****************************************************************
* The DFSStripedOutputStream class supports writing files in striped
@@ -55,117 +59,250 @@ import org.apache.htrace.TraceScope;
@InterfaceAudience.Private
public class DFSStripedOutputStream extends DFSOutputStream {
/** Coordinate the communication between the streamers. */
static class Coordinator {
private final List<BlockingQueue<ExtendedBlock>> endBlocks;
private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
private volatile boolean shouldLocateFollowingBlock = false;
Coordinator(final int numDataBlocks, final int numAllBlocks) {
endBlocks = new ArrayList<>(numDataBlocks);
for (int i = 0; i < numDataBlocks; i++) {
endBlocks.add(new LinkedBlockingQueue<ExtendedBlock>(1));
}
stripedBlocks = new ArrayList<>(numAllBlocks);
for (int i = 0; i < numAllBlocks; i++) {
stripedBlocks.add(new LinkedBlockingQueue<LocatedBlock>(1));
}
}
boolean shouldLocateFollowingBlock() {
return shouldLocateFollowingBlock;
}
void putEndBlock(int i, ExtendedBlock block) {
shouldLocateFollowingBlock = true;
final boolean b = endBlocks.get(i).offer(block);
Preconditions.checkState(b, "Failed to add " + block
+ " to endBlocks queue, i=" + i);
}
ExtendedBlock getEndBlock(int i) throws InterruptedIOException {
try {
return endBlocks.get(i).poll(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw DFSUtil.toInterruptedIOException(
"getEndBlock interrupted, i=" + i, e);
}
}
void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
ExtendedBlock b = endBlocks.get(i).peek();
if (b == null) {
// the streamer has just failed; put the end block and continue
b = block;
putEndBlock(i, b);
}
b.setNumBytes(newBytes);
}
void putStripedBlock(int i, LocatedBlock block) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("putStripedBlock " + block + ", i=" + i);
}
final boolean b = stripedBlocks.get(i).offer(block);
if (!b) {
throw new IOException("Failed: " + block + ", i=" + i);
}
}
LocatedBlock getStripedBlock(int i) throws IOException {
final LocatedBlock lb;
try {
lb = stripedBlocks.get(i).poll(90, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw DFSUtil.toInterruptedIOException("getStripedBlock interrupted", e);
}
if (lb == null) {
throw new IOException("Failed: i=" + i);
}
return lb;
}
}
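
The Coordinator above hands results between streamers through capacity-1 blocking queues: producers offer() and fail fast if a stale item is still pending, consumers poll() with a timeout so a dead peer cannot block the pipeline forever. A minimal sketch of that handoff pattern follows; HandoffSketch and its method names are hypothetical, only the queue discipline mirrors the code above.

// Bounded-queue handoff sketch (plain Java, not the HDFS classes).
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class HandoffSketch<T> {
  private final List<BlockingQueue<T>> queues = new ArrayList<>();

  HandoffSketch(int numStreamers) {
    for (int i = 0; i < numStreamers; i++) {
      queues.add(new LinkedBlockingQueue<T>(1));   // capacity 1, like endBlocks/stripedBlocks
    }
  }

  void put(int i, T item) throws IOException {
    if (!queues.get(i).offer(item)) {              // non-blocking offer; fail fast
      throw new IOException("Failed to hand off item to streamer " + i);
    }
  }

  T take(int i, long timeoutSeconds) throws IOException {
    try {
      final T item = queues.get(i).poll(timeoutSeconds, TimeUnit.SECONDS);
      if (item == null) {
        throw new IOException("Timed out waiting for streamer " + i);
      }
      return item;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for streamer " + i, e);
    }
  }
}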
/** Buffers for writing the data and parity cells of a stripe. */
class CellBuffers {
private final ByteBuffer[] buffers;
private final byte[][] checksumArrays;
CellBuffers(int numParityBlocks) throws InterruptedException{
if (cellSize % bytesPerChecksum != 0) {
throw new HadoopIllegalArgumentException("Invalid values: "
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
+ bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
}
checksumArrays = new byte[numParityBlocks][];
final int size = getChecksumSize() * (cellSize / bytesPerChecksum);
for (int i = 0; i < checksumArrays.length; i++) {
checksumArrays[i] = new byte[size];
}
buffers = new ByteBuffer[numAllBlocks];
for (int i = 0; i < buffers.length; i++) {
buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
}
}
private ByteBuffer[] getBuffers() {
return buffers;
}
byte[] getChecksumArray(int i) {
return checksumArrays[i - numDataBlocks];
}
private int addTo(int i, byte[] b, int off, int len) {
final ByteBuffer buf = buffers[i];
final int pos = buf.position() + len;
Preconditions.checkState(pos <= cellSize);
buf.put(b, off, len);
return pos;
}
private void clear() {
for (int i = 0; i< numAllBlocks; i++) {
buffers[i].clear();
if (i >= numDataBlocks) {
Arrays.fill(buffers[i].array(), (byte) 0);
}
}
}
private void release() {
for (int i = 0; i < numAllBlocks; i++) {
byteArrayManager.release(buffers[i].array());
}
}
private void flipDataBuffers() {
for (int i = 0; i < numDataBlocks; i++) {
buffers[i].flip();
}
}
}
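
CellBuffers accumulates one cell per streamer; once the data cells of a stripe are full, the data buffers are flipped for encoding and everything is cleared (with the parity arrays zeroed) for the next stripe. Below is a sketch of that fill/flip/clear cycle with plain ByteBuffers; the cell size and 6+3 layout are assumed for illustration only.

// Per-stripe buffer cycle sketch (hypothetical sizes, not the HDFS constants).
import java.nio.ByteBuffer;
import java.util.Arrays;

class StripeBufferSketch {
  static final int CELL_SIZE = 64 * 1024;          // assumed cell size
  static final int NUM_DATA = 6, NUM_PARITY = 3;   // assumed schema

  final ByteBuffer[] buffers = new ByteBuffer[NUM_DATA + NUM_PARITY];
  {
    for (int i = 0; i < buffers.length; i++) {
      buffers[i] = ByteBuffer.allocate(CELL_SIZE);
    }
  }

  /** Returns true when the last data cell of the stripe is full (cells fill in order). */
  boolean addToCell(int i, byte[] b, int off, int len) {
    buffers[i].put(b, off, len);
    return i == NUM_DATA - 1 && !buffers[i].hasRemaining();
  }

  void beforeEncode() {                            // like flipDataBuffers()
    for (int i = 0; i < NUM_DATA; i++) {
      buffers[i].flip();
    }
  }

  void nextStripe() {                              // like clear(): reset all, zero parity
    for (int i = 0; i < buffers.length; i++) {
      buffers[i].clear();
      if (i >= NUM_DATA) {
        Arrays.fill(buffers[i].array(), (byte) 0);
      }
    }
  }
}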
private final Coordinator coordinator;
private final CellBuffers cellBuffers;
private final RawErasureEncoder encoder;
private final List<StripedDataStreamer> streamers;
/**
* Size of each striping cell, must be a multiple of bytesPerChecksum
*/
private final ECInfo ecInfo;
/** Size of each striping cell, must be a multiple of bytesPerChecksum */
private final int cellSize;
// checksum buffer, we only need to calculate checksum for parity blocks
private byte[] checksumBuf;
private ByteBuffer[] cellBuffers;
private final short numAllBlocks;
private final short numDataBlocks;
private int curIdx = 0;
/* bytes written in current block group */
//private long currentBlockGroupBytes = 0;
//TODO: Use ErasureCoder interface (HDFS-7781)
private RawErasureEncoder encoder;
private final int numAllBlocks;
private final int numDataBlocks;
private StripedDataStreamer getLeadingStreamer() {
return streamers.get(0);
}
private long getBlockGroupSize() {
return blockSize * numDataBlocks;
}
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes)
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
DFSClient.LOG.info("Creating striped output stream");
if (LOG.isDebugEnabled()) {
LOG.debug("Creating DFSStripedOutputStream for " + src);
}
// ECInfo is restored from NN just before writing striped files.
ecInfo = dfsClient.getErasureCodingInfo(src);
cellSize = ecInfo.getSchema().getChunkSize();
numAllBlocks = (short)(ecInfo.getSchema().getNumDataUnits()
+ ecInfo.getSchema().getNumParityUnits());
numDataBlocks = (short)ecInfo.getSchema().getNumDataUnits();
//TODO reduce an rpc call HDFS-8289
final ECSchema schema = dfsClient.getErasureCodingInfo(src).getSchema();
final int numParityBlocks = schema.getNumParityUnits();
cellSize = schema.getChunkSize();
numDataBlocks = schema.getNumDataUnits();
numAllBlocks = numDataBlocks + numParityBlocks;
checkConfiguration();
checksumBuf = new byte[getChecksumSize() * (cellSize / bytesPerChecksum)];
cellBuffers = new ByteBuffer[numAllBlocks];
List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
for (int i = 0; i < numAllBlocks; i++) {
stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(numAllBlocks));
try {
cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
} catch (InterruptedException ie) {
final InterruptedIOException iioe = new InterruptedIOException(
"create cell buffers");
iioe.initCause(ie);
throw iioe;
}
}
encoder = new RSRawEncoder();
encoder.initialize(numDataBlocks,
numAllBlocks - numDataBlocks, cellSize);
encoder.initialize(numDataBlocks, numParityBlocks, cellSize);
coordinator = new Coordinator(numDataBlocks, numAllBlocks);
try {
cellBuffers = new CellBuffers(numParityBlocks);
} catch (InterruptedException ie) {
throw DFSUtil.toInterruptedIOException("Failed to create cell buffers", ie);
}
List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
for (short i = 0; i < numAllBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
i, stripeBlocks, favoredNodes);
favoredNodes, i, coordinator);
s.add(streamer);
}
streamers = Collections.unmodifiableList(s);
refreshStreamer();
setCurrentStreamer(0);
}
private void checkConfiguration() {
if (cellSize % bytesPerChecksum != 0) {
throw new HadoopIllegalArgumentException("Invalid values: "
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+ ") must divide cell size (=" + cellSize + ").");
}
StripedDataStreamer getStripedDataStreamer(int i) {
return streamers.get(i);
}
private void refreshStreamer() {
streamer = streamers.get(curIdx);
int getCurrentIndex() {
return getCurrentStreamer().getIndex();
}
private void moveToNextStreamer() {
curIdx = (curIdx + 1) % numAllBlocks;
refreshStreamer();
StripedDataStreamer getCurrentStreamer() {
return (StripedDataStreamer)streamer;
}
private StripedDataStreamer setCurrentStreamer(int i) {
streamer = streamers.get(i);
return getCurrentStreamer();
}
/**
* encode the buffers.
* After encoding, flip each buffer.
* Encode the buffers, i.e. compute parities.
*
* @param buffers data buffers + parity buffers
*/
private void encode(ByteBuffer[] buffers) {
ByteBuffer[] dataBuffers = new ByteBuffer[numDataBlocks];
ByteBuffer[] parityBuffers = new ByteBuffer[numAllBlocks - numDataBlocks];
for (int i = 0; i < numAllBlocks; i++) {
if (i < numDataBlocks) {
dataBuffers[i] = buffers[i];
} else {
parityBuffers[i - numDataBlocks] = buffers[i];
private static void encode(RawErasureEncoder encoder, int numData,
ByteBuffer[] buffers) {
final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
encoder.encode(dataBuffers, parityBuffers);
}
private void checkStreamers() throws IOException {
int count = 0;
for(StripedDataStreamer s : streamers) {
if (!s.isFailed()) {
count++;
}
}
encoder.encode(dataBuffers, parityBuffers);
if (LOG.isDebugEnabled()) {
LOG.debug("checkStreamers: " + streamers);
LOG.debug("count=" + count);
}
if (count < numDataBlocks) {
throw new IOException("Failed: the number of remaining blocks = "
+ count + " < the number of data blocks = " + numDataBlocks);
}
}
private void handleStreamerFailure(String err, Exception e) throws IOException {
LOG.warn("Failed: " + err + ", " + this, e);
getCurrentStreamer().setIsFailed(true);
checkStreamers();
currentPacket = null;
}
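
checkStreamers() above tolerates streamer failures only while at least numDataBlocks streamers remain healthy; with fewer survivors the stripe can no longer be reconstructed and the write fails. A small worked sketch of that threshold, assuming a 6 data + 3 parity layout (the values are assumptions, not read from HdfsConstants):

// Failure-tolerance arithmetic behind checkStreamers(), as a standalone example.
class FailureToleranceSketch {
  public static void main(String[] args) {
    final int numDataBlocks = 6, numParityBlocks = 3;   // assumed schema
    final int numAllBlocks = numDataBlocks + numParityBlocks;
    for (int failed = 0; failed <= numAllBlocks; failed++) {
      final int healthy = numAllBlocks - failed;
      final boolean ok = healthy >= numDataBlocks;      // the checkStreamers() condition
      System.out.println(failed + " failed streamer(s): "
          + (ok ? "write continues" : "write fails"));
    }
  }
}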
/**
@@ -173,11 +310,12 @@ public class DFSStripedOutputStream extends DFSOutputStream {
* writing parity blocks.
*
* @param byteBuffer the given buffer to generate packets
* @param checksumBuf the checksum buffer
* @return packets generated
* @throws IOException
*/
private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
throws IOException{
private List<DFSPacket> generatePackets(
ByteBuffer byteBuffer, byte[] checksumBuf) throws IOException{
List<DFSPacket> packets = new ArrayList<>();
assert byteBuffer.hasArray();
getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0,
@@ -201,82 +339,47 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
super.writeChunk(b, offset, len, checksum, ckoff, cklen);
final int index = getCurrentIndex();
final StripedDataStreamer current = getCurrentStreamer();
final int pos = cellBuffers.addTo(index, bytes, offset, len);
final boolean cellFull = pos == cellSize;
if (getSizeOfCellnBuffer(curIdx) <= cellSize) {
addToCellBuffer(b, offset, len);
} else {
String msg = "Writing a chunk should not overflow the cell buffer.";
DFSClient.LOG.info(msg);
throw new IOException(msg);
final long oldBytes = current.getBytesCurBlock();
if (!current.isFailed()) {
try {
super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
// cell is full and current packet has not been enqueued,
if (cellFull && currentPacket != null) {
enqueueCurrentPacketFull();
}
} catch(Exception e) {
handleStreamerFailure("offset=" + offset + ", length=" + len, e);
}
}
// If current packet has not been enqueued for transmission,
// but the cell buffer is full, we need to enqueue the packet
if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk cell buffer full seqno=" +
currentPacket.getSeqno() +
", curIdx=" + curIdx +
", src=" + src +
", bytesCurBlock=" + streamer.getBytesCurBlock() +
", blockSize=" + blockSize +
", appendChunk=" + streamer.getAppendChunk());
}
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
adjustChunkBoundary();
endBlock();
if (current.isFailed()) {
final long newBytes = oldBytes + len;
coordinator.setBytesEndBlock(index, newBytes, current.getBlock());
current.setBytesCurBlock(newBytes);
}
// Two extra steps are needed when a striping cell is full:
// 1. Forward the current index pointer
// 2. Generate parity packets if a full stripe of data cells are present
if (getSizeOfCellnBuffer(curIdx) == cellSize) {
//move curIdx to next cell
moveToNextStreamer();
if (cellFull) {
int next = index + 1;
//When all data cells in a stripe are ready, we need to encode
//them and generate some parity cells. These cells will be
//converted to packets and put to their DataStreamer's queue.
if (curIdx == numDataBlocks) {
//encode the data cells
for (int k = 0; k < numDataBlocks; k++) {
cellBuffers[k].flip();
}
encode(cellBuffers);
for (int i = numDataBlocks; i < numAllBlocks; i++) {
ByteBuffer parityBuffer = cellBuffers[i];
List<DFSPacket> packets = generatePackets(parityBuffer);
for (DFSPacket p : packets) {
currentPacket = p;
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
}
endBlock();
moveToNextStreamer();
}
//read next stripe to cellBuffers
clearCellBuffers();
}
}
}
private void addToCellBuffer(byte[] b, int off, int len) {
cellBuffers[curIdx].put(b, off, len);
}
private int getSizeOfCellnBuffer(int cellIndex) {
return cellBuffers[cellIndex].position();
}
private void clearCellBuffers() {
for (int i = 0; i< numAllBlocks; i++) {
cellBuffers[i].clear();
if (i >= numDataBlocks) {
Arrays.fill(cellBuffers[i].array(), (byte) 0);
if (next == numDataBlocks) {
cellBuffers.flipDataBuffers();
writeParityCells();
next = 0;
}
setCurrentStreamer(next);
}
}
@@ -284,20 +387,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return numDataBlocks * cellSize;
}
private void notSupported(String headMsg)
throws IOException{
throw new IOException(
headMsg + " is now not supported for striping layout.");
@Override
public void hflush() {
throw new UnsupportedOperationException();
}
@Override
public void hflush() throws IOException {
notSupported("hflush");
}
@Override
public void hsync() throws IOException {
notSupported("hsync");
public void hsync() {
throw new UnsupportedOperationException();
}
@Override
@@ -327,29 +424,28 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return closed || getLeadingStreamer().streamerClosed();
}
// shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true
@Override
protected void closeThreads(boolean force) throws IOException {
int index = 0;
boolean exceptionOccurred = false;
final MultipleIOException.Builder b = new MultipleIOException.Builder();
for (StripedDataStreamer streamer : streamers) {
try {
streamer.close(force);
streamer.join();
streamer.closeSocket();
} catch (InterruptedException | IOException e) {
DFSClient.LOG.error("Failed to shutdown streamer: name="
+ streamer.getName() + ", index=" + index + ", file=" + src, e);
exceptionOccurred = true;
} catch(Exception e) {
try {
handleStreamerFailure("force=" + force, e);
} catch(IOException ioe) {
b.add(ioe);
}
} finally {
streamer.setSocketToNull();
setClosed();
index++;
}
}
if (exceptionOccurred) {
throw new IOException("Failed to shutdown streamer");
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
}
@@ -370,50 +466,69 @@ public class DFSStripedOutputStream extends DFSOutputStream {
if (currentBlockGroupBytes % stripeDataSize() == 0) {
return;
}
long firstCellSize = getLeadingStreamer().getBytesCurBlock() % cellSize;
long parityCellSize = firstCellSize > 0 && firstCellSize < cellSize ?
final int firstCellSize = (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
firstCellSize : cellSize;
final ByteBuffer[] buffers = cellBuffers.getBuffers();
for (int i = 0; i < numAllBlocks; i++) {
// Pad zero bytes to make all cells exactly the size of parityCellSize
// If internal block is smaller than parity block, pad zero bytes.
// Also pad zero bytes to all parity cells
int position = cellBuffers[i].position();
final int position = buffers[i].position();
assert position <= parityCellSize : "If an internal block is smaller" +
" than a parity block, then its last cell should be smaller than the" +
" last parity cell";
for (int j = 0; j < parityCellSize - position; j++) {
cellBuffers[i].put((byte) 0);
buffers[i].put((byte) 0);
}
cellBuffers[i].flip();
buffers[i].flip();
}
encode(cellBuffers);
// write parity cells
curIdx = numDataBlocks;
refreshStreamer();
writeParityCells();
}
void writeParityCells() throws IOException {
final ByteBuffer[] buffers = cellBuffers.getBuffers();
//encode the data cells
encode(encoder, numDataBlocks, buffers);
for (int i = numDataBlocks; i < numAllBlocks; i++) {
ByteBuffer parityBuffer = cellBuffers[i];
List<DFSPacket> packets = generatePackets(parityBuffer);
for (DFSPacket p : packets) {
currentPacket = p;
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
writeParity(i, buffers[i], cellBuffers.getChecksumArray(i));
}
cellBuffers.clear();
}
void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
) throws IOException {
final StripedDataStreamer current = setCurrentStreamer(index);
final int len = buffer.limit();
final long oldBytes = current.getBytesCurBlock();
if (!current.isFailed()) {
try {
for (DFSPacket p : generatePackets(buffer, checksumBuf)) {
streamer.waitAndQueuePacket(p);
}
endBlock();
} catch(Exception e) {
handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
}
endBlock();
moveToNextStreamer();
}
clearCellBuffers();
if (current.isFailed()) {
final long newBytes = oldBytes + len;
current.setBytesCurBlock(newBytes);
}
}
@Override
void setClosed() {
super.setClosed();
for (int i = 0; i < numAllBlocks; i++) {
byteArrayManager.release(cellBuffers[i].array());
streamers.get(i).release();
}
cellBuffers.release();
}
@Override
@@ -425,25 +540,31 @@ public class DFSStripedOutputStream extends DFSOutputStream {
try {
// flush from all upper layers
flushBuffer();
if (currentPacket != null) {
streamer.waitAndQueuePacket(currentPacket);
currentPacket = null;
try {
flushBuffer();
if (currentPacket != null) {
enqueueCurrentPacket();
}
} catch(Exception e) {
handleStreamerFailure("closeImpl", e);
}
// if the last stripe is incomplete, generate and write parity cells
writeParityCellsForLastStripe();
for (int i = 0; i < numAllBlocks; i++) {
curIdx = i;
refreshStreamer();
if (streamer.getBytesCurBlock() > 0) {
// send an empty packet to mark the end of the block
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
final StripedDataStreamer s = setCurrentStreamer(i);
if (!s.isFailed()) {
try {
if (s.getBytesCurBlock() > 0) {
setCurrentPacket2Empty();
}
// flush all data to Datanode
flushInternal();
} catch(Exception e) {
handleStreamerFailure("closeImpl", e);
}
}
// flush all data to Datanode
flushInternal();
}
closeThreads(false);


@@ -36,6 +36,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PAS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
@@ -55,7 +56,6 @@ import java.util.concurrent.ThreadLocalRandom;
import javax.net.SocketFactory;
import com.google.common.collect.Sets;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
@@ -96,6 +96,7 @@ import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.protobuf.BlockingService;
@InterfaceAudience.Private
@@ -1513,7 +1514,7 @@ public class DFSUtil {
public static int getSmallBufferSize(Configuration conf) {
return Math.min(getIoFileBufferSize(conf) / 2, 512);
}
/**
* Probe for HDFS Encryption being enabled; this uses the value of
* the option {@link DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI},
@@ -1527,4 +1528,10 @@ public class DFSUtil {
DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
}
public static InterruptedIOException toInterruptedIOException(String message,
InterruptedException e) {
final InterruptedIOException iioe = new InterruptedIOException(message);
iioe.initCause(e);
return iioe;
}
}
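
The new DFSUtil.toInterruptedIOException helper converts an InterruptedException into an InterruptedIOException while preserving the original cause, so queue waits in the Coordinator surface as an IOException subtype to callers. A hedged usage sketch of the same conversion pattern, written as standalone code (the pollOrFail caller is hypothetical):

// InterruptedException-to-InterruptedIOException conversion, as used above.
import java.io.InterruptedIOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class InterruptedConversionSketch {
  static InterruptedIOException toInterruptedIOException(String message,
      InterruptedException e) {
    final InterruptedIOException iioe = new InterruptedIOException(message);
    iioe.initCause(e);
    return iioe;
  }

  static String pollOrFail(LinkedBlockingQueue<String> q) throws InterruptedIOException {
    try {
      // may return null on timeout; the HDFS code throws an IOException in that case
      return q.poll(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      throw toInterruptedIOException("poll interrupted", e);
    }
  }
}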


@@ -575,7 +575,7 @@ class DataStreamer extends Daemon {
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(LOG.isDebugEnabled()) {
LOG.debug("Allocating new block");
LOG.debug("Allocating new block " + this);
}
setPipeline(nextBlockOutputStream());
initDataStreaming();
@@ -593,10 +593,7 @@ class DataStreamer extends Daemon {
long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (lastByteOffsetInBlock > stat.getBlockSize()) {
throw new IOException("BlockSize " + stat.getBlockSize() +
" is smaller than data size. " +
" Offset of packet in block " +
lastByteOffsetInBlock +
" Aborting file " + src);
" < lastByteOffsetInBlock, " + this + ", " + one);
}
if (one.isLastPacketInBlock()) {
@@ -1751,7 +1748,7 @@ class DataStreamer extends Daemon {
dataQueue.addLast(packet);
lastQueuedSeqno = packet.getSeqno();
if (LOG.isDebugEnabled()) {
LOG.debug("Queued packet " + packet.getSeqno());
LOG.debug("Queued " + packet + ", " + this);
}
dataQueue.notifyAll();
}
@@ -1901,4 +1898,10 @@ class DataStreamer extends Daemon {
s.close();
}
}
@Override
public String toString() {
return (block == null? null: block.getLocalBlock())
+ "@" + Arrays.toString(getNodes());
}
}


@@ -18,8 +18,14 @@
package org.apache.hadoop.hdfs;
import java.util.List;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -31,15 +37,6 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
/****************************************************************************
* The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
* There are two kinds of StripedDataStreamer, leading streamer and ordinary
@@ -49,40 +46,32 @@ import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
*
****************************************************************************/
public class StripedDataStreamer extends DataStreamer {
private final short index;
private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
private boolean hasCommittedBlock = false;
private final Coordinator coordinator;
private final int index;
private volatile boolean isFailed;
StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
StripedDataStreamer(HdfsFileStatus stat,
DFSClient dfsClient, String src,
Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage, short index,
List<BlockingQueue<LocatedBlock>> stripedBlocks,
String[] favoredNodes) {
super(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
ByteArrayManager byteArrayManage, String[] favoredNodes,
short index, Coordinator coordinator) {
super(stat, null, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage, favoredNodes);
this.index = index;
this.stripedBlocks = stripedBlocks;
this.coordinator = coordinator;
}
/**
* Construct a data streamer for appending to the last partial block
* @param lastBlock last block of the file to be appended
* @param stat status of the file to be appended
* @throws IOException if error occurs
*/
StripedDataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
DFSClient dfsClient, String src,
Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage, short index,
List<BlockingQueue<LocatedBlock>> stripedBlocks)
throws IOException {
super(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage);
this.index = index;
this.stripedBlocks = stripedBlocks;
int getIndex() {
return index;
}
void setIsFailed(boolean isFailed) {
this.isFailed = isFailed;
}
boolean isFailed() {
return isFailed;
}
public boolean isLeadingStreamer () {
@@ -95,18 +84,8 @@ public class StripedDataStreamer extends DataStreamer {
@Override
protected void endBlock() {
if (!isLeadingStreamer() && !isParityStreamer()) {
// before retrieving a new block, transfer the finished block to
// leading streamer
LocatedBlock finishedBlock = new LocatedBlock(
new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
block.getNumBytes(), block.getGenerationStamp()), null);
try {
boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
TimeUnit.SECONDS);
} catch (InterruptedException ie) {
// TODO: Handle InterruptedException (HDFS-7786)
}
if (!isParityStreamer()) {
coordinator.putEndBlock(index, block);
}
super.endBlock();
}
@@ -114,71 +93,40 @@ public class StripedDataStreamer extends DataStreamer {
@Override
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
LocatedBlock lb = null;
if (isLeadingStreamer()) {
if (hasCommittedBlock) {
/**
* when committing a block group, leading streamer has to adjust
* {@link block} to include the size of block group
*/
for (int i = 1; i < NUM_DATA_BLOCKS; i++) {
try {
LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
TimeUnit.SECONDS);
if (finishedLocatedBlock == null) {
throw new IOException("Fail to get finished LocatedBlock " +
"from streamer, i=" + i);
}
ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
if (block != null) {
block.setNumBytes(block.getNumBytes() + bytes);
}
} catch (InterruptedException ie) {
DFSClient.LOG.info("InterruptedException received when putting" +
" a block to stripeBlocks, ie = " + ie);
}
if (coordinator.shouldLocateFollowingBlock()) {
// set numByte for the previous block group
long bytes = 0;
for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
final ExtendedBlock b = coordinator.getEndBlock(i);
bytes += b == null ? 0 : b.getNumBytes();
}
block.setNumBytes(bytes);
}
lb = super.locateFollowingBlock(excludedNodes);
hasCommittedBlock = true;
assert lb instanceof LocatedStripedBlock;
DFSClient.LOG.debug("Leading streamer obtained bg " + lb);
LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
(LocatedStripedBlock) lb, BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS,
NUM_PARITY_BLOCKS);
final LocatedStripedBlock lsb
= (LocatedStripedBlock)super.locateFollowingBlock(excludedNodes);
if (LOG.isDebugEnabled()) {
LOG.debug("Obtained block group " + lsb);
}
LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(lsb,
BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) :
"Fail to get block group from namenode: blockGroupSize: " +
(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " +
blocks.length;
lb = blocks[0];
for (int i = 1; i < blocks.length; i++) {
try {
boolean offSuccess = stripedBlocks.get(i).offer(blocks[i],
90, TimeUnit.SECONDS);
if(!offSuccess){
String msg = "Fail to put block to stripeBlocks. i = " + i;
DFSClient.LOG.info(msg);
throw new IOException(msg);
} else {
DFSClient.LOG.info("Allocate a new block to a streamer. i = " + i
+ ", block: " + blocks[i]);
}
} catch (InterruptedException ie) {
DFSClient.LOG.info("InterruptedException received when putting" +
" a block to stripeBlocks, ie = " + ie);
}
}
} else {
try {
// wait 90 seconds to get a block from the queue
lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
DFSClient.LOG.info("InterruptedException received when retrieving " +
"a block from stripeBlocks, ie = " + ie);
for (int i = 0; i < blocks.length; i++) {
coordinator.putStripedBlock(i, blocks[i]);
}
}
return lb;
return coordinator.getStripedBlock(index);
}
@Override
public String toString() {
return "#" + index + ": isFailed? " + Boolean.toString(isFailed).charAt(0)
+ ", " + super.toString();
}
}
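
When the leading streamer commits a block group in locateFollowingBlock() above, it sums the bytes reported by each data streamer's end block to set the length of the previous group. A small worked sketch of that aggregation; the byte counts below are invented purely for illustration.

// Block-group size aggregation, as done by the leading streamer.
class BlockGroupSizeSketch {
  public static void main(String[] args) {
    final long[] endBlockBytes = {262144, 262144, 262144, 196608, 0, 0}; // per data streamer (made up)
    long groupBytes = 0;
    for (long b : endBlockBytes) {
      groupBytes += b;             // corresponds to block.setNumBytes(bytes) above
    }
    System.out.println("block group numBytes = " + groupBytes);
  }
}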


@@ -390,7 +390,7 @@ public class FSDirectory implements Closeable {
void disableQuotaChecks() {
skipQuotaCheck = true;
}
/**
* This is a wrapper for resolvePath(). If the path passed
* is prefixed with /.reserved/raw, then it checks to ensure that the caller


@@ -1998,8 +1998,6 @@ public class MiniDFSCluster {
int node = -1;
for (int i = 0; i < dataNodes.size(); i++) {
DataNode dn = dataNodes.get(i).datanode;
LOG.info("DN name=" + dnName + " found DN=" + dn +
" with name=" + dn.getDisplayName());
if (dnName.equals(dn.getDatanodeId().getXferAddr())) {
node = i;
break;


@@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -42,6 +44,12 @@ import org.junit.Test;
public class TestDFSStripedOutputStream {
public static final Log LOG = LogFactory.getLog(TestDFSStripedOutputStream.class);
static {
GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
}
private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
@@ -245,6 +253,11 @@ public class TestDFSStripedOutputStream {
static void verifyParity(final long size, final int cellSize,
byte[][] dataBytes, byte[][] parityBytes) {
verifyParity(size, cellSize, dataBytes, parityBytes, -1);
}
static void verifyParity(final long size, final int cellSize,
byte[][] dataBytes, byte[][] parityBytes, int killedDnIndex) {
// verify the parity blocks
int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
size, cellSize, dataBytes.length, dataBytes.length);
@@ -265,7 +278,10 @@ public class TestDFSStripedOutputStream {
encoder.initialize(dataBytes.length, parityBytes.length, cellSize);
encoder.encode(dataBytes, expectedParityBytes);
for (int i = 0; i < parityBytes.length; i++) {
Assert.assertArrayEquals(expectedParityBytes[i], parityBytes[i]);
if (i != killedDnIndex) {
Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex,
expectedParityBytes[i], parityBytes[i]);
}
}
}
}


@@ -0,0 +1,323 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestDFSStripedOutputStreamWithFailure {
public static final Log LOG = LogFactory.getLog(
TestDFSStripedOutputStreamWithFailure.class);
static {
GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
}
private static final int NUM_DATA_BLOCKS = HdfsConstants.NUM_DATA_BLOCKS;
private static final int NUM_PARITY_BLOCKS = HdfsConstants.NUM_PARITY_BLOCKS;
private static final int CELL_SIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private static final int STRIPES_PER_BLOCK = 4;
private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
private final HdfsConfiguration conf = new HdfsConfiguration();
private MiniDFSCluster cluster;
private DistributedFileSystem dfs;
private final Path dir = new Path("/"
+ TestDFSStripedOutputStreamWithFailure.class.getSimpleName());
@Before
public void setup() throws IOException {
final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
dfs.mkdirs(dir);
dfs.createErasureCodingZone(dir, null);
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
private static byte getByte(long pos) {
return (byte)pos;
}
@Test(timeout=120000)
public void testDatanodeFailure1() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 1;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure2() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 2;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure3() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 3;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure4() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 4;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure5() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 5;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure6() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 6;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure7() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 7;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000)
public void testDatanodeFailure8() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 8;
runTest("file" + dn, length, dn);
}
private void runTest(final String src, final int length, final int dnIndex) {
try {
cluster.startDataNodes(conf, 1, true, null, null);
cluster.waitActive();
runTest(new Path(dir, src), length, dnIndex);
} catch(Exception e) {
LOG.info("FAILED", e);
Assert.fail(StringUtils.stringifyException(e));
}
}
private void runTest(final Path p, final int length,
final int dnIndex) throws Exception {
LOG.info("p=" + p + ", length=" + length + ", dnIndex=" + dnIndex);
final String fullPath = p.toString();
final AtomicInteger pos = new AtomicInteger();
final FSDataOutputStream out = dfs.create(p);
final AtomicBoolean killed = new AtomicBoolean();
final Thread killer = new Thread(new Runnable() {
@Override
public void run() {
killDatanode(cluster, (DFSStripedOutputStream)out.getWrappedStream(),
dnIndex, pos);
killed.set(true);
}
});
killer.start();
final int mask = (1 << 16) - 1;
for(; pos.get() < length; ) {
final int i = pos.getAndIncrement();
write(out, i);
if ((i & mask) == 0) {
final long ms = 100;
LOG.info("i=" + i + " sleep " + ms);
Thread.sleep(ms);
}
}
killer.join(10000);
Assert.assertTrue(killed.get());
out.close();
// check file length
final FileStatus status = dfs.getFileStatus(p);
Assert.assertEquals(length, status.getLen());
checkData(dfs, fullPath, length, dnIndex);
}
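
The test above drives the failure path by writing bytes in the main thread while a killer thread stops one datanode mid-write. The sketch below isolates that concurrent write-and-kill shape with generic Java only; ByteArrayOutputStream and the flag stand in for the MiniDFSCluster and the real killDatanode call.

// Write-while-killing test pattern, reduced to plain threads (hypothetical types).
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class KillWhileWritingSketch {
  public static void main(String[] args) throws Exception {
    final OutputStream out = new ByteArrayOutputStream();
    final AtomicInteger pos = new AtomicInteger();
    final AtomicBoolean killed = new AtomicBoolean();

    final Thread killer = new Thread(() -> {
      // In the real test this is killDatanode(...); here we only flip a flag.
      killed.set(true);
    });
    killer.start();

    final int length = 1 << 20;
    while (pos.get() < length) {
      final int i = pos.getAndIncrement();
      out.write((byte) i);                 // keep writing while the failure is injected
    }
    killer.join(10000);
    out.close();
    System.out.println("killed=" + killed.get() + ", bytesWritten=" + pos.get());
  }
}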
static void write(FSDataOutputStream out, int i) throws IOException {
try {
out.write(getByte(i));
} catch(IOException ioe) {
throw new IOException("Failed at i=" + i, ioe);
}
}
static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
for(;;) {
final DatanodeInfo[] datanodes = streamer.getNodes();
if (datanodes != null) {
Assert.assertEquals(1, datanodes.length);
Assert.assertNotNull(datanodes[0]);
return datanodes[0];
}
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
return null;
}
}
}
static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
final int dnIndex, final AtomicInteger pos) {
final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
final DatanodeInfo datanode = getDatanodes(s);
LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
cluster.stopDataNode(datanode.getXferAddr());
}
static void checkData(DistributedFileSystem dfs, String src, int length,
int killedDnIndex) throws IOException {
List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L);
final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1;
Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
(LocatedStripedBlock) firstBlock,
CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
blockGroupList.add(Arrays.asList(blocks));
}
// test each block group
for (int group = 0; group < blockGroupList.size(); group++) {
final boolean isLastGroup = group == blockGroupList.size() - 1;
final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
: length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
final int numCellInGroup = (int)((groupSize - 1)/CELL_SIZE + 1);
final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE;
//get the data of this block
List<LocatedBlock> blockList = blockGroupList.get(group);
byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
// for each block, use BlockReader to read data
for (int i = 0; i < blockList.size(); i++) {
final int j = i >= NUM_DATA_BLOCKS? 0: i;
final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
+ (j <= lastCellIndex? 1: 0);
final int blockSize = numCellInBlock*CELL_SIZE
+ (isLastGroup && i == lastCellIndex? lastCellSize - CELL_SIZE: 0);
final byte[] blockBytes = new byte[blockSize];
if (i < NUM_DATA_BLOCKS) {
dataBlockBytes[i] = blockBytes;
} else {
parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
}
final LocatedBlock lb = blockList.get(i);
LOG.info("XXX i=" + i + ", lb=" + lb);
if (lb == null) {
continue;
}
final ExtendedBlock block = lb.getBlock();
Assert.assertEquals(blockSize, block.getNumBytes());
if (block.getNumBytes() == 0) {
continue;
}
if (i != killedDnIndex) {
final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
dfs, lb, 0, block.getNumBytes());
blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
blockReader.close();
}
}
// check data
final int groupPosInFile = group*BLOCK_GROUP_SIZE;
for (int i = 0; i < dataBlockBytes.length; i++) {
final byte[] actual = dataBlockBytes[i];
for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
Assert.assertTrue(posInFile < length);
final byte expected = getByte(posInFile);
if (i == killedDnIndex) {
actual[posInBlk] = expected;
} else {
String s = "expected=" + expected + " but actual=" + actual[posInBlk]
+ ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
+ ". group=" + group + ", i=" + i;
Assert.assertEquals(s, expected, actual[posInBlk]);
}
}
}
// check parity
TestDFSStripedOutputStream.verifyParity(
lbs.getLocatedBlocks().get(group).getBlockSize(),
CELL_SIZE, dataBlockBytes, parityBlockBytes,
killedDnIndex - dataBlockBytes.length);
}
}
}