HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao.

(cherry picked from commit f3cdf29af4)
This commit is contained in:
Wei-Chiu Chuang 2017-02-25 21:13:51 -08:00
parent 05ab09ca87
commit 295e8685dd
3 changed files with 83 additions and 45 deletions

View File

@ -606,9 +606,9 @@ public class DFSOutputStream extends FSOutputSummer
// update the block length first time irrespective of flag
if (updateLength || getStreamer().getPersistBlocks().get()) {
synchronized (this) {
if (!getStreamer().streamerClosed()
&& getStreamer().getBlock() != null) {
lastBlockLength = getStreamer().getBlock().getNumBytes();
final ExtendedBlock block = getStreamer().getBlock();
if (!getStreamer().streamerClosed() && block != null) {
lastBlockLength = block.getNumBytes();
}
}
}

View File

@ -150,8 +150,6 @@ class DataStreamer extends Daemon {
/**
* Record a connection exception.
* @param e
* @throws InvalidEncryptionKeyException
*/
void recordFailure(final InvalidEncryptionKeyException e)
throws InvalidEncryptionKeyException {
@ -186,9 +184,8 @@ class DataStreamer extends Daemon {
final StorageType[] targetStorageTypes,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
//send the TRANSFER_BLOCK request
new Sender(out)
.transferBlock(block, blockToken, dfsClient.clientName, targets,
targetStorageTypes);
new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
dfsClient.clientName, targets, targetStorageTypes);
out.flush();
//ack
BlockOpResponseProto transferResponse = BlockOpResponseProto
@ -207,6 +204,42 @@ class DataStreamer extends Daemon {
}
}
static class BlockToWrite {
private ExtendedBlock currentBlock;
BlockToWrite(ExtendedBlock block) {
setCurrentBlock(block);
}
synchronized ExtendedBlock getCurrentBlock() {
return currentBlock == null ? null : new ExtendedBlock(currentBlock);
}
synchronized long getNumBytes() {
return currentBlock == null ? 0 : currentBlock.getNumBytes();
}
synchronized void setCurrentBlock(ExtendedBlock block) {
currentBlock = (block == null || block.getLocalBlock() == null) ?
null : new ExtendedBlock(block);
}
synchronized void setNumBytes(long numBytes) {
assert currentBlock != null;
currentBlock.setNumBytes(numBytes);
}
synchronized void setGenerationStamp(long generationStamp) {
assert currentBlock != null;
currentBlock.setGenerationStamp(generationStamp);
}
@Override
public synchronized String toString() {
return currentBlock == null ? "null" : currentBlock.toString();
}
}
/**
* Create a socket for a write pipeline
*
@ -420,7 +453,7 @@ class DataStreamer extends Daemon {
}
private volatile boolean streamerClosed = false;
private volatile ExtendedBlock block; // its length is number of bytes acked
private final BlockToWrite block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
@ -481,12 +514,14 @@ class DataStreamer extends Daemon {
private final String[] favoredNodes;
private final EnumSet<AddBlockFlag> addBlockFlags;
private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
DFSClient dfsClient, String src,
Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage,
boolean isAppend, String[] favoredNodes,
EnumSet<AddBlockFlag> flags) {
this.block = new BlockToWrite(block);
this.dfsClient = dfsClient;
this.src = src;
this.progress = progress;
@ -512,9 +547,8 @@ class DataStreamer extends Daemon {
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage, String[] favoredNodes,
EnumSet<AddBlockFlag> flags) {
this(stat, dfsClient, src, progress, checksum, cachingStrategy,
this(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage, false, favoredNodes, flags);
this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
@ -527,10 +561,9 @@ class DataStreamer extends Daemon {
String src, Progressable progress, DataChecksum checksum,
AtomicReference<CachingStrategy> cachingStrategy,
ByteArrayManager byteArrayManage) throws IOException {
this(stat, dfsClient, src, progress, checksum, cachingStrategy,
byteArrayManage, true, null, null);
this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum,
cachingStrategy, byteArrayManage, true, null, null);
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
}
@ -1288,7 +1321,7 @@ class DataStreamer extends Daemon {
LocatedBlock lb;
//get a new datanode
lb = dfsClient.namenode.getAdditionalDatanode(
src, stat.getFileId(), block, nodes, storageIDs,
src, stat.getFileId(), block.getCurrentBlock(), nodes, storageIDs,
exclude.toArray(new DatanodeInfo[exclude.size()]),
1, dfsClient.clientName);
// a new node was allocated by the namenode. Update nodes.
@ -1400,7 +1433,7 @@ class DataStreamer extends Daemon {
} // while
if (success) {
block = updatePipeline(newGS);
updatePipeline(newGS);
}
return false; // do not sleep, continue processing
}
@ -1497,17 +1530,27 @@ class DataStreamer extends Daemon {
}
private LocatedBlock updateBlockForPipeline() throws IOException {
return dfsClient.namenode.updateBlockForPipeline(block,
return dfsClient.namenode.updateBlockForPipeline(block.getCurrentBlock(),
dfsClient.clientName);
}
void updateBlockGS(final long newGS) {
block.setGenerationStamp(newGS);
}
/** update pipeline at the namenode */
ExtendedBlock updatePipeline(long newGS) throws IOException {
final ExtendedBlock newBlock = new ExtendedBlock(
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
nodes, storageIDs);
return newBlock;
private void updatePipeline(long newGS) throws IOException {
final ExtendedBlock oldBlock = block.getCurrentBlock();
// the new GS has been propagated to all DN, it should be ok to update the
// local block state
updateBlockGS(newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock,
block.getCurrentBlock(), nodes, storageIDs);
}
DatanodeInfo[] getExcludedNodes() {
return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
.keySet().toArray(new DatanodeInfo[0]);
}
/**
@ -1522,35 +1565,30 @@ class DataStreamer extends Daemon {
StorageType[] storageTypes;
int count = dfsClient.getConf().getNumBlockWriteRetry();
boolean success;
ExtendedBlock oldBlock = block;
final ExtendedBlock oldBlock = block.getCurrentBlock();
do {
errorState.reset();
lastException.clear();
success = false;
DatanodeInfo[] excluded =
excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
.keySet()
.toArray(new DatanodeInfo[0]);
block = oldBlock;
lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
block = lb.getBlock();
DatanodeInfo[] excluded = getExcludedNodes();
lb = locateFollowingBlock(
excluded.length > 0 ? excluded : null, oldBlock);
block.setCurrentBlock(lb.getBlock());
block.setNumBytes(0);
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();
storageTypes = lb.getStorageTypes();
//
// Connect to first DataNode in the list.
//
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
if (!success) {
LOG.warn("Abandoning " + block);
dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
dfsClient.clientName);
block = null;
dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
stat.getFileId(), src, dfsClient.clientName);
block.setCurrentBlock(null);
final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
LOG.warn("Excluding datanode " + badNode);
excludedNodes.put(badNode, badNode);
@ -1611,7 +1649,7 @@ class DataStreamer extends Daemon {
// We cannot change the block length in 'block' as it counts the number
// of bytes ack'ed.
ExtendedBlock blockCopy = new ExtendedBlock(block);
ExtendedBlock blockCopy = block.getCurrentBlock();
blockCopy.setNumBytes(stat.getBlockSize());
boolean[] targetPinnings = getPinnings(nodes);
@ -1721,8 +1759,8 @@ class DataStreamer extends Daemon {
}
}
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
ExtendedBlock oldBlock) throws IOException {
final DfsClientConf conf = dfsClient.getConf();
int retries = conf.getNumBlockWriteLocateFollowingRetry();
long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
@ -1731,7 +1769,7 @@ class DataStreamer extends Daemon {
while (true) {
try {
return dfsClient.namenode.addBlock(src, dfsClient.clientName,
block, excludedNodes, stat.getFileId(), favoredNodes,
oldBlock, excluded, stat.getFileId(), favoredNodes,
addBlockFlags);
} catch (RemoteException e) {
IOException ue =
@ -1816,7 +1854,7 @@ class DataStreamer extends Daemon {
* @return the block this streamer is writing to
*/
ExtendedBlock getBlock() {
return block;
return block.getCurrentBlock();
}
/**
@ -2009,7 +2047,8 @@ class DataStreamer extends Daemon {
@Override
public String toString() {
return (block == null? null: block.getLocalBlock())
final ExtendedBlock extendedBlock = block.getCurrentBlock();
return (extendedBlock == null ? null : extendedBlock.getLocalBlock())
+ "@" + Arrays.toString(getNodes());
}
}

View File

@ -110,8 +110,7 @@ public class TestDFSOutputStream {
* packet size < 64kB. See HDFS-7308 for details.
*/
@Test
public void testComputePacketChunkSize()
throws Exception {
public void testComputePacketChunkSize() throws Exception {
DistributedFileSystem fs = cluster.getFileSystem();
FSDataOutputStream os = fs.create(new Path("/test"));
DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,