HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao.
parent 29fe5af017
commit f3cdf29af4

--- a/DFSOutputStream.java
+++ b/DFSOutputStream.java
@@ -606,9 +606,9 @@ public class DFSOutputStream extends FSOutputSummer
     // update the block length first time irrespective of flag
     if (updateLength || getStreamer().getPersistBlocks().get()) {
       synchronized (this) {
-        if (!getStreamer().streamerClosed()
-            && getStreamer().getBlock() != null) {
-          lastBlockLength = getStreamer().getBlock().getNumBytes();
+        final ExtendedBlock block = getStreamer().getBlock();
+        if (!getStreamer().streamerClosed() && block != null) {
+          lastBlockLength = block.getNumBytes();
         }
       }
     }
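After this patch, DataStreamer#getBlock() returns a fresh copy of the block on every call, so the two back-to-back getBlock() calls in the old code could observe different snapshots (the second could even be null). Capturing one snapshot keeps the null check and the length read consistent. A minimal sketch of the pattern, using only ExtendedBlock's public API; the driver class and sample values are illustrative, not part of the patch:

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

    public class SnapshotReadSketch {
      // Illustrative stand-in for DataStreamer#getBlock(): a copy per call.
      private static volatile ExtendedBlock current =
          new ExtendedBlock("bp-1", 1L, 0L, 1001L);

      static ExtendedBlock getBlock() {
        ExtendedBlock b = current;
        return b == null ? null : new ExtendedBlock(b);
      }

      public static void main(String[] args) {
        // Capture one snapshot, then null-check and read that same snapshot
        // instead of calling getBlock() twice.
        final ExtendedBlock block = getBlock();
        if (block != null) {
          System.out.println("acked length: " + block.getNumBytes());
        }
      }
    }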
--- a/DataStreamer.java
+++ b/DataStreamer.java
@@ -150,8 +150,6 @@ class DataStreamer extends Daemon {

   /**
    * Record a connection exception.
-   * @param e
-   * @throws InvalidEncryptionKeyException
    */
   void recordFailure(final InvalidEncryptionKeyException e)
       throws InvalidEncryptionKeyException {
@@ -186,9 +184,8 @@ class DataStreamer extends Daemon {
       final StorageType[] targetStorageTypes,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     //send the TRANSFER_BLOCK request
-    new Sender(out)
-        .transferBlock(block, blockToken, dfsClient.clientName, targets,
-            targetStorageTypes);
+    new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
+        dfsClient.clientName, targets, targetStorageTypes);
     out.flush();
     //ack
     BlockOpResponseProto transferResponse = BlockOpResponseProto
@@ -207,6 +204,42 @@ class DataStreamer extends Daemon {
     }
   }

+  static class BlockToWrite {
+    private ExtendedBlock currentBlock;
+
+    BlockToWrite(ExtendedBlock block) {
+      setCurrentBlock(block);
+    }
+
+    synchronized ExtendedBlock getCurrentBlock() {
+      return currentBlock == null ? null : new ExtendedBlock(currentBlock);
+    }
+
+    synchronized long getNumBytes() {
+      return currentBlock == null ? 0 : currentBlock.getNumBytes();
+    }
+
+    synchronized void setCurrentBlock(ExtendedBlock block) {
+      currentBlock = (block == null || block.getLocalBlock() == null) ?
+          null : new ExtendedBlock(block);
+    }
+
+    synchronized void setNumBytes(long numBytes) {
+      assert currentBlock != null;
+      currentBlock.setNumBytes(numBytes);
+    }
+
+    synchronized void setGenerationStamp(long generationStamp) {
+      assert currentBlock != null;
+      currentBlock.setGenerationStamp(generationStamp);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return currentBlock == null ? "null" : currentBlock.toString();
+    }
+  }
+
   /**
    * Create a socket for a write pipeline
    *
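The BlockToWrite holder added above is the heart of the change: the streamer's view of "the block being written" becomes a synchronized wrapper whose getter and setter both copy, so no caller can mutate the streamer's acked-length bookkeeping through a shared ExtendedBlock reference. A short sketch of why the copies matter, again using only ExtendedBlock's public API; the class and sample values are illustrative:

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

    public class DefensiveCopySketch {
      public static void main(String[] args) {
        // Internal state, as BlockToWrite would hold it.
        ExtendedBlock internal = new ExtendedBlock("bp-1", 7L, 0L, 1001L);

        // getCurrentBlock() hands out a copy, never the internal reference,
        // exactly as in the class above.
        ExtendedBlock copy = new ExtendedBlock(internal);

        // Mutating the copy cannot corrupt the holder's bookkeeping.
        copy.setNumBytes(4096L);
        System.out.println(internal.getNumBytes()); // still 0
      }
    }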
@@ -420,7 +453,7 @@ class DataStreamer extends Daemon {
   }

   private volatile boolean streamerClosed = false;
-  private volatile ExtendedBlock block; // its length is number of bytes acked
+  private final BlockToWrite block; // its length is number of bytes acked
   private Token<BlockTokenIdentifier> accessToken;
   private DataOutputStream blockStream;
   private DataInputStream blockReplyStream;
@@ -481,12 +514,14 @@ class DataStreamer extends Daemon {
   private final String[] favoredNodes;
   private final EnumSet<AddBlockFlag> addBlockFlags;

-  private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
+  private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
+                       DFSClient dfsClient, String src,
                        Progressable progress, DataChecksum checksum,
                        AtomicReference<CachingStrategy> cachingStrategy,
                        ByteArrayManager byteArrayManage,
                        boolean isAppend, String[] favoredNodes,
                        EnumSet<AddBlockFlag> flags) {
+    this.block = new BlockToWrite(block);
     this.dfsClient = dfsClient;
     this.src = src;
     this.progress = progress;
@@ -512,9 +547,8 @@ class DataStreamer extends Daemon {
       AtomicReference<CachingStrategy> cachingStrategy,
       ByteArrayManager byteArrayManage, String[] favoredNodes,
       EnumSet<AddBlockFlag> flags) {
-    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
+    this(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
         byteArrayManage, false, favoredNodes, flags);
-    this.block = block;
     stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
   }

@@ -527,10 +561,9 @@ class DataStreamer extends Daemon {
       String src, Progressable progress, DataChecksum checksum,
       AtomicReference<CachingStrategy> cachingStrategy,
       ByteArrayManager byteArrayManage) throws IOException {
-    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
-        byteArrayManage, true, null, null);
+    this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum,
+        cachingStrategy, byteArrayManage, true, null, null);
     stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
-    block = lastBlock.getBlock();
     bytesSent = block.getNumBytes();
     accessToken = lastBlock.getBlockToken();
   }
@@ -1295,7 +1328,7 @@ class DataStreamer extends Daemon {
       LocatedBlock lb;
       //get a new datanode
       lb = dfsClient.namenode.getAdditionalDatanode(
-          src, stat.getFileId(), block, nodes, storageIDs,
+          src, stat.getFileId(), block.getCurrentBlock(), nodes, storageIDs,
           exclude.toArray(new DatanodeInfo[exclude.size()]),
           1, dfsClient.clientName);
       // a new node was allocated by the namenode. Update nodes.
@@ -1407,7 +1440,7 @@ class DataStreamer extends Daemon {
       } // while

       if (success) {
-        block = updatePipeline(newGS);
+        updatePipeline(newGS);
       }
       return false; // do not sleep, continue processing
     }
@@ -1504,17 +1537,27 @@ class DataStreamer extends Daemon {
   }

   private LocatedBlock updateBlockForPipeline() throws IOException {
-    return dfsClient.namenode.updateBlockForPipeline(block,
+    return dfsClient.namenode.updateBlockForPipeline(block.getCurrentBlock(),
         dfsClient.clientName);
   }

+  void updateBlockGS(final long newGS) {
+    block.setGenerationStamp(newGS);
+  }
+
   /** update pipeline at the namenode */
-  ExtendedBlock updatePipeline(long newGS) throws IOException {
-    final ExtendedBlock newBlock = new ExtendedBlock(
-        block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
-    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
-        nodes, storageIDs);
-    return newBlock;
+  private void updatePipeline(long newGS) throws IOException {
+    final ExtendedBlock oldBlock = block.getCurrentBlock();
+    // the new GS has been propagated to all DN, it should be ok to update the
+    // local block state
+    updateBlockGS(newGS);
+    dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock,
+        block.getCurrentBlock(), nodes, storageIDs);
+  }
+
+  DatanodeInfo[] getExcludedNodes() {
+    return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
+        .keySet().toArray(new DatanodeInfo[0]);
+  }

   /**
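In the rewritten updatePipeline() above, the generation stamp is bumped on the shared BlockToWrite first, and the block passed to the namenode is then re-read from the holder, so the reported length is whatever byte count has actually been acked at that moment rather than a value captured earlier. A rough sketch of the recovery ordering as the surrounding code drives it; the driver below is hypothetical and elides the datanode pipeline setup:

    // Hypothetical driver following the patched method names:
    LocatedBlock updated = updateBlockForPipeline();      // obtain a new GS
    long newGS = updated.getBlock().getGenerationStamp();
    // ... re-create the write pipeline so every datanode has the new GS ...
    updatePipeline(newGS);  // bump local GS, then report oldBlock -> current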
@@ -1529,35 +1572,30 @@ class DataStreamer extends Daemon {
     StorageType[] storageTypes;
     int count = dfsClient.getConf().getNumBlockWriteRetry();
     boolean success;
-    ExtendedBlock oldBlock = block;
+    final ExtendedBlock oldBlock = block.getCurrentBlock();
     do {
       errorState.reset();
       lastException.clear();
       success = false;

-      DatanodeInfo[] excluded =
-          excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
-              .keySet()
-              .toArray(new DatanodeInfo[0]);
-      block = oldBlock;
-      lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
-      block = lb.getBlock();
+      DatanodeInfo[] excluded = getExcludedNodes();
+      lb = locateFollowingBlock(
+          excluded.length > 0 ? excluded : null, oldBlock);
+      block.setCurrentBlock(lb.getBlock());
       block.setNumBytes(0);
       bytesSent = 0;
       accessToken = lb.getBlockToken();
       nodes = lb.getLocations();
       storageTypes = lb.getStorageTypes();

-      //
       // Connect to first DataNode in the list.
-      //
       success = createBlockOutputStream(nodes, storageTypes, 0L, false);

       if (!success) {
         LOG.warn("Abandoning " + block);
-        dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
-            dfsClient.clientName);
-        block = null;
+        dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
+            stat.getFileId(), src, dfsClient.clientName);
+        block.setCurrentBlock(null);
         final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
         LOG.warn("Excluding datanode " + badNode);
         excludedNodes.put(badNode, badNode);
@@ -1618,7 +1656,7 @@ class DataStreamer extends Daemon {

     // We cannot change the block length in 'block' as it counts the number
     // of bytes ack'ed.
-    ExtendedBlock blockCopy = new ExtendedBlock(block);
+    ExtendedBlock blockCopy = block.getCurrentBlock();
     blockCopy.setNumBytes(stat.getBlockSize());

     boolean[] targetPinnings = getPinnings(nodes);
@@ -1728,8 +1766,8 @@ class DataStreamer extends Daemon {
     }
   }

-  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
-      throws IOException {
+  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
+      ExtendedBlock oldBlock) throws IOException {
     final DfsClientConf conf = dfsClient.getConf();
     int retries = conf.getNumBlockWriteLocateFollowingRetry();
     long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
@@ -1738,7 +1776,7 @@ class DataStreamer extends Daemon {
     while (true) {
       try {
         return dfsClient.namenode.addBlock(src, dfsClient.clientName,
-            block, excludedNodes, stat.getFileId(), favoredNodes,
+            oldBlock, excluded, stat.getFileId(), favoredNodes,
             addBlockFlags);
       } catch (RemoteException e) {
         IOException ue =
@@ -1823,7 +1861,7 @@ class DataStreamer extends Daemon {
   * @return the block this streamer is writing to
   */
  ExtendedBlock getBlock() {
-    return block;
+    return block.getCurrentBlock();
  }

  /**
@@ -2016,7 +2054,8 @@ class DataStreamer extends Daemon {

  @Override
  public String toString() {
-    return (block == null? null: block.getLocalBlock())
+    final ExtendedBlock extendedBlock = block.getCurrentBlock();
+    return (extendedBlock == null ? null : extendedBlock.getLocalBlock())
        + "@" + Arrays.toString(getNodes());
  }
}
--- a/TestDFSOutputStream.java
+++ b/TestDFSOutputStream.java
@@ -110,8 +110,7 @@ public class TestDFSOutputStream {
    * packet size < 64kB. See HDFS-7308 for details.
    */
   @Test
-  public void testComputePacketChunkSize()
-      throws Exception {
+  public void testComputePacketChunkSize() throws Exception {
     DistributedFileSystem fs = cluster.getFileSystem();
     FSDataOutputStream os = fs.create(new Path("/test"));
     DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,