HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer to tolerate datanode failure. Contributed by Tsz Wo Nicholas Sze.

This commit is contained in:
Jing Zhao 2015-06-19 10:23:45 -07:00
parent 05c696882e
commit 3682e01984
8 changed files with 308 additions and 210 deletions

View File

@ -189,16 +189,6 @@ public interface HdfsClientConfigKeys {
int THREADPOOL_SIZE_DEFAULT = 18; int THREADPOOL_SIZE_DEFAULT = 18;
} }
/** dfs.client.write.striped configuration properties */
interface StripedWrite {
String PREFIX = Write.PREFIX + "striped.";
String MAX_SECONDS_GET_STRIPED_BLOCK_KEY = PREFIX + "max-seconds-get-striped-block";
int MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT = 90;
String MAX_SECONDS_GET_ENDED_BLOCK_KEY = PREFIX + "max-seconds-get-ended-block";
int MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT = 60;
}
/** dfs.http.client configuration properties */ /** dfs.http.client configuration properties */
interface HttpClient { interface HttpClient {
String PREFIX = "dfs.http.client."; String PREFIX = "dfs.http.client.";

View File

@ -308,3 +308,6 @@
HDFS-8466. Refactor BlockInfoContiguous and fix NPE in HDFS-8466. Refactor BlockInfoContiguous and fix NPE in
TestBlockInfo#testCopyConstructor() (vinayakumarb) TestBlockInfo#testCopyConstructor() (vinayakumarb)
HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer to
tolerate datanode failure. (Tsz Wo Nicholas Sze via jing9)

View File

@ -28,7 +28,6 @@ import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -40,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
@ -51,27 +49,33 @@ import org.apache.htrace.TraceScope;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
/**************************************************************** /**
* The DFSStripedOutputStream class supports writing files in striped * This class supports writing files in striped layout and erasure coded format.
* layout. Each stripe contains a sequence of cells and multiple * Each stripe contains a sequence of cells.
* {@link StripedDataStreamer}s in DFSStripedOutputStream are responsible */
* for writing the cells to different datanodes.
*
****************************************************************/
@InterfaceAudience.Private @InterfaceAudience.Private
public class DFSStripedOutputStream extends DFSOutputStream { public class DFSStripedOutputStream extends DFSOutputStream {
static class MultipleBlockingQueue<T> { static class MultipleBlockingQueue<T> {
private final int pullTimeout;
private final List<BlockingQueue<T>> queues; private final List<BlockingQueue<T>> queues;
MultipleBlockingQueue(int numQueue, int queueSize, int pullTimeout) { MultipleBlockingQueue(int numQueue, int queueSize) {
queues = new ArrayList<>(numQueue); queues = new ArrayList<>(numQueue);
for (int i = 0; i < numQueue; i++) { for (int i = 0; i < numQueue; i++) {
queues.add(new LinkedBlockingQueue<T>(queueSize)); queues.add(new LinkedBlockingQueue<T>(queueSize));
} }
}
this.pullTimeout = pullTimeout; boolean isEmpty() {
for(int i = 0; i < queues.size(); i++) {
if (!queues.get(i).isEmpty()) {
return false;
}
}
return true;
}
int numQueues() {
return queues.size();
} }
void offer(int i, T object) { void offer(int i, T object) {
@ -80,49 +84,71 @@ public class DFSStripedOutputStream extends DFSOutputStream {
+ " to queue, i=" + i); + " to queue, i=" + i);
} }
T poll(int i) throws InterruptedIOException { T take(int i) throws InterruptedIOException {
try { try {
return queues.get(i).poll(pullTimeout, TimeUnit.SECONDS); return queues.get(i).take();
} catch (InterruptedException e) { } catch(InterruptedException ie) {
throw DFSUtil.toInterruptedIOException("poll interrupted, i=" + i, e); throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, ie);
} }
} }
T poll(int i) {
return queues.get(i).poll();
}
T peek(int i) { T peek(int i) {
return queues.get(i).peek(); return queues.get(i).peek();
} }
} }
/** Coordinate the communication between the streamers. */ /** Coordinate the communication between the streamers. */
static class Coordinator { class Coordinator {
private final MultipleBlockingQueue<LocatedBlock> stripedBlocks; private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
private final MultipleBlockingQueue<ExtendedBlock> endBlocks; private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
private final MultipleBlockingQueue<LocatedBlock> newBlocks;
private final MultipleBlockingQueue<ExtendedBlock> updateBlocks; private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
Coordinator(final DfsClientConf conf, final int numDataBlocks, Coordinator(final DfsClientConf conf, final int numDataBlocks,
final int numAllBlocks) { final int numAllBlocks) {
stripedBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1, followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
conf.getStripedWriteMaxSecondsGetStripedBlock()); endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1);
endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1,
conf.getStripedWriteMaxSecondsGetEndedBlock()); newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1, updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
conf.getStripedWriteMaxSecondsGetStripedBlock());
} }
void putEndBlock(int i, ExtendedBlock block) { MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
return followingBlocks;
}
MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
return newBlocks;
}
MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() {
return updateBlocks;
}
StripedDataStreamer getStripedDataStreamer(int i) {
return DFSStripedOutputStream.this.getStripedDataStreamer(i);
}
void offerEndBlock(int i, ExtendedBlock block) {
endBlocks.offer(i, block); endBlocks.offer(i, block);
} }
ExtendedBlock getEndBlock(int i) throws InterruptedIOException { ExtendedBlock takeEndBlock(int i) throws InterruptedIOException {
return endBlocks.poll(i); return endBlocks.take(i);
} }
void putUpdateBlock(int i, ExtendedBlock block) { boolean hasAllEndBlocks() {
updateBlocks.offer(i, block); for(int i = 0; i < endBlocks.numQueues(); i++) {
if (endBlocks.peek(i) == null) {
return false;
} }
}
ExtendedBlock getUpdateBlock(int i) throws InterruptedIOException { return true;
return updateBlocks.poll(i);
} }
void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) { void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
@ -130,24 +156,35 @@ public class DFSStripedOutputStream extends DFSOutputStream {
if (b == null) { if (b == null) {
// streamer just has failed, put end block and continue // streamer just has failed, put end block and continue
b = block; b = block;
putEndBlock(i, b); offerEndBlock(i, b);
} }
b.setNumBytes(newBytes); b.setNumBytes(newBytes);
} }
void putStripedBlock(int i, LocatedBlock block) throws IOException { /** @return a block representing the entire block group. */
if (LOG.isDebugEnabled()) { ExtendedBlock getBlockGroup() {
LOG.debug("putStripedBlock " + block + ", i=" + i); final StripedDataStreamer s0 = getStripedDataStreamer(0);
} final ExtendedBlock b0 = s0.getBlock();
stripedBlocks.offer(i, block); if (b0 == null) {
return null;
} }
LocatedBlock getStripedBlock(int i) throws IOException { final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
final LocatedBlock lb = stripedBlocks.poll(i); final ExtendedBlock block = new ExtendedBlock(b0);
if (lb == null) { long numBytes = b0.getNumBytes();
throw new IOException("Failed: i=" + i); for (int i = 1; i < numDataBlocks; i++) {
final StripedDataStreamer si = getStripedDataStreamer(i);
final ExtendedBlock bi = si.getBlock();
if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
block.setGenerationStamp(bi.getGenerationStamp());
} }
return lb; numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
}
block.setNumBytes(numBytes);
if (LOG.isDebugEnabled()) {
LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes());
}
return block;
} }
} }
@ -223,13 +260,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
private final int numAllBlocks; private final int numAllBlocks;
private final int numDataBlocks; private final int numDataBlocks;
private StripedDataStreamer getLeadingStreamer() {
return streamers.get(0);
}
@Override @Override
ExtendedBlock getBlock() { ExtendedBlock getBlock() {
return getLeadingStreamer().getBlock(); return coordinator.getBlockGroup();
} }
/** Construct a new output stream for creating a file. */ /** Construct a new output stream for creating a file. */
@ -308,7 +341,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
int count = 0; int count = 0;
for(StripedDataStreamer s : streamers) { for(StripedDataStreamer s : streamers) {
if (!s.isFailed()) { if (!s.isFailed()) {
s.getErrorState().initExtenalError(); if (s.getBlock() != null) {
s.getErrorState().initExternalError();
}
count++; count++;
} }
} }
@ -325,7 +360,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
private void handleStreamerFailure(String err, private void handleStreamerFailure(String err,
Exception e) throws IOException { Exception e) throws IOException {
LOG.warn("Failed: " + err + ", " + this, e); LOG.warn("Failed: " + err + ", " + this, e);
getCurrentStreamer().setIsFailed(true); getCurrentStreamer().setFailed(true);
checkStreamers(); checkStreamers();
currentPacket = null; currentPacket = null;
} }
@ -443,10 +478,17 @@ public class DFSStripedOutputStream extends DFSOutputStream {
dfsClient.endFileLease(fileId); dfsClient.endFileLease(fileId);
} }
//TODO: Handle slow writers (HDFS-7786) @Override
//Cuurently only check if the leading streamer is terminated
boolean isClosed() { boolean isClosed() {
return closed || getLeadingStreamer().streamerClosed(); if (closed) {
return true;
}
for(StripedDataStreamer s : streamers) {
if (!s.streamerClosed()) {
return false;
}
}
return true;
} }
@Override @Override
@ -560,7 +602,19 @@ public class DFSStripedOutputStream extends DFSOutputStream {
@Override @Override
protected synchronized void closeImpl() throws IOException { protected synchronized void closeImpl() throws IOException {
if (isClosed()) { if (isClosed()) {
getLeadingStreamer().getLastException().check(true); final MultipleIOException.Builder b = new MultipleIOException.Builder();
for(int i = 0; i < streamers.size(); i++) {
final StripedDataStreamer si = getStripedDataStreamer(i);
try {
si.getLastException().check(true);
} catch (IOException e) {
b.add(e);
}
}
final IOException ioe = b.build();
if (ioe != null) {
throw ioe;
}
return; return;
} }
@ -594,7 +648,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
} }
closeThreads(false); closeThreads(false);
final ExtendedBlock lastBlock = getCommittedBlock(); final ExtendedBlock lastBlock = coordinator.getBlockGroup();
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try { try {
completeFile(lastBlock); completeFile(lastBlock);
@ -607,30 +661,4 @@ public class DFSStripedOutputStream extends DFSOutputStream {
setClosed(); setClosed();
} }
} }
/**
* Generate the block which is reported and will be committed in NameNode.
* Need to go through all the streamers writing data blocks and add their
* bytesCurBlock together. Note that at this time all streamers have been
* closed. Also this calculation can cover streamers with writing failures.
*
* @return An ExtendedBlock with size of the whole block group.
*/
ExtendedBlock getCommittedBlock() throws IOException {
ExtendedBlock b = getLeadingStreamer().getBlock();
if (b == null) {
return null;
}
final ExtendedBlock block = new ExtendedBlock(b);
final boolean atBlockGroupBoundary =
getLeadingStreamer().getBytesCurBlock() == 0 &&
getLeadingStreamer().getBlock() != null &&
getLeadingStreamer().getBlock().getNumBytes() > 0;
for (int i = 1; i < numDataBlocks; i++) {
block.setNumBytes(block.getNumBytes() +
(atBlockGroupBoundary ? streamers.get(i).getBlock().getNumBytes() :
streamers.get(i).getBytesCurBlock()));
}
return block;
}
} }

View File

@ -209,7 +209,7 @@ class DataStreamer extends Daemon {
static class ErrorState { static class ErrorState {
private boolean error = false; private boolean error = false;
private boolean extenalError = false; private boolean externalError = false;
private int badNodeIndex = -1; private int badNodeIndex = -1;
private int restartingNodeIndex = -1; private int restartingNodeIndex = -1;
private long restartingNodeDeadline = 0; private long restartingNodeDeadline = 0;
@ -221,7 +221,7 @@ class DataStreamer extends Daemon {
synchronized void reset() { synchronized void reset() {
error = false; error = false;
extenalError = false; externalError = false;
badNodeIndex = -1; badNodeIndex = -1;
restartingNodeIndex = -1; restartingNodeIndex = -1;
restartingNodeDeadline = 0; restartingNodeDeadline = 0;
@ -231,17 +231,21 @@ class DataStreamer extends Daemon {
return error; return error;
} }
synchronized boolean hasExternalErrorOnly() {
return error && externalError && !isNodeMarked();
}
synchronized boolean hasDatanodeError() { synchronized boolean hasDatanodeError() {
return error && (isNodeMarked() || extenalError); return error && (isNodeMarked() || externalError);
} }
synchronized void setError(boolean err) { synchronized void setError(boolean err) {
this.error = err; this.error = err;
} }
synchronized void initExtenalError() { synchronized void initExternalError() {
setError(true); setError(true);
this.extenalError = true; this.externalError = true;
} }
@ -405,11 +409,13 @@ class DataStreamer extends Daemon {
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes; private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
private final String[] favoredNodes; private final String[] favoredNodes;
private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
DFSClient dfsClient, String src,
Progressable progress, DataChecksum checksum, Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy, AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage, ByteArrayManager byteArrayManage,
boolean isAppend, String[] favoredNodes) { boolean isAppend, String[] favoredNodes) {
this.block = block;
this.dfsClient = dfsClient; this.dfsClient = dfsClient;
this.src = src; this.src = src;
this.progress = progress; this.progress = progress;
@ -434,9 +440,8 @@ class DataStreamer extends Daemon {
String src, Progressable progress, DataChecksum checksum, String src, Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy, AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage, String[] favoredNodes) { ByteArrayManager byteArrayManage, String[] favoredNodes) {
this(stat, dfsClient, src, progress, checksum, cachingStrategy, this(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage, false, favoredNodes); byteArrayManage, false, favoredNodes);
this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
} }
@ -450,10 +455,9 @@ class DataStreamer extends Daemon {
String src, Progressable progress, DataChecksum checksum, String src, Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy, AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage) throws IOException { ByteArrayManager byteArrayManage) throws IOException {
this(stat, dfsClient, src, progress, checksum, cachingStrategy, this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage, true, null); byteArrayManage, true, null);
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
block = lastBlock.getBlock();
bytesSent = block.getNumBytes(); bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken(); accessToken = lastBlock.getBlockToken();
} }
@ -1074,6 +1078,10 @@ class DataStreamer extends Daemon {
if (!errorState.hasDatanodeError()) { if (!errorState.hasDatanodeError()) {
return false; return false;
} }
if (errorState.hasExternalErrorOnly() && block == null) {
// block is not yet initialized, handle external error later.
return false;
}
if (response != null) { if (response != null) {
LOG.info("Error Recovery for " + block + LOG.info("Error Recovery for " + block +
" waiting for responder to exit. "); " waiting for responder to exit. ");
@ -1402,15 +1410,28 @@ class DataStreamer extends Daemon {
} }
LocatedBlock updateBlockForPipeline() throws IOException { LocatedBlock updateBlockForPipeline() throws IOException {
return callUpdateBlockForPipeline(block);
}
LocatedBlock callUpdateBlockForPipeline(ExtendedBlock newBlock) throws IOException {
return dfsClient.namenode.updateBlockForPipeline( return dfsClient.namenode.updateBlockForPipeline(
block, dfsClient.clientName); newBlock, dfsClient.clientName);
}
static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) {
return new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
b.getNumBytes(), newGS);
} }
/** update pipeline at the namenode */ /** update pipeline at the namenode */
ExtendedBlock updatePipeline(long newGS) throws IOException { ExtendedBlock updatePipeline(long newGS) throws IOException {
final ExtendedBlock newBlock = new ExtendedBlock( final ExtendedBlock newBlock = newBlock(block, newGS);
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); return callUpdatePipeline(block, newBlock);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, }
ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock)
throws IOException {
dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock,
nodes, storageIDs); nodes, storageIDs);
return newBlock; return newBlock;
} }

View File

@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator; import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -37,18 +38,64 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
/**************************************************************************** /**
* The StripedDataStreamer class is used by {@link DFSStripedOutputStream}. * This class extends {@link DataStreamer} to support writing striped blocks
* There are two kinds of StripedDataStreamer, leading streamer and ordinary * to datanodes.
* stream. Leading streamer requests a block group from NameNode, unwraps * A {@link DFSStripedOutputStream} has multiple {@link StripedDataStreamer}s.
* it to located blocks and transfers each located block to its corresponding * Whenever the streamers need to talk to the namenode, only the fastest streamer
* ordinary streamer via a blocking queue. * sends an rpc call to the namenode and then populates the result for the
* * other streamers.
****************************************************************************/ */
public class StripedDataStreamer extends DataStreamer { public class StripedDataStreamer extends DataStreamer {
/**
* This class is designed for multiple threads to share a
* {@link MultipleBlockingQueue}. Initially, the queue is empty. The earliest
* thread calling poll populates entries to the queue and the other threads
* will wait for it. Once the entries are populated, all the threads can poll
* their entries.
*
* @param <T> the queue entry type.
*/
static abstract class ConcurrentPoll<T> {
private final MultipleBlockingQueue<T> queue;
ConcurrentPoll(MultipleBlockingQueue<T> queue) {
this.queue = queue;
}
T poll(final int i) throws IOException {
for(;;) {
synchronized(queue) {
final T polled = queue.poll(i);
if (polled != null) { // already populated; return polled item.
return polled;
}
if (isReady2Populate()) {
populate();
return queue.poll(i);
}
}
// sleep and then retry.
try {
Thread.sleep(100);
} catch(InterruptedException ie) {
throw DFSUtil.toInterruptedIOException(
"Sleep interrupted during poll", ie);
}
}
}
boolean isReady2Populate() {
return queue.isEmpty();
}
abstract void populate() throws IOException;
}
private final Coordinator coordinator; private final Coordinator coordinator;
private final int index; private final int index;
private volatile boolean isFailed; private volatile boolean failed;
StripedDataStreamer(HdfsFileStatus stat, StripedDataStreamer(HdfsFileStatus stat,
DFSClient dfsClient, String src, DFSClient dfsClient, String src,
@ -66,16 +113,12 @@ public class StripedDataStreamer extends DataStreamer {
return index; return index;
} }
void setIsFailed(boolean isFailed) { void setFailed(boolean failed) {
this.isFailed = isFailed; this.failed = failed;
} }
boolean isFailed() { boolean isFailed() {
return isFailed; return failed;
}
public boolean isLeadingStreamer () {
return index == 0;
} }
private boolean isParityStreamer() { private boolean isParityStreamer() {
@ -85,81 +128,110 @@ public class StripedDataStreamer extends DataStreamer {
@Override @Override
protected void endBlock() { protected void endBlock() {
if (!isParityStreamer()) { if (!isParityStreamer()) {
coordinator.putEndBlock(index, block); coordinator.offerEndBlock(index, block);
} }
super.endBlock(); super.endBlock();
} }
@Override @Override
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) protected LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes)
throws IOException { throws IOException {
if (isLeadingStreamer()) { final MultipleBlockingQueue<LocatedBlock> followingBlocks
= coordinator.getFollowingBlocks();
return new ConcurrentPoll<LocatedBlock>(followingBlocks) {
@Override
boolean isReady2Populate() {
return super.isReady2Populate()
&& (block == null || coordinator.hasAllEndBlocks());
}
@Override
void populate() throws IOException {
getLastException().check(false);
if (block != null) { if (block != null) {
// set numByte for the previous block group // set numByte for the previous block group
long bytes = 0; long bytes = 0;
for (int i = 0; i < NUM_DATA_BLOCKS; i++) { for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
final ExtendedBlock b = coordinator.getEndBlock(i); final ExtendedBlock b = coordinator.takeEndBlock(i);
if (b != null) { StripedBlockUtil.checkBlocks(index, block, i, b);
StripedBlockUtil.checkBlocks(block, i, b);
bytes += b.getNumBytes(); bytes += b.getNumBytes();
} }
}
block.setNumBytes(bytes); block.setNumBytes(bytes);
block.setBlockId(block.getBlockId() - index);
} }
putLoactedBlocks(super.locateFollowingBlock(excludedNodes));
}
return coordinator.getStripedBlock(index);
}
void putLoactedBlocks(LocatedBlock lb) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Obtained block group " + lb); LOG.debug("locateFollowingBlock: index=" + index + ", block=" + block);
} }
LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
excludedNodes);
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
(LocatedStripedBlock)lb, (LocatedStripedBlock)lb,
BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
// TODO allow write to continue if blocks.length >= NUM_DATA_BLOCKS
assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) :
"Fail to get block group from namenode: blockGroupSize: " +
(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " +
blocks.length;
for (int i = 0; i < blocks.length; i++) { for (int i = 0; i < blocks.length; i++) {
coordinator.putStripedBlock(i, blocks[i]); if (!coordinator.getStripedDataStreamer(i).isFailed()) {
if (blocks[i] == null) {
getLastException().set(
new IOException("Failed to get following block, i=" + i));
} else {
followingBlocks.offer(i, blocks[i]);
} }
} }
}
}
}.poll(index);
}
@Override @Override
LocatedBlock updateBlockForPipeline() throws IOException { LocatedBlock updateBlockForPipeline() throws IOException {
if (isLeadingStreamer()) { final MultipleBlockingQueue<LocatedBlock> newBlocks
final LocatedBlock updated = super.updateBlockForPipeline(); = coordinator.getNewBlocks();
final ExtendedBlock block = updated.getBlock(); return new ConcurrentPoll<LocatedBlock>(newBlocks) {
@Override
void populate() throws IOException {
final ExtendedBlock bg = coordinator.getBlockGroup();
final LocatedBlock updated = callUpdateBlockForPipeline(bg);
final long newGS = updated.getBlock().getGenerationStamp();
for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
final LocatedBlock lb = new LocatedBlock(block, null, null, null, final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
-1, updated.isCorrupt(), null); if (bi != null) {
final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
null, null, null, -1, updated.isCorrupt(), null);
lb.setBlockToken(updated.getBlockToken()); lb.setBlockToken(updated.getBlockToken());
coordinator.putStripedBlock(i, lb); newBlocks.offer(i, lb);
} else {
final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i);
lb.getBlock().setGenerationStamp(newGS);
} }
} }
return coordinator.getStripedBlock(index); }
}.poll(index);
} }
@Override @Override
ExtendedBlock updatePipeline(long newGS) throws IOException { ExtendedBlock updatePipeline(final long newGS) throws IOException {
if (isLeadingStreamer()) { final MultipleBlockingQueue<ExtendedBlock> updateBlocks
final ExtendedBlock newBlock = super.updatePipeline(newGS); = coordinator.getUpdateBlocks();
return new ConcurrentPoll<ExtendedBlock>(updateBlocks) {
@Override
void populate() throws IOException {
final ExtendedBlock bg = coordinator.getBlockGroup();
final ExtendedBlock newBG = newBlock(bg, newGS);
final ExtendedBlock updated = callUpdatePipeline(bg, newBG);
for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
coordinator.putUpdateBlock(i, new ExtendedBlock(newBlock)); final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp()));
} }
} }
return coordinator.getUpdateBlock(index); }.poll(index);
} }
@Override @Override
public String toString() { public String toString() {
return "#" + index + ": isFailed? " + Boolean.toString(isFailed).charAt(0) return "#" + index + ": failed? " + Boolean.toString(failed).charAt(0)
+ ", " + super.toString(); + ", " + super.toString();
} }
} }

View File

@ -103,8 +103,6 @@ public class DfsClientConf {
private final int hedgedReadThreadpoolSize; private final int hedgedReadThreadpoolSize;
private final int stripedReadThreadpoolSize; private final int stripedReadThreadpoolSize;
private final int stripedWriteMaxSecondsGetStripedBlock;
private final int stripedWriteMaxSecondsGetEndedBlock;
public DfsClientConf(Configuration conf) { public DfsClientConf(Configuration conf) {
@ -228,13 +226,6 @@ public class DfsClientConf {
Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " + Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY + HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
" must be greater than 0."); " must be greater than 0.");
stripedWriteMaxSecondsGetStripedBlock = conf.getInt(
HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_KEY,
HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT);
stripedWriteMaxSecondsGetEndedBlock = conf.getInt(
HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_KEY,
HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT);
} }
private DataChecksum.Type getChecksumType(Configuration conf) { private DataChecksum.Type getChecksumType(Configuration conf) {
@ -518,20 +509,6 @@ public class DfsClientConf {
return stripedReadThreadpoolSize; return stripedReadThreadpoolSize;
} }
/**
* @return stripedWriteMaxSecondsGetStripedBlock
*/
public int getStripedWriteMaxSecondsGetStripedBlock() {
return stripedWriteMaxSecondsGetStripedBlock;
}
/**
* @return stripedWriteMaxSecondsGetEndedBlock
*/
public int getStripedWriteMaxSecondsGetEndedBlock() {
return stripedWriteMaxSecondsGetEndedBlock;
}
/** /**
* @return the shortCircuitConf * @return the shortCircuitConf
*/ */

View File

@ -950,22 +950,22 @@ public class StripedBlockUtil {
/** /**
* Check if the information such as IDs and generation stamps in block-i * Check if the information such as IDs and generation stamps in block-i
* match block-0. * match block-j, where block-i and block-j are in the same group.
*/ */
public static void checkBlocks(ExtendedBlock block0, int i, public static void checkBlocks(int j, ExtendedBlock blockj,
ExtendedBlock blocki) throws IOException { int i, ExtendedBlock blocki) throws IOException {
if (!blocki.getBlockPoolId().equals(block0.getBlockPoolId())) { if (!blocki.getBlockPoolId().equals(blockj.getBlockPoolId())) {
throw new IOException("Block pool IDs mismatched: block0=" throw new IOException("Block pool IDs mismatched: block" + j + "="
+ block0 + ", block" + i + "=" + blocki); + blockj + ", block" + i + "=" + blocki);
} }
if (blocki.getBlockId() - i != block0.getBlockId()) { if (blocki.getBlockId() - i != blockj.getBlockId() - j) {
throw new IOException("Block IDs mismatched: block0=" throw new IOException("Block IDs mismatched: block" + j + "="
+ block0 + ", block" + i + "=" + blocki); + blockj + ", block" + i + "=" + blocki);
} }
if (blocki.getGenerationStamp() != block0.getGenerationStamp()) { if (blocki.getGenerationStamp() != blockj.getGenerationStamp()) {
throw new IOException("Generation stamps mismatched: block0=" throw new IOException("Generation stamps mismatched: block" + j + "="
+ block0 + ", block" + i + "=" + blocki); + blockj + ", block" + i + "=" + blocki);
} }
} }

View File

@ -92,6 +92,13 @@ public class TestDFSStripedOutputStreamWithFailure {
return (byte)pos; return (byte)pos;
} }
@Test(timeout=120000)
public void testDatanodeFailure0() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
final int dn = 0;
runTest("file" + dn, length, dn);
}
@Test(timeout=120000) @Test(timeout=120000)
public void testDatanodeFailure1() { public void testDatanodeFailure1() {
final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);