From 220ca960bce970d5969b9af570a3ce43360b7e2b Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Tue, 5 May 2015 16:26:49 -0700 Subject: [PATCH] HDFS-7672. Handle write failure for stripping blocks and refactor the existing code in DFSStripedOutputStream and StripedDataStreamer. --- .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 + .../apache/hadoop/hdfs/DFSOutputStream.java | 73 +-- .../hadoop/hdfs/DFSStripedOutputStream.java | 505 +++++++++++------- .../java/org/apache/hadoop/hdfs/DFSUtil.java | 11 +- .../org/apache/hadoop/hdfs/DataStreamer.java | 15 +- .../hadoop/hdfs/StripedDataStreamer.java | 156 ++---- .../hdfs/server/namenode/FSDirectory.java | 2 +- .../apache/hadoop/hdfs/MiniDFSCluster.java | 2 - .../hdfs/TestDFSStripedOutputStream.java | 18 +- ...TestDFSStripedOutputStreamWithFailure.java | 323 +++++++++++ 10 files changed, 769 insertions(+), 339 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index a8df3f268f9..7efaa5a21cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -172,3 +172,6 @@ HDFS-8324. Add trace info to DFSClient#getErasureCodingZoneInfo(..) (vinayakumarb via umamahesh) + + HDFS-7672. Handle write failure for stripping blocks and refactor the + existing code in DFSStripedOutputStream and StripedDataStreamer. (szetszwo) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 0280d712a20..8580357d8bf 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -24,6 +24,8 @@ import java.nio.channels.ClosedChannelException; import java.util.EnumSet; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.crypto.CryptoProtocolVersion; @@ -86,6 +88,8 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Private public class DFSOutputStream extends FSOutputSummer implements Syncable, CanSetDropBehind { + static final Log LOG = LogFactory.getLog(DFSOutputStream.class); + /** * Number of times to retry creating a file when there are transient * errors (typically related to encryption zones and KeyProvider operations). @@ -419,32 +423,43 @@ public class DFSOutputStream extends FSOutputSummer streamer.incBytesCurBlock(len); // If packet is full, enqueue it for transmission - // if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || streamer.getBytesCurBlock() == blockSize) { - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" + - currentPacket.getSeqno() + - ", src=" + src + - ", bytesCurBlock=" + streamer.getBytesCurBlock() + - ", blockSize=" + blockSize + - ", appendChunk=" + streamer.getAppendChunk()); - } - streamer.waitAndQueuePacket(currentPacket); - currentPacket = null; - - adjustChunkBoundary(); - - endBlock(); + enqueueCurrentPacketFull(); } } + void enqueueCurrentPacket() throws IOException { + streamer.waitAndQueuePacket(currentPacket); + currentPacket = null; + } + + void enqueueCurrentPacketFull() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("enqueue full " + currentPacket + ", src=" + src + + ", bytesCurBlock=" + streamer.getBytesCurBlock() + + ", blockSize=" + blockSize + + ", appendChunk=" + streamer.getAppendChunk() + + ", " + streamer); + } + enqueueCurrentPacket(); + adjustChunkBoundary(); + endBlock(); + } + + /** create an empty packet to mark the end of the block */ + void setCurrentPacket2Empty() throws InterruptedIOException { + currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), + streamer.getAndIncCurrentSeqno(), true); + currentPacket.setSyncBlock(shouldSyncBlock); + } + /** * If the reopened file did not end at chunk boundary and the above * write filled up its partial chunk. Tell the summer to generate full * crc chunks from now on. */ - protected void adjustChunkBoundary() { + private void adjustChunkBoundary() { if (streamer.getAppendChunk() && streamer.getBytesCurBlock() % bytesPerChecksum == 0) { streamer.setAppendChunk(false); @@ -466,11 +481,8 @@ public class DFSOutputStream extends FSOutputSummer */ protected void endBlock() throws IOException { if (streamer.getBytesCurBlock() == blockSize) { - currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), - streamer.getAndIncCurrentSeqno(), true); - currentPacket.setSyncBlock(shouldSyncBlock); - streamer.waitAndQueuePacket(currentPacket); - currentPacket = null; + setCurrentPacket2Empty(); + enqueueCurrentPacket(); streamer.setBytesCurBlock(0); lastFlushOffset = 0; } @@ -592,8 +604,7 @@ public class DFSOutputStream extends FSOutputSummer } if (currentPacket != null) { currentPacket.setSyncBlock(isSync); - streamer.waitAndQueuePacket(currentPacket); - currentPacket = null; + enqueueCurrentPacket(); } if (endBlock && streamer.getBytesCurBlock() > 0) { // Need to end the current block, thus send an empty packet to @@ -601,8 +612,7 @@ public class DFSOutputStream extends FSOutputSummer currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), true); currentPacket.setSyncBlock(shouldSyncBlock || isSync); - streamer.waitAndQueuePacket(currentPacket); - currentPacket = null; + enqueueCurrentPacket(); streamer.setBytesCurBlock(0); lastFlushOffset = 0; } else { @@ -779,15 +789,11 @@ public class DFSOutputStream extends FSOutputSummer flushBuffer(); // flush from all upper layers if (currentPacket != null) { - streamer.waitAndQueuePacket(currentPacket); - currentPacket = null; + enqueueCurrentPacket(); } if (streamer.getBytesCurBlock() != 0) { - // send an empty packet to mark the end of the block - currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), - streamer.getAndIncCurrentSeqno(), true); - currentPacket.setSyncBlock(shouldSyncBlock); + setCurrentPacket2Empty(); } flushInternal(); // flush all data to Datanodes @@ -901,4 +907,9 @@ public class DFSOutputStream extends FSOutputSummer public long getFileId() { return fileId; } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + streamer; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 71cdbb9d59a..bbc8ba0beba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -28,14 +28,16 @@ import java.util.EnumSet; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.hdfs.protocol.ECInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.util.DataChecksum; @@ -44,6 +46,8 @@ import org.apache.htrace.Sampler; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import com.google.common.base.Preconditions; + /**************************************************************** * The DFSStripedOutputStream class supports writing files in striped @@ -55,117 +59,250 @@ import org.apache.htrace.TraceScope; @InterfaceAudience.Private public class DFSStripedOutputStream extends DFSOutputStream { + /** Coordinate the communication between the streamers. */ + static class Coordinator { + private final List> endBlocks; + private final List> stripedBlocks; + private volatile boolean shouldLocateFollowingBlock = false; + Coordinator(final int numDataBlocks, final int numAllBlocks) { + endBlocks = new ArrayList<>(numDataBlocks); + for (int i = 0; i < numDataBlocks; i++) { + endBlocks.add(new LinkedBlockingQueue(1)); + } + + stripedBlocks = new ArrayList<>(numAllBlocks); + for (int i = 0; i < numAllBlocks; i++) { + stripedBlocks.add(new LinkedBlockingQueue(1)); + } + } + + boolean shouldLocateFollowingBlock() { + return shouldLocateFollowingBlock; + } + + void putEndBlock(int i, ExtendedBlock block) { + shouldLocateFollowingBlock = true; + + final boolean b = endBlocks.get(i).offer(block); + Preconditions.checkState(b, "Failed to add " + block + + " to endBlocks queue, i=" + i); + } + + ExtendedBlock getEndBlock(int i) throws InterruptedIOException { + try { + return endBlocks.get(i).poll(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw DFSUtil.toInterruptedIOException( + "getEndBlock interrupted, i=" + i, e); + } + } + + void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) { + ExtendedBlock b = endBlocks.get(i).peek(); + if (b == null) { + // streamer just has failed, put end block and continue + b = block; + putEndBlock(i, b); + } + b.setNumBytes(newBytes); + } + + void putStripedBlock(int i, LocatedBlock block) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("putStripedBlock " + block + ", i=" + i); + } + final boolean b = stripedBlocks.get(i).offer(block); + if (!b) { + throw new IOException("Failed: " + block + ", i=" + i); + } + } + + LocatedBlock getStripedBlock(int i) throws IOException { + final LocatedBlock lb; + try { + lb = stripedBlocks.get(i).poll(90, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw DFSUtil.toInterruptedIOException("getStripedBlock interrupted", e); + } + + if (lb == null) { + throw new IOException("Failed: i=" + i); + } + return lb; + } + } + + /** Buffers for writing the data and parity cells of a strip. */ + class CellBuffers { + private final ByteBuffer[] buffers; + private final byte[][] checksumArrays; + + CellBuffers(int numParityBlocks) throws InterruptedException{ + if (cellSize % bytesPerChecksum != 0) { + throw new HadoopIllegalArgumentException("Invalid values: " + + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + + bytesPerChecksum + ") must divide cell size (=" + cellSize + ")."); + } + + checksumArrays = new byte[numParityBlocks][]; + final int size = getChecksumSize() * (cellSize / bytesPerChecksum); + for (int i = 0; i < checksumArrays.length; i++) { + checksumArrays[i] = new byte[size]; + } + + buffers = new ByteBuffer[numAllBlocks]; + for (int i = 0; i < buffers.length; i++) { + buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize)); + } + } + + private ByteBuffer[] getBuffers() { + return buffers; + } + + byte[] getChecksumArray(int i) { + return checksumArrays[i - numDataBlocks]; + } + + private int addTo(int i, byte[] b, int off, int len) { + final ByteBuffer buf = buffers[i]; + final int pos = buf.position() + len; + Preconditions.checkState(pos <= cellSize); + buf.put(b, off, len); + return pos; + } + + private void clear() { + for (int i = 0; i< numAllBlocks; i++) { + buffers[i].clear(); + if (i >= numDataBlocks) { + Arrays.fill(buffers[i].array(), (byte) 0); + } + } + } + + private void release() { + for (int i = 0; i < numAllBlocks; i++) { + byteArrayManager.release(buffers[i].array()); + } + } + + private void flipDataBuffers() { + for (int i = 0; i < numDataBlocks; i++) { + buffers[i].flip(); + } + } + } + + private final Coordinator coordinator; + private final CellBuffers cellBuffers; + private final RawErasureEncoder encoder; private final List streamers; - /** - * Size of each striping cell, must be a multiple of bytesPerChecksum - */ - private final ECInfo ecInfo; + + /** Size of each striping cell, must be a multiple of bytesPerChecksum */ private final int cellSize; - // checksum buffer, we only need to calculate checksum for parity blocks - private byte[] checksumBuf; - private ByteBuffer[] cellBuffers; - - private final short numAllBlocks; - private final short numDataBlocks; - - private int curIdx = 0; - /* bytes written in current block group */ - //private long currentBlockGroupBytes = 0; - - //TODO: Use ErasureCoder interface (HDFS-7781) - private RawErasureEncoder encoder; + private final int numAllBlocks; + private final int numDataBlocks; private StripedDataStreamer getLeadingStreamer() { return streamers.get(0); } - private long getBlockGroupSize() { - return blockSize * numDataBlocks; - } - /** Construct a new output stream for creating a file. */ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet flag, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException { super(dfsClient, src, stat, flag, progress, checksum, favoredNodes); - DFSClient.LOG.info("Creating striped output stream"); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating DFSStripedOutputStream for " + src); + } // ECInfo is restored from NN just before writing striped files. - ecInfo = dfsClient.getErasureCodingInfo(src); - cellSize = ecInfo.getSchema().getChunkSize(); - numAllBlocks = (short)(ecInfo.getSchema().getNumDataUnits() - + ecInfo.getSchema().getNumParityUnits()); - numDataBlocks = (short)ecInfo.getSchema().getNumDataUnits(); + //TODO reduce an rpc call HDFS-8289 + final ECSchema schema = dfsClient.getErasureCodingInfo(src).getSchema(); + final int numParityBlocks = schema.getNumParityUnits(); + cellSize = schema.getChunkSize(); + numDataBlocks = schema.getNumDataUnits(); + numAllBlocks = numDataBlocks + numParityBlocks; - checkConfiguration(); - - checksumBuf = new byte[getChecksumSize() * (cellSize / bytesPerChecksum)]; - cellBuffers = new ByteBuffer[numAllBlocks]; - List> stripeBlocks = new ArrayList<>(); - - for (int i = 0; i < numAllBlocks; i++) { - stripeBlocks.add(new LinkedBlockingQueue(numAllBlocks)); - try { - cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize)); - } catch (InterruptedException ie) { - final InterruptedIOException iioe = new InterruptedIOException( - "create cell buffers"); - iioe.initCause(ie); - throw iioe; - } - } encoder = new RSRawEncoder(); - encoder.initialize(numDataBlocks, - numAllBlocks - numDataBlocks, cellSize); + encoder.initialize(numDataBlocks, numParityBlocks, cellSize); + + coordinator = new Coordinator(numDataBlocks, numAllBlocks); + try { + cellBuffers = new CellBuffers(numParityBlocks); + } catch (InterruptedException ie) { + throw DFSUtil.toInterruptedIOException("Failed to create cell buffers", ie); + } List s = new ArrayList<>(numAllBlocks); for (short i = 0; i < numAllBlocks; i++) { - StripedDataStreamer streamer = new StripedDataStreamer(stat, null, + StripedDataStreamer streamer = new StripedDataStreamer(stat, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, - i, stripeBlocks, favoredNodes); + favoredNodes, i, coordinator); s.add(streamer); } streamers = Collections.unmodifiableList(s); - - refreshStreamer(); + setCurrentStreamer(0); } - private void checkConfiguration() { - if (cellSize % bytesPerChecksum != 0) { - throw new HadoopIllegalArgumentException("Invalid values: " - + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum - + ") must divide cell size (=" + cellSize + ")."); - } + StripedDataStreamer getStripedDataStreamer(int i) { + return streamers.get(i); } - private void refreshStreamer() { - streamer = streamers.get(curIdx); + int getCurrentIndex() { + return getCurrentStreamer().getIndex(); } - private void moveToNextStreamer() { - curIdx = (curIdx + 1) % numAllBlocks; - refreshStreamer(); + StripedDataStreamer getCurrentStreamer() { + return (StripedDataStreamer)streamer; + } + + private StripedDataStreamer setCurrentStreamer(int i) { + streamer = streamers.get(i); + return getCurrentStreamer(); } /** - * encode the buffers. - * After encoding, flip each buffer. + * Encode the buffers, i.e. compute parities. * * @param buffers data buffers + parity buffers */ - private void encode(ByteBuffer[] buffers) { - ByteBuffer[] dataBuffers = new ByteBuffer[numDataBlocks]; - ByteBuffer[] parityBuffers = new ByteBuffer[numAllBlocks - numDataBlocks]; - for (int i = 0; i < numAllBlocks; i++) { - if (i < numDataBlocks) { - dataBuffers[i] = buffers[i]; - } else { - parityBuffers[i - numDataBlocks] = buffers[i]; + private static void encode(RawErasureEncoder encoder, int numData, + ByteBuffer[] buffers) { + final ByteBuffer[] dataBuffers = new ByteBuffer[numData]; + final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData]; + System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length); + System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length); + + encoder.encode(dataBuffers, parityBuffers); + } + + + private void checkStreamers() throws IOException { + int count = 0; + for(StripedDataStreamer s : streamers) { + if (!s.isFailed()) { + count++; } } - encoder.encode(dataBuffers, parityBuffers); + if (LOG.isDebugEnabled()) { + LOG.debug("checkStreamers: " + streamers); + LOG.debug("count=" + count); + } + if (count < numDataBlocks) { + throw new IOException("Failed: the number of remaining blocks = " + + count + " < the number of data blocks = " + numDataBlocks); + } + } + + private void handleStreamerFailure(String err, Exception e) throws IOException { + LOG.warn("Failed: " + err + ", " + this, e); + getCurrentStreamer().setIsFailed(true); + checkStreamers(); + currentPacket = null; } /** @@ -173,11 +310,12 @@ public class DFSStripedOutputStream extends DFSOutputStream { * writing parity blocks. * * @param byteBuffer the given buffer to generate packets + * @param checksumBuf the checksum buffer * @return packets generated * @throws IOException */ - private List generatePackets(ByteBuffer byteBuffer) - throws IOException{ + private List generatePackets( + ByteBuffer byteBuffer, byte[] checksumBuf) throws IOException{ List packets = new ArrayList<>(); assert byteBuffer.hasArray(); getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0, @@ -201,82 +339,47 @@ public class DFSStripedOutputStream extends DFSOutputStream { } @Override - protected synchronized void writeChunk(byte[] b, int offset, int len, + protected synchronized void writeChunk(byte[] bytes, int offset, int len, byte[] checksum, int ckoff, int cklen) throws IOException { - super.writeChunk(b, offset, len, checksum, ckoff, cklen); + final int index = getCurrentIndex(); + final StripedDataStreamer current = getCurrentStreamer(); + final int pos = cellBuffers.addTo(index, bytes, offset, len); + final boolean cellFull = pos == cellSize; - if (getSizeOfCellnBuffer(curIdx) <= cellSize) { - addToCellBuffer(b, offset, len); - } else { - String msg = "Writing a chunk should not overflow the cell buffer."; - DFSClient.LOG.info(msg); - throw new IOException(msg); + final long oldBytes = current.getBytesCurBlock(); + if (!current.isFailed()) { + try { + super.writeChunk(bytes, offset, len, checksum, ckoff, cklen); + + // cell is full and current packet has not been enqueued, + if (cellFull && currentPacket != null) { + enqueueCurrentPacketFull(); + } + } catch(Exception e) { + handleStreamerFailure("offset=" + offset + ", length=" + len, e); + } } - // If current packet has not been enqueued for transmission, - // but the cell buffer is full, we need to enqueue the packet - if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) { - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("DFSClient writeChunk cell buffer full seqno=" + - currentPacket.getSeqno() + - ", curIdx=" + curIdx + - ", src=" + src + - ", bytesCurBlock=" + streamer.getBytesCurBlock() + - ", blockSize=" + blockSize + - ", appendChunk=" + streamer.getAppendChunk()); - } - streamer.waitAndQueuePacket(currentPacket); - currentPacket = null; - adjustChunkBoundary(); - endBlock(); + if (current.isFailed()) { + final long newBytes = oldBytes + len; + coordinator.setBytesEndBlock(index, newBytes, current.getBlock()); + current.setBytesCurBlock(newBytes); } // Two extra steps are needed when a striping cell is full: // 1. Forward the current index pointer // 2. Generate parity packets if a full stripe of data cells are present - if (getSizeOfCellnBuffer(curIdx) == cellSize) { - //move curIdx to next cell - moveToNextStreamer(); + if (cellFull) { + int next = index + 1; //When all data cells in a stripe are ready, we need to encode //them and generate some parity cells. These cells will be //converted to packets and put to their DataStreamer's queue. - if (curIdx == numDataBlocks) { - //encode the data cells - for (int k = 0; k < numDataBlocks; k++) { - cellBuffers[k].flip(); - } - encode(cellBuffers); - for (int i = numDataBlocks; i < numAllBlocks; i++) { - ByteBuffer parityBuffer = cellBuffers[i]; - List packets = generatePackets(parityBuffer); - for (DFSPacket p : packets) { - currentPacket = p; - streamer.waitAndQueuePacket(currentPacket); - currentPacket = null; - } - endBlock(); - moveToNextStreamer(); - } - //read next stripe to cellBuffers - clearCellBuffers(); - } - } - } - - private void addToCellBuffer(byte[] b, int off, int len) { - cellBuffers[curIdx].put(b, off, len); - } - - private int getSizeOfCellnBuffer(int cellIndex) { - return cellBuffers[cellIndex].position(); - } - - private void clearCellBuffers() { - for (int i = 0; i< numAllBlocks; i++) { - cellBuffers[i].clear(); - if (i >= numDataBlocks) { - Arrays.fill(cellBuffers[i].array(), (byte) 0); + if (next == numDataBlocks) { + cellBuffers.flipDataBuffers(); + writeParityCells(); + next = 0; } + setCurrentStreamer(next); } } @@ -284,20 +387,14 @@ public class DFSStripedOutputStream extends DFSOutputStream { return numDataBlocks * cellSize; } - private void notSupported(String headMsg) - throws IOException{ - throw new IOException( - headMsg + " is now not supported for striping layout."); + @Override + public void hflush() { + throw new UnsupportedOperationException(); } @Override - public void hflush() throws IOException { - notSupported("hflush"); - } - - @Override - public void hsync() throws IOException { - notSupported("hsync"); + public void hsync() { + throw new UnsupportedOperationException(); } @Override @@ -327,29 +424,28 @@ public class DFSStripedOutputStream extends DFSOutputStream { return closed || getLeadingStreamer().streamerClosed(); } - // shutdown datastreamer and responseprocessor threads. - // interrupt datastreamer if force is true @Override protected void closeThreads(boolean force) throws IOException { - int index = 0; - boolean exceptionOccurred = false; + final MultipleIOException.Builder b = new MultipleIOException.Builder(); for (StripedDataStreamer streamer : streamers) { try { streamer.close(force); streamer.join(); streamer.closeSocket(); - } catch (InterruptedException | IOException e) { - DFSClient.LOG.error("Failed to shutdown streamer: name=" - + streamer.getName() + ", index=" + index + ", file=" + src, e); - exceptionOccurred = true; + } catch(Exception e) { + try { + handleStreamerFailure("force=" + force, e); + } catch(IOException ioe) { + b.add(ioe); + } } finally { streamer.setSocketToNull(); setClosed(); - index++; } } - if (exceptionOccurred) { - throw new IOException("Failed to shutdown streamer"); + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; } } @@ -370,50 +466,69 @@ public class DFSStripedOutputStream extends DFSOutputStream { if (currentBlockGroupBytes % stripeDataSize() == 0) { return; } - long firstCellSize = getLeadingStreamer().getBytesCurBlock() % cellSize; - long parityCellSize = firstCellSize > 0 && firstCellSize < cellSize ? + + final int firstCellSize = (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize); + final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize? firstCellSize : cellSize; + final ByteBuffer[] buffers = cellBuffers.getBuffers(); for (int i = 0; i < numAllBlocks; i++) { // Pad zero bytes to make all cells exactly the size of parityCellSize // If internal block is smaller than parity block, pad zero bytes. // Also pad zero bytes to all parity cells - int position = cellBuffers[i].position(); + final int position = buffers[i].position(); assert position <= parityCellSize : "If an internal block is smaller" + " than parity block, then its last cell should be small than last" + " parity cell"; for (int j = 0; j < parityCellSize - position; j++) { - cellBuffers[i].put((byte) 0); + buffers[i].put((byte) 0); } - cellBuffers[i].flip(); + buffers[i].flip(); } - encode(cellBuffers); - // write parity cells - curIdx = numDataBlocks; - refreshStreamer(); + writeParityCells(); + } + + void writeParityCells() throws IOException { + final ByteBuffer[] buffers = cellBuffers.getBuffers(); + //encode the data cells + encode(encoder, numDataBlocks, buffers); for (int i = numDataBlocks; i < numAllBlocks; i++) { - ByteBuffer parityBuffer = cellBuffers[i]; - List packets = generatePackets(parityBuffer); - for (DFSPacket p : packets) { - currentPacket = p; - streamer.waitAndQueuePacket(currentPacket); - currentPacket = null; + writeParity(i, buffers[i], cellBuffers.getChecksumArray(i)); + } + cellBuffers.clear(); + } + + void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf + ) throws IOException { + final StripedDataStreamer current = setCurrentStreamer(index); + final int len = buffer.limit(); + + final long oldBytes = current.getBytesCurBlock(); + if (!current.isFailed()) { + try { + for (DFSPacket p : generatePackets(buffer, checksumBuf)) { + streamer.waitAndQueuePacket(p); + } + endBlock(); + } catch(Exception e) { + handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e); } - endBlock(); - moveToNextStreamer(); } - clearCellBuffers(); + if (current.isFailed()) { + final long newBytes = oldBytes + len; + current.setBytesCurBlock(newBytes); + } } @Override void setClosed() { super.setClosed(); for (int i = 0; i < numAllBlocks; i++) { - byteArrayManager.release(cellBuffers[i].array()); streamers.get(i).release(); } + cellBuffers.release(); } @Override @@ -425,25 +540,31 @@ public class DFSStripedOutputStream extends DFSOutputStream { try { // flush from all upper layers - flushBuffer(); - if (currentPacket != null) { - streamer.waitAndQueuePacket(currentPacket); - currentPacket = null; + try { + flushBuffer(); + if (currentPacket != null) { + enqueueCurrentPacket(); + } + } catch(Exception e) { + handleStreamerFailure("closeImpl", e); } + // if the last stripe is incomplete, generate and write parity cells writeParityCellsForLastStripe(); for (int i = 0; i < numAllBlocks; i++) { - curIdx = i; - refreshStreamer(); - if (streamer.getBytesCurBlock() > 0) { - // send an empty packet to mark the end of the block - currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), - streamer.getAndIncCurrentSeqno(), true); - currentPacket.setSyncBlock(shouldSyncBlock); + final StripedDataStreamer s = setCurrentStreamer(i); + if (!s.isFailed()) { + try { + if (s.getBytesCurBlock() > 0) { + setCurrentPacket2Empty(); + } + // flush all data to Datanode + flushInternal(); + } catch(Exception e) { + handleStreamerFailure("closeImpl", e); + } } - // flush all data to Datanode - flushInternal(); } closeThreads(false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index cae56c04da5..2e2ecfd652a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -36,6 +36,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PAS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.PrintStream; import java.io.UnsupportedEncodingException; import java.net.InetAddress; @@ -55,7 +56,6 @@ import java.util.concurrent.ThreadLocalRandom; import javax.net.SocketFactory; -import com.google.common.collect.Sets; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Option; @@ -96,6 +96,7 @@ import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.protobuf.BlockingService; @InterfaceAudience.Private @@ -1513,7 +1514,7 @@ public class DFSUtil { public static int getSmallBufferSize(Configuration conf) { return Math.min(getIoFileBufferSize(conf) / 2, 512); } - + /** * Probe for HDFS Encryption being enabled; this uses the value of * the option {@link DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI}, @@ -1527,4 +1528,10 @@ public class DFSUtil { DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty(); } + public static InterruptedIOException toInterruptedIOException(String message, + InterruptedException e) { + final InterruptedIOException iioe = new InterruptedIOException(message); + iioe.initCause(e); + return iioe; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 631f3869a29..8f07341f749 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -575,7 +575,7 @@ class DataStreamer extends Daemon { // get new block from namenode. if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { if(LOG.isDebugEnabled()) { - LOG.debug("Allocating new block"); + LOG.debug("Allocating new block " + this); } setPipeline(nextBlockOutputStream()); initDataStreaming(); @@ -593,10 +593,7 @@ class DataStreamer extends Daemon { long lastByteOffsetInBlock = one.getLastByteOffsetBlock(); if (lastByteOffsetInBlock > stat.getBlockSize()) { throw new IOException("BlockSize " + stat.getBlockSize() + - " is smaller than data size. " + - " Offset of packet in block " + - lastByteOffsetInBlock + - " Aborting file " + src); + " < lastByteOffsetInBlock, " + this + ", " + one); } if (one.isLastPacketInBlock()) { @@ -1751,7 +1748,7 @@ class DataStreamer extends Daemon { dataQueue.addLast(packet); lastQueuedSeqno = packet.getSeqno(); if (LOG.isDebugEnabled()) { - LOG.debug("Queued packet " + packet.getSeqno()); + LOG.debug("Queued " + packet + ", " + this); } dataQueue.notifyAll(); } @@ -1901,4 +1898,10 @@ class DataStreamer extends Daemon { s.close(); } } + + @Override + public String toString() { + return (block == null? null: block.getLocalBlock()) + + "@" + Arrays.toString(getNodes()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index ef7e2a687a6..258fc6505dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -18,8 +18,14 @@ package org.apache.hadoop.hdfs; -import java.util.List; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -31,15 +37,6 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS; - /**************************************************************************** * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}. * There are two kinds of StripedDataStreamer, leading streamer and ordinary @@ -49,40 +46,32 @@ import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS; * ****************************************************************************/ public class StripedDataStreamer extends DataStreamer { - private final short index; - private final List> stripedBlocks; - private boolean hasCommittedBlock = false; + private final Coordinator coordinator; + private final int index; + private volatile boolean isFailed; - StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block, + StripedDataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, - ByteArrayManager byteArrayManage, short index, - List> stripedBlocks, - String[] favoredNodes) { - super(stat, block, dfsClient, src, progress, checksum, cachingStrategy, + ByteArrayManager byteArrayManage, String[] favoredNodes, + short index, Coordinator coordinator) { + super(stat, null, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, favoredNodes); this.index = index; - this.stripedBlocks = stripedBlocks; + this.coordinator = coordinator; } - /** - * Construct a data streamer for appending to the last partial block - * @param lastBlock last block of the file to be appended - * @param stat status of the file to be appended - * @throws IOException if error occurs - */ - StripedDataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, - DFSClient dfsClient, String src, - Progressable progress, DataChecksum checksum, - AtomicReference cachingStrategy, - ByteArrayManager byteArrayManage, short index, - List> stripedBlocks) - throws IOException { - super(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage); - this.index = index; - this.stripedBlocks = stripedBlocks; + int getIndex() { + return index; + } + + void setIsFailed(boolean isFailed) { + this.isFailed = isFailed; + } + + boolean isFailed() { + return isFailed; } public boolean isLeadingStreamer () { @@ -95,18 +84,8 @@ public class StripedDataStreamer extends DataStreamer { @Override protected void endBlock() { - if (!isLeadingStreamer() && !isParityStreamer()) { - // before retrieving a new block, transfer the finished block to - // leading streamer - LocatedBlock finishedBlock = new LocatedBlock( - new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(), - block.getNumBytes(), block.getGenerationStamp()), null); - try { - boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30, - TimeUnit.SECONDS); - } catch (InterruptedException ie) { - // TODO: Handle InterruptedException (HDFS-7786) - } + if (!isParityStreamer()) { + coordinator.putEndBlock(index, block); } super.endBlock(); } @@ -114,71 +93,40 @@ public class StripedDataStreamer extends DataStreamer { @Override protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { - LocatedBlock lb = null; if (isLeadingStreamer()) { - if (hasCommittedBlock) { - /** - * when committing a block group, leading streamer has to adjust - * {@link block} to include the size of block group - */ - for (int i = 1; i < NUM_DATA_BLOCKS; i++) { - try { - LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30, - TimeUnit.SECONDS); - if (finishedLocatedBlock == null) { - throw new IOException("Fail to get finished LocatedBlock " + - "from streamer, i=" + i); - } - ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock(); - long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes(); - if (block != null) { - block.setNumBytes(block.getNumBytes() + bytes); - } - } catch (InterruptedException ie) { - DFSClient.LOG.info("InterruptedException received when putting" + - " a block to stripeBlocks, ie = " + ie); - } + if (coordinator.shouldLocateFollowingBlock()) { + // set numByte for the previous block group + long bytes = 0; + for (int i = 0; i < NUM_DATA_BLOCKS; i++) { + final ExtendedBlock b = coordinator.getEndBlock(i); + bytes += b == null ? 0 : b.getNumBytes(); } + block.setNumBytes(bytes); } - lb = super.locateFollowingBlock(excludedNodes); - hasCommittedBlock = true; - assert lb instanceof LocatedStripedBlock; - DFSClient.LOG.debug("Leading streamer obtained bg " + lb); - LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( - (LocatedStripedBlock) lb, BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, - NUM_PARITY_BLOCKS); + final LocatedStripedBlock lsb + = (LocatedStripedBlock)super.locateFollowingBlock(excludedNodes); + if (LOG.isDebugEnabled()) { + LOG.debug("Obtained block group " + lsb); + } + LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(lsb, + BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); + assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) : "Fail to get block group from namenode: blockGroupSize: " + (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " + blocks.length; - lb = blocks[0]; - for (int i = 1; i < blocks.length; i++) { - try { - boolean offSuccess = stripedBlocks.get(i).offer(blocks[i], - 90, TimeUnit.SECONDS); - if(!offSuccess){ - String msg = "Fail to put block to stripeBlocks. i = " + i; - DFSClient.LOG.info(msg); - throw new IOException(msg); - } else { - DFSClient.LOG.info("Allocate a new block to a streamer. i = " + i - + ", block: " + blocks[i]); - } - } catch (InterruptedException ie) { - DFSClient.LOG.info("InterruptedException received when putting" + - " a block to stripeBlocks, ie = " + ie); - } - } - } else { - try { - // wait 90 seconds to get a block from the queue - lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - DFSClient.LOG.info("InterruptedException received when retrieving " + - "a block from stripeBlocks, ie = " + ie); + for (int i = 0; i < blocks.length; i++) { + coordinator.putStripedBlock(i, blocks[i]); } } - return lb; + + return coordinator.getStripedBlock(index); + } + + @Override + public String toString() { + return "#" + index + ": isFailed? " + Boolean.toString(isFailed).charAt(0) + + ", " + super.toString(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 7392552133f..8f843d5cbb9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -390,7 +390,7 @@ public class FSDirectory implements Closeable { void disableQuotaChecks() { skipQuotaCheck = true; } - + /** * This is a wrapper for resolvePath(). If the path passed * is prefixed with /.reserved/raw, then it checks to ensure that the caller diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index fdbacdca40f..4ec9bf96273 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1998,8 +1998,6 @@ public class MiniDFSCluster { int node = -1; for (int i = 0; i < dataNodes.size(); i++) { DataNode dn = dataNodes.get(i).datanode; - LOG.info("DN name=" + dnName + " found DN=" + dn + - " with name=" + dn.getDisplayName()); if (dnName.equals(dn.getDatanodeId().getXferAddr())) { node = i; break; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index 5ce94ee1a41..ec98e68612f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -42,6 +44,12 @@ import org.junit.Test; public class TestDFSStripedOutputStream { public static final Log LOG = LogFactory.getLog(TestDFSStripedOutputStream.class); + + static { + GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL); + GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL); + } + private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; @@ -245,6 +253,11 @@ public class TestDFSStripedOutputStream { static void verifyParity(final long size, final int cellSize, byte[][] dataBytes, byte[][] parityBytes) { + verifyParity(size, cellSize, dataBytes, parityBytes, -1); + } + + static void verifyParity(final long size, final int cellSize, + byte[][] dataBytes, byte[][] parityBytes, int killedDnIndex) { // verify the parity blocks int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength( size, cellSize, dataBytes.length, dataBytes.length); @@ -265,7 +278,10 @@ public class TestDFSStripedOutputStream { encoder.initialize(dataBytes.length, parityBytes.length, cellSize); encoder.encode(dataBytes, expectedParityBytes); for (int i = 0; i < parityBytes.length; i++) { - Assert.assertArrayEquals(expectedParityBytes[i], parityBytes[i]); + if (i != killedDnIndex) { + Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex, + expectedParityBytes[i], parityBytes[i]); + } } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java new file mode 100644 index 00000000000..c2e588ad501 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestDFSStripedOutputStreamWithFailure { + public static final Log LOG = LogFactory.getLog( + TestDFSStripedOutputStreamWithFailure.class); + static { + GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL); + GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL); + } + + private static final int NUM_DATA_BLOCKS = HdfsConstants.NUM_DATA_BLOCKS; + private static final int NUM_PARITY_BLOCKS = HdfsConstants.NUM_PARITY_BLOCKS; + private static final int CELL_SIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private static final int STRIPES_PER_BLOCK = 4; + private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK; + private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS; + + private final HdfsConfiguration conf = new HdfsConfiguration(); + private MiniDFSCluster cluster; + private DistributedFileSystem dfs; + private final Path dir = new Path("/" + + TestDFSStripedOutputStreamWithFailure.class.getSimpleName()); + + + @Before + public void setup() throws IOException { + final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.waitActive(); + dfs = cluster.getFileSystem(); + dfs.mkdirs(dir); + dfs.createErasureCodingZone(dir, null); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private static byte getByte(long pos) { + return (byte)pos; + } + + @Test(timeout=120000) + public void testDatanodeFailure1() { + final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); + final int dn = 1; + runTest("file" + dn, length, dn); + } + + @Test(timeout=120000) + public void testDatanodeFailure2() { + final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); + final int dn = 2; + runTest("file" + dn, length, dn); + } + + @Test(timeout=120000) + public void testDatanodeFailure3() { + final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); + final int dn = 3; + runTest("file" + dn, length, dn); + } + + @Test(timeout=120000) + public void testDatanodeFailure4() { + final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); + final int dn = 4; + runTest("file" + dn, length, dn); + } + + @Test(timeout=120000) + public void testDatanodeFailure5() { + final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); + final int dn = 5; + runTest("file" + dn, length, dn); + } + + @Test(timeout=120000) + public void testDatanodeFailure6() { + final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); + final int dn = 6; + runTest("file" + dn, length, dn); + } + + @Test(timeout=120000) + public void testDatanodeFailure7() { + final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); + final int dn = 7; + runTest("file" + dn, length, dn); + } + + @Test(timeout=120000) + public void testDatanodeFailure8() { + final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); + final int dn = 8; + runTest("file" + dn, length, dn); + } + + private void runTest(final String src, final int length, final int dnIndex) { + try { + cluster.startDataNodes(conf, 1, true, null, null); + cluster.waitActive(); + + runTest(new Path(dir, src), length, dnIndex); + } catch(Exception e) { + LOG.info("FAILED", e); + Assert.fail(StringUtils.stringifyException(e)); + } + } + + private void runTest(final Path p, final int length, + final int dnIndex) throws Exception { + LOG.info("p=" + p + ", length=" + length + ", dnIndex=" + dnIndex); + final String fullPath = p.toString(); + + final AtomicInteger pos = new AtomicInteger(); + final FSDataOutputStream out = dfs.create(p); + final AtomicBoolean killed = new AtomicBoolean(); + final Thread killer = new Thread(new Runnable() { + @Override + public void run() { + killDatanode(cluster, (DFSStripedOutputStream)out.getWrappedStream(), + dnIndex, pos); + killed.set(true); + } + }); + killer.start(); + + final int mask = (1 << 16) - 1; + for(; pos.get() < length; ) { + final int i = pos.getAndIncrement(); + write(out, i); + if ((i & mask) == 0) { + final long ms = 100; + LOG.info("i=" + i + " sleep " + ms); + Thread.sleep(ms); + } + } + killer.join(10000); + Assert.assertTrue(killed.get()); + out.close(); + + // check file length + final FileStatus status = dfs.getFileStatus(p); + Assert.assertEquals(length, status.getLen()); + + checkData(dfs, fullPath, length, dnIndex); + } + + static void write(FSDataOutputStream out, int i) throws IOException { + try { + out.write(getByte(i)); + } catch(IOException ioe) { + throw new IOException("Failed at i=" + i, ioe); + } + } + + static DatanodeInfo getDatanodes(StripedDataStreamer streamer) { + for(;;) { + final DatanodeInfo[] datanodes = streamer.getNodes(); + if (datanodes != null) { + Assert.assertEquals(1, datanodes.length); + Assert.assertNotNull(datanodes[0]); + return datanodes[0]; + } + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + return null; + } + } + } + + static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out, + final int dnIndex, final AtomicInteger pos) { + final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex); + final DatanodeInfo datanode = getDatanodes(s); + LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos); + cluster.stopDataNode(datanode.getXferAddr()); + } + + static void checkData(DistributedFileSystem dfs, String src, int length, + int killedDnIndex) throws IOException { + List> blockGroupList = new ArrayList<>(); + LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L); + final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1; + Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size()); + + for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) { + Assert.assertTrue(firstBlock instanceof LocatedStripedBlock); + LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock) firstBlock, + CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); + blockGroupList.add(Arrays.asList(blocks)); + } + + // test each block group + for (int group = 0; group < blockGroupList.size(); group++) { + final boolean isLastGroup = group == blockGroupList.size() - 1; + final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE + : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE; + final int numCellInGroup = (int)((groupSize - 1)/CELL_SIZE + 1); + final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS; + final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE; + + //get the data of this block + List blockList = blockGroupList.get(group); + byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][]; + byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][]; + + // for each block, use BlockReader to read data + for (int i = 0; i < blockList.size(); i++) { + final int j = i >= NUM_DATA_BLOCKS? 0: i; + final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS + + (j <= lastCellIndex? 1: 0); + final int blockSize = numCellInBlock*CELL_SIZE + + (isLastGroup && i == lastCellIndex? lastCellSize - CELL_SIZE: 0); + + final byte[] blockBytes = new byte[blockSize]; + if (i < NUM_DATA_BLOCKS) { + dataBlockBytes[i] = blockBytes; + } else { + parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes; + } + + final LocatedBlock lb = blockList.get(i); + LOG.info("XXX i=" + i + ", lb=" + lb); + if (lb == null) { + continue; + } + final ExtendedBlock block = lb.getBlock(); + Assert.assertEquals(blockSize, block.getNumBytes()); + + + if (block.getNumBytes() == 0) { + continue; + } + + if (i != killedDnIndex) { + final BlockReader blockReader = BlockReaderTestUtil.getBlockReader( + dfs, lb, 0, block.getNumBytes()); + blockReader.readAll(blockBytes, 0, (int) block.getNumBytes()); + blockReader.close(); + } + } + + // check data + final int groupPosInFile = group*BLOCK_GROUP_SIZE; + for (int i = 0; i < dataBlockBytes.length; i++) { + final byte[] actual = dataBlockBytes[i]; + for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) { + final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG( + CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile; + Assert.assertTrue(posInFile < length); + final byte expected = getByte(posInFile); + + if (i == killedDnIndex) { + actual[posInBlk] = expected; + } else { + String s = "expected=" + expected + " but actual=" + actual[posInBlk] + + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk + + ". group=" + group + ", i=" + i; + Assert.assertEquals(s, expected, actual[posInBlk]); + } + } + } + + // check parity + TestDFSStripedOutputStream.verifyParity( + lbs.getLocatedBlocks().get(group).getBlockSize(), + CELL_SIZE, dataBlockBytes, parityBlockBytes, + killedDnIndex - dataBlockBytes.length); + } + } +}