diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 9373e98bf42..6006d71faf6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -189,16 +189,6 @@ public interface HdfsClientConfigKeys { int THREADPOOL_SIZE_DEFAULT = 18; } - /** dfs.client.write.striped configuration properties */ - interface StripedWrite { - String PREFIX = Write.PREFIX + "striped."; - - String MAX_SECONDS_GET_STRIPED_BLOCK_KEY = PREFIX + "max-seconds-get-striped-block"; - int MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT = 90; - String MAX_SECONDS_GET_ENDED_BLOCK_KEY = PREFIX + "max-seconds-get-ended-block"; - int MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT = 60; - } - /** dfs.http.client configuration properties */ interface HttpClient { String PREFIX = "dfs.http.client."; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index f41d30a678e..a710c2efa81 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -307,4 +307,7 @@ StripedBlocksFeature. (Walter Su via jing9) HDFS-8466. Refactor BlockInfoContiguous and fix NPE in - TestBlockInfo#testCopyConstructor() (vinayakumarb) \ No newline at end of file + TestBlockInfo#testCopyConstructor() (vinayakumarb) + + HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer to + tolerate datanode failure. (Tsz Wo Nicholas Sze via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index bdd33521793..1068b3752ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -28,7 +28,6 @@ import java.util.EnumSet; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; @@ -40,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; @@ -51,27 +49,33 @@ import org.apache.htrace.TraceScope; import com.google.common.base.Preconditions; -/**************************************************************** - * The DFSStripedOutputStream class supports writing files in striped - * layout. Each stripe contains a sequence of cells and multiple - * {@link StripedDataStreamer}s in DFSStripedOutputStream are responsible - * for writing the cells to different datanodes. - * - ****************************************************************/ - +/** + * This class supports writing files in striped layout and erasure coded format. + * Each stripe contains a sequence of cells. + */ @InterfaceAudience.Private public class DFSStripedOutputStream extends DFSOutputStream { static class MultipleBlockingQueue { - private final int pullTimeout; private final List> queues; - MultipleBlockingQueue(int numQueue, int queueSize, int pullTimeout) { + MultipleBlockingQueue(int numQueue, int queueSize) { queues = new ArrayList<>(numQueue); for (int i = 0; i < numQueue; i++) { queues.add(new LinkedBlockingQueue(queueSize)); } + } - this.pullTimeout = pullTimeout; + boolean isEmpty() { + for(int i = 0; i < queues.size(); i++) { + if (!queues.get(i).isEmpty()) { + return false; + } + } + return true; + } + + int numQueues() { + return queues.size(); } void offer(int i, T object) { @@ -80,49 +84,71 @@ public class DFSStripedOutputStream extends DFSOutputStream { + " to queue, i=" + i); } - T poll(int i) throws InterruptedIOException { + T take(int i) throws InterruptedIOException { try { - return queues.get(i).poll(pullTimeout, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw DFSUtil.toInterruptedIOException("poll interrupted, i=" + i, e); + return queues.get(i).take(); + } catch(InterruptedException ie) { + throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, ie); } } + T poll(int i) { + return queues.get(i).poll(); + } + T peek(int i) { return queues.get(i).peek(); } } /** Coordinate the communication between the streamers. */ - static class Coordinator { - private final MultipleBlockingQueue stripedBlocks; + class Coordinator { + private final MultipleBlockingQueue followingBlocks; private final MultipleBlockingQueue endBlocks; + + private final MultipleBlockingQueue newBlocks; private final MultipleBlockingQueue updateBlocks; Coordinator(final DfsClientConf conf, final int numDataBlocks, final int numAllBlocks) { - stripedBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1, - conf.getStripedWriteMaxSecondsGetStripedBlock()); - endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1, - conf.getStripedWriteMaxSecondsGetEndedBlock()); - updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1, - conf.getStripedWriteMaxSecondsGetStripedBlock()); + followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); + endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1); + + newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); + updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); } - void putEndBlock(int i, ExtendedBlock block) { + MultipleBlockingQueue getFollowingBlocks() { + return followingBlocks; + } + + MultipleBlockingQueue getNewBlocks() { + return newBlocks; + } + + MultipleBlockingQueue getUpdateBlocks() { + return updateBlocks; + } + + StripedDataStreamer getStripedDataStreamer(int i) { + return DFSStripedOutputStream.this.getStripedDataStreamer(i); + } + + void offerEndBlock(int i, ExtendedBlock block) { endBlocks.offer(i, block); } - ExtendedBlock getEndBlock(int i) throws InterruptedIOException { - return endBlocks.poll(i); + ExtendedBlock takeEndBlock(int i) throws InterruptedIOException { + return endBlocks.take(i); } - void putUpdateBlock(int i, ExtendedBlock block) { - updateBlocks.offer(i, block); - } - - ExtendedBlock getUpdateBlock(int i) throws InterruptedIOException { - return updateBlocks.poll(i); + boolean hasAllEndBlocks() { + for(int i = 0; i < endBlocks.numQueues(); i++) { + if (endBlocks.peek(i) == null) { + return false; + } + } + return true; } void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) { @@ -130,24 +156,35 @@ public class DFSStripedOutputStream extends DFSOutputStream { if (b == null) { // streamer just has failed, put end block and continue b = block; - putEndBlock(i, b); + offerEndBlock(i, b); } b.setNumBytes(newBytes); } - void putStripedBlock(int i, LocatedBlock block) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("putStripedBlock " + block + ", i=" + i); + /** @return a block representing the entire block group. */ + ExtendedBlock getBlockGroup() { + final StripedDataStreamer s0 = getStripedDataStreamer(0); + final ExtendedBlock b0 = s0.getBlock(); + if (b0 == null) { + return null; } - stripedBlocks.offer(i, block); - } - LocatedBlock getStripedBlock(int i) throws IOException { - final LocatedBlock lb = stripedBlocks.poll(i); - if (lb == null) { - throw new IOException("Failed: i=" + i); + final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0; + final ExtendedBlock block = new ExtendedBlock(b0); + long numBytes = b0.getNumBytes(); + for (int i = 1; i < numDataBlocks; i++) { + final StripedDataStreamer si = getStripedDataStreamer(i); + final ExtendedBlock bi = si.getBlock(); + if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) { + block.setGenerationStamp(bi.getGenerationStamp()); + } + numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock(); } - return lb; + block.setNumBytes(numBytes); + if (LOG.isDebugEnabled()) { + LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes()); + } + return block; } } @@ -223,13 +260,9 @@ public class DFSStripedOutputStream extends DFSOutputStream { private final int numAllBlocks; private final int numDataBlocks; - private StripedDataStreamer getLeadingStreamer() { - return streamers.get(0); - } - @Override ExtendedBlock getBlock() { - return getLeadingStreamer().getBlock(); + return coordinator.getBlockGroup(); } /** Construct a new output stream for creating a file. */ @@ -308,7 +341,9 @@ public class DFSStripedOutputStream extends DFSOutputStream { int count = 0; for(StripedDataStreamer s : streamers) { if (!s.isFailed()) { - s.getErrorState().initExtenalError(); + if (s.getBlock() != null) { + s.getErrorState().initExternalError(); + } count++; } } @@ -325,7 +360,7 @@ public class DFSStripedOutputStream extends DFSOutputStream { private void handleStreamerFailure(String err, Exception e) throws IOException { LOG.warn("Failed: " + err + ", " + this, e); - getCurrentStreamer().setIsFailed(true); + getCurrentStreamer().setFailed(true); checkStreamers(); currentPacket = null; } @@ -443,10 +478,17 @@ public class DFSStripedOutputStream extends DFSOutputStream { dfsClient.endFileLease(fileId); } - //TODO: Handle slow writers (HDFS-7786) - //Cuurently only check if the leading streamer is terminated + @Override boolean isClosed() { - return closed || getLeadingStreamer().streamerClosed(); + if (closed) { + return true; + } + for(StripedDataStreamer s : streamers) { + if (!s.streamerClosed()) { + return false; + } + } + return true; } @Override @@ -560,7 +602,19 @@ public class DFSStripedOutputStream extends DFSOutputStream { @Override protected synchronized void closeImpl() throws IOException { if (isClosed()) { - getLeadingStreamer().getLastException().check(true); + final MultipleIOException.Builder b = new MultipleIOException.Builder(); + for(int i = 0; i < streamers.size(); i++) { + final StripedDataStreamer si = getStripedDataStreamer(i); + try { + si.getLastException().check(true); + } catch (IOException e) { + b.add(e); + } + } + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } return; } @@ -594,7 +648,7 @@ public class DFSStripedOutputStream extends DFSOutputStream { } closeThreads(false); - final ExtendedBlock lastBlock = getCommittedBlock(); + final ExtendedBlock lastBlock = coordinator.getBlockGroup(); TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); try { completeFile(lastBlock); @@ -607,30 +661,4 @@ public class DFSStripedOutputStream extends DFSOutputStream { setClosed(); } } - - /** - * Generate the block which is reported and will be committed in NameNode. - * Need to go through all the streamers writing data blocks and add their - * bytesCurBlock together. Note that at this time all streamers have been - * closed. Also this calculation can cover streamers with writing failures. - * - * @return An ExtendedBlock with size of the whole block group. - */ - ExtendedBlock getCommittedBlock() throws IOException { - ExtendedBlock b = getLeadingStreamer().getBlock(); - if (b == null) { - return null; - } - final ExtendedBlock block = new ExtendedBlock(b); - final boolean atBlockGroupBoundary = - getLeadingStreamer().getBytesCurBlock() == 0 && - getLeadingStreamer().getBlock() != null && - getLeadingStreamer().getBlock().getNumBytes() > 0; - for (int i = 1; i < numDataBlocks; i++) { - block.setNumBytes(block.getNumBytes() + - (atBlockGroupBoundary ? streamers.get(i).getBlock().getNumBytes() : - streamers.get(i).getBytesCurBlock())); - } - return block; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 1344d54d61a..c78199ed306 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -209,7 +209,7 @@ class DataStreamer extends Daemon { static class ErrorState { private boolean error = false; - private boolean extenalError = false; + private boolean externalError = false; private int badNodeIndex = -1; private int restartingNodeIndex = -1; private long restartingNodeDeadline = 0; @@ -221,7 +221,7 @@ class DataStreamer extends Daemon { synchronized void reset() { error = false; - extenalError = false; + externalError = false; badNodeIndex = -1; restartingNodeIndex = -1; restartingNodeDeadline = 0; @@ -231,17 +231,21 @@ class DataStreamer extends Daemon { return error; } + synchronized boolean hasExternalErrorOnly() { + return error && externalError && !isNodeMarked(); + } + synchronized boolean hasDatanodeError() { - return error && (isNodeMarked() || extenalError); + return error && (isNodeMarked() || externalError); } synchronized void setError(boolean err) { this.error = err; } - synchronized void initExtenalError() { + synchronized void initExternalError() { setError(true); - this.extenalError = true; + this.externalError = true; } @@ -405,11 +409,13 @@ class DataStreamer extends Daemon { private final LoadingCache excludedNodes; private final String[] favoredNodes; - private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, + private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, + DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, ByteArrayManager byteArrayManage, boolean isAppend, String[] favoredNodes) { + this.block = block; this.dfsClient = dfsClient; this.src = src; this.progress = progress; @@ -434,9 +440,8 @@ class DataStreamer extends Daemon { String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, ByteArrayManager byteArrayManage, String[] favoredNodes) { - this(stat, dfsClient, src, progress, checksum, cachingStrategy, + this(stat, block, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, false, favoredNodes); - this.block = block; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -450,10 +455,9 @@ class DataStreamer extends Daemon { String src, Progressable progress, DataChecksum checksum, AtomicReference cachingStrategy, ByteArrayManager byteArrayManage) throws IOException { - this(stat, dfsClient, src, progress, checksum, cachingStrategy, + this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, true, null); stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; - block = lastBlock.getBlock(); bytesSent = block.getNumBytes(); accessToken = lastBlock.getBlockToken(); } @@ -1074,6 +1078,10 @@ class DataStreamer extends Daemon { if (!errorState.hasDatanodeError()) { return false; } + if (errorState.hasExternalErrorOnly() && block == null) { + // block is not yet initialized, handle external error later. + return false; + } if (response != null) { LOG.info("Error Recovery for " + block + " waiting for responder to exit. "); @@ -1402,15 +1410,28 @@ class DataStreamer extends Daemon { } LocatedBlock updateBlockForPipeline() throws IOException { + return callUpdateBlockForPipeline(block); + } + + LocatedBlock callUpdateBlockForPipeline(ExtendedBlock newBlock) throws IOException { return dfsClient.namenode.updateBlockForPipeline( - block, dfsClient.clientName); + newBlock, dfsClient.clientName); + } + + static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) { + return new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), + b.getNumBytes(), newGS); } /** update pipeline at the namenode */ ExtendedBlock updatePipeline(long newGS) throws IOException { - final ExtendedBlock newBlock = new ExtendedBlock( - block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); - dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, + final ExtendedBlock newBlock = newBlock(block, newGS); + return callUpdatePipeline(block, newBlock); + } + + ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock) + throws IOException { + dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock, nodes, storageIDs); return newBlock; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index 7b7db753293..a1777962756 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator; +import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -37,18 +38,64 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; -/**************************************************************************** - * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}. - * There are two kinds of StripedDataStreamer, leading streamer and ordinary - * stream. Leading streamer requests a block group from NameNode, unwraps - * it to located blocks and transfers each located block to its corresponding - * ordinary streamer via a blocking queue. - * - ****************************************************************************/ +/** + * This class extends {@link DataStreamer} to support writing striped blocks + * to datanodes. + * A {@link DFSStripedOutputStream} has multiple {@link StripedDataStreamer}s. + * Whenever the streamers need to talk the namenode, only the fastest streamer + * sends an rpc call to the namenode and then populates the result for the + * other streamers. + */ public class StripedDataStreamer extends DataStreamer { + /** + * This class is designed for multiple threads to share a + * {@link MultipleBlockingQueue}. Initially, the queue is empty. The earliest + * thread calling poll populates entries to the queue and the other threads + * will wait for it. Once the entries are populated, all the threads can poll + * their entries. + * + * @param the queue entry type. + */ + static abstract class ConcurrentPoll { + private final MultipleBlockingQueue queue; + + ConcurrentPoll(MultipleBlockingQueue queue) { + this.queue = queue; + } + + T poll(final int i) throws IOException { + for(;;) { + synchronized(queue) { + final T polled = queue.poll(i); + if (polled != null) { // already populated; return polled item. + return polled; + } + if (isReady2Populate()) { + populate(); + return queue.poll(i); + } + } + + // sleep and then retry. + try { + Thread.sleep(100); + } catch(InterruptedException ie) { + throw DFSUtil.toInterruptedIOException( + "Sleep interrupted during poll", ie); + } + } + } + + boolean isReady2Populate() { + return queue.isEmpty(); + } + + abstract void populate() throws IOException; + } + private final Coordinator coordinator; private final int index; - private volatile boolean isFailed; + private volatile boolean failed; StripedDataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, @@ -66,16 +113,12 @@ public class StripedDataStreamer extends DataStreamer { return index; } - void setIsFailed(boolean isFailed) { - this.isFailed = isFailed; + void setFailed(boolean failed) { + this.failed = failed; } boolean isFailed() { - return isFailed; - } - - public boolean isLeadingStreamer () { - return index == 0; + return failed; } private boolean isParityStreamer() { @@ -85,81 +128,110 @@ public class StripedDataStreamer extends DataStreamer { @Override protected void endBlock() { if (!isParityStreamer()) { - coordinator.putEndBlock(index, block); + coordinator.offerEndBlock(index, block); } super.endBlock(); } @Override - protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) + protected LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes) throws IOException { - if (isLeadingStreamer()) { - if (block != null) { - // set numByte for the previous block group - long bytes = 0; - for (int i = 0; i < NUM_DATA_BLOCKS; i++) { - final ExtendedBlock b = coordinator.getEndBlock(i); - if (b != null) { - StripedBlockUtil.checkBlocks(block, i, b); - bytes += b.getNumBytes(); - } - } - block.setNumBytes(bytes); + final MultipleBlockingQueue followingBlocks + = coordinator.getFollowingBlocks(); + return new ConcurrentPoll(followingBlocks) { + @Override + boolean isReady2Populate() { + return super.isReady2Populate() + && (block == null || coordinator.hasAllEndBlocks()); } - putLoactedBlocks(super.locateFollowingBlock(excludedNodes)); - } + @Override + void populate() throws IOException { + getLastException().check(false); - return coordinator.getStripedBlock(index); - } + if (block != null) { + // set numByte for the previous block group + long bytes = 0; + for (int i = 0; i < NUM_DATA_BLOCKS; i++) { + final ExtendedBlock b = coordinator.takeEndBlock(i); + StripedBlockUtil.checkBlocks(index, block, i, b); + bytes += b.getNumBytes(); + } + block.setNumBytes(bytes); + block.setBlockId(block.getBlockId() - index); + } - void putLoactedBlocks(LocatedBlock lb) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Obtained block group " + lb); - } - LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( - (LocatedStripedBlock)lb, - BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); + if (LOG.isDebugEnabled()) { + LOG.debug("locateFollowingBlock: index=" + index + ", block=" + block); + } - // TODO allow write to continue if blocks.length >= NUM_DATA_BLOCKS - assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) : - "Fail to get block group from namenode: blockGroupSize: " + - (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " + - blocks.length; - for (int i = 0; i < blocks.length; i++) { - coordinator.putStripedBlock(i, blocks[i]); - } + final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock( + excludedNodes); + final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock)lb, + BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); + + for (int i = 0; i < blocks.length; i++) { + if (!coordinator.getStripedDataStreamer(i).isFailed()) { + if (blocks[i] == null) { + getLastException().set( + new IOException("Failed to get following block, i=" + i)); + } else { + followingBlocks.offer(i, blocks[i]); + } + } + } + } + }.poll(index); } @Override LocatedBlock updateBlockForPipeline() throws IOException { - if (isLeadingStreamer()) { - final LocatedBlock updated = super.updateBlockForPipeline(); - final ExtendedBlock block = updated.getBlock(); - for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { - final LocatedBlock lb = new LocatedBlock(block, null, null, null, - -1, updated.isCorrupt(), null); - lb.setBlockToken(updated.getBlockToken()); - coordinator.putStripedBlock(i, lb); + final MultipleBlockingQueue newBlocks + = coordinator.getNewBlocks(); + return new ConcurrentPoll(newBlocks) { + @Override + void populate() throws IOException { + final ExtendedBlock bg = coordinator.getBlockGroup(); + final LocatedBlock updated = callUpdateBlockForPipeline(bg); + final long newGS = updated.getBlock().getGenerationStamp(); + for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { + final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock(); + if (bi != null) { + final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS), + null, null, null, -1, updated.isCorrupt(), null); + lb.setBlockToken(updated.getBlockToken()); + newBlocks.offer(i, lb); + } else { + final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i); + lb.getBlock().setGenerationStamp(newGS); + } + } } - } - return coordinator.getStripedBlock(index); + }.poll(index); } @Override - ExtendedBlock updatePipeline(long newGS) throws IOException { - if (isLeadingStreamer()) { - final ExtendedBlock newBlock = super.updatePipeline(newGS); - for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { - coordinator.putUpdateBlock(i, new ExtendedBlock(newBlock)); + ExtendedBlock updatePipeline(final long newGS) throws IOException { + final MultipleBlockingQueue updateBlocks + = coordinator.getUpdateBlocks(); + return new ConcurrentPoll(updateBlocks) { + @Override + void populate() throws IOException { + final ExtendedBlock bg = coordinator.getBlockGroup(); + final ExtendedBlock newBG = newBlock(bg, newGS); + final ExtendedBlock updated = callUpdatePipeline(bg, newBG); + for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { + final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock(); + updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp())); + } } - } - return coordinator.getUpdateBlock(index); + }.poll(index); } @Override public String toString() { - return "#" + index + ": isFailed? " + Boolean.toString(isFailed).charAt(0) + return "#" + index + ": failed? " + Boolean.toString(failed).charAt(0) + ", " + super.toString(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 34ec06d9998..9aef436a806 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -103,8 +103,6 @@ public class DfsClientConf { private final int hedgedReadThreadpoolSize; private final int stripedReadThreadpoolSize; - private final int stripedWriteMaxSecondsGetStripedBlock; - private final int stripedWriteMaxSecondsGetEndedBlock; public DfsClientConf(Configuration conf) { @@ -228,13 +226,6 @@ public class DfsClientConf { Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " + HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY + " must be greater than 0."); - - stripedWriteMaxSecondsGetStripedBlock = conf.getInt( - HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_KEY, - HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT); - stripedWriteMaxSecondsGetEndedBlock = conf.getInt( - HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_KEY, - HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT); } private DataChecksum.Type getChecksumType(Configuration conf) { @@ -518,20 +509,6 @@ public class DfsClientConf { return stripedReadThreadpoolSize; } - /** - * @return stripedWriteMaxSecondsGetStripedBlock - */ - public int getStripedWriteMaxSecondsGetStripedBlock() { - return stripedWriteMaxSecondsGetStripedBlock; - } - - /** - * @return stripedWriteMaxSecondsGetEndedBlock - */ - public int getStripedWriteMaxSecondsGetEndedBlock() { - return stripedWriteMaxSecondsGetEndedBlock; - } - /** * @return the shortCircuitConf */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index a29e8e35185..579434b6706 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -950,22 +950,22 @@ public class StripedBlockUtil { /** * Check if the information such as IDs and generation stamps in block-i - * match block-0. + * match block-j, where block-i and block-j are in the same group. */ - public static void checkBlocks(ExtendedBlock block0, int i, - ExtendedBlock blocki) throws IOException { + public static void checkBlocks(int j, ExtendedBlock blockj, + int i, ExtendedBlock blocki) throws IOException { - if (!blocki.getBlockPoolId().equals(block0.getBlockPoolId())) { - throw new IOException("Block pool IDs mismatched: block0=" - + block0 + ", block" + i + "=" + blocki); + if (!blocki.getBlockPoolId().equals(blockj.getBlockPoolId())) { + throw new IOException("Block pool IDs mismatched: block" + j + "=" + + blockj + ", block" + i + "=" + blocki); } - if (blocki.getBlockId() - i != block0.getBlockId()) { - throw new IOException("Block IDs mismatched: block0=" - + block0 + ", block" + i + "=" + blocki); + if (blocki.getBlockId() - i != blockj.getBlockId() - j) { + throw new IOException("Block IDs mismatched: block" + j + "=" + + blockj + ", block" + i + "=" + blocki); } - if (blocki.getGenerationStamp() != block0.getGenerationStamp()) { - throw new IOException("Generation stamps mismatched: block0=" - + block0 + ", block" + i + "=" + blocki); + if (blocki.getGenerationStamp() != blockj.getGenerationStamp()) { + throw new IOException("Generation stamps mismatched: block" + j + "=" + + blockj + ", block" + i + "=" + blocki); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java index d2e04582718..8944cde02de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -92,6 +92,13 @@ public class TestDFSStripedOutputStreamWithFailure { return (byte)pos; } + @Test(timeout=120000) + public void testDatanodeFailure0() { + final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); + final int dn = 0; + runTest("file" + dn, length, dn); + } + @Test(timeout=120000) public void testDatanodeFailure1() { final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);