HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE blocks.

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-01-26 10:32:51 +08:00
parent 2085e60a96
commit bd909ed9f2
16 changed files with 255 additions and 102 deletions

View File

@ -119,6 +119,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator; import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@ -160,10 +161,10 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RpcNoSuchMethodException; import org.apache.hadoop.ipc.RpcNoSuchMethodException;
import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -178,16 +179,15 @@ import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.net.InetAddresses; import com.google.common.net.InetAddresses;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/******************************************************** /********************************************************
* DFSClient can connect to a Hadoop Filesystem and * DFSClient can connect to a Hadoop Filesystem and
@ -1291,17 +1291,43 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
} }
/**
* Invoke namenode append RPC.
* It retries in case of {@link BlockNotYetCompleteException}.
*/
private LastBlockWithStatus callAppend(String src,
EnumSetWritable<CreateFlag> flag) throws IOException {
final long startTime = Time.monotonicNow();
for(;;) {
try {
return namenode.append(src, clientName, flag);
} catch(RemoteException re) {
if (Time.monotonicNow() - startTime > 5000
|| !RetriableException.class.getName().equals(
re.getClassName())) {
throw re;
}
try { // sleep and retry
Thread.sleep(500);
} catch (InterruptedException e) {
throw DFSUtilClient.toInterruptedIOException("callAppend", e);
}
}
}
}
/** Method to get stream returned by append call */ /** Method to get stream returned by append call */
private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag, private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag,
Progressable progress, String[] favoredNodes) throws IOException { Progressable progress, String[] favoredNodes) throws IOException {
CreateFlag.validateForAppend(flag); CreateFlag.validateForAppend(flag);
try { try {
LastBlockWithStatus blkWithStatus = namenode.append(src, clientName, final LastBlockWithStatus blkWithStatus = callAppend(src,
new EnumSetWritable<>(flag, CreateFlag.class)); new EnumSetWritable<>(flag, CreateFlag.class));
HdfsFileStatus status = blkWithStatus.getFileStatus(); HdfsFileStatus status = blkWithStatus.getFileStatus();
if (status == null) { if (status == null) {
DFSClient.LOG.debug("NameNode is on an older version, request file " + LOG.debug("NameNode is on an older version, request file " +
"info with additional RPC call for file: " + src); "info with additional RPC call for file: {}", src);
status = getFileInfo(src); status = getFileInfo(src);
} }
return DFSOutputStream.newStreamForAppend(this, src, flag, progress, return DFSOutputStream.newStreamForAppend(this, src, flag, progress,

View File

@ -475,8 +475,7 @@ class DataStreamer extends Daemon {
setPipeline(lastBlock); setPipeline(lastBlock);
if (nodes.length < 1) { if (nodes.length < 1) {
throw new IOException("Unable to retrieve blocks locations " + throw new IOException("Unable to retrieve blocks locations " +
" for last block " + block + " for last block " + block + " of file " + src);
"of file " + src);
} }
} }

View File

@ -1910,6 +1910,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9436. Make NNThroughputBenchmark$BlockReportStats run with 10 HDFS-9436. Make NNThroughputBenchmark$BlockReportStats run with 10
datanodes by default. (Mingliang Liu via shv) datanodes by default. (Mingliang Liu via shv)
HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE
blocks. (szetszwo)
BUG FIXES BUG FIXES
HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs. HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.

View File

@ -203,6 +203,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_REPLICATION_MIN_KEY = public static final String DFS_NAMENODE_REPLICATION_MIN_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1; public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;
public static final String DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY
= "dfs.namenode.file.close.num-committed-allowed";
public static final int DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT
= 0;
public static final String DFS_NAMENODE_STRIPE_MIN_KEY = "dfs.namenode.stripe.min"; public static final String DFS_NAMENODE_STRIPE_MIN_KEY = "dfs.namenode.stripe.min";
public static final int DFS_NAMENODE_STRIPE_MIN_DEFAULT = 1; public static final int DFS_NAMENODE_STRIPE_MIN_DEFAULT = 1;
public static final String DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY = public static final String DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY =

View File

@ -641,6 +641,10 @@ public class BlockManager implements BlockStatsMXBean {
} }
} }
public short getMinReplication() {
return minReplication;
}
public short getMinStorageNum(BlockInfo block) { public short getMinStorageNum(BlockInfo block) {
if (block.isStriped()) { if (block.isStriped()) {
return ((BlockInfoStriped) block).getRealDataBlockNum(); return ((BlockInfoStriped) block).getRealDataBlockNum();
@ -703,8 +707,8 @@ public class BlockManager implements BlockStatsMXBean {
final boolean b = commitBlock(lastBlock, commitBlock); final boolean b = commitBlock(lastBlock, commitBlock);
if (hasMinStorage(lastBlock)) { if (hasMinStorage(lastBlock)) {
if (b && !bc.isStriped()) { if (b) {
addExpectedReplicasToPending(lastBlock); addExpectedReplicasToPending(lastBlock, bc);
} }
completeBlock(lastBlock, false); completeBlock(lastBlock, false);
} }
@ -716,6 +720,12 @@ public class BlockManager implements BlockStatsMXBean {
* pendingReplications in order to keep ReplicationMonitor from scheduling * pendingReplications in order to keep ReplicationMonitor from scheduling
* the block. * the block.
*/ */
public void addExpectedReplicasToPending(BlockInfo blk, BlockCollection bc) {
if (!bc.isStriped()) {
addExpectedReplicasToPending(blk);
}
}
private void addExpectedReplicasToPending(BlockInfo lastBlock) { private void addExpectedReplicasToPending(BlockInfo lastBlock) {
DatanodeStorageInfo[] expectedStorages = DatanodeStorageInfo[] expectedStorages =
lastBlock.getUnderConstructionFeature().getExpectedStorageLocations(); lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
@ -2844,9 +2854,7 @@ public class BlockManager implements BlockStatsMXBean {
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
hasMinStorage(storedBlock, numLiveReplicas)) { hasMinStorage(storedBlock, numLiveReplicas)) {
if (!bc.isStriped()) { addExpectedReplicasToPending(storedBlock, bc);
addExpectedReplicasToPending(storedBlock);
}
completeBlock(storedBlock, false); completeBlock(storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block // check whether safe replication is reached for the block
@ -3825,26 +3833,6 @@ public class BlockManager implements BlockStatsMXBean {
} }
} }
/**
* Check that the indicated blocks are present and
* replicated.
*/
public boolean checkBlocksProperlyReplicated(
String src, BlockInfo[] blocks) {
for (BlockInfo b: blocks) {
if (!b.isComplete()) {
final int numNodes = b.numNodes();
final int min = getMinStorageNum(b);
final BlockUCState state = b.getBlockUCState();
LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " + state
+ ", replication# = " + numNodes + (numNodes < min ? " < " : " >= ")
+ " minimum = " + min + ") in file " + src);
return false;
}
}
return true;
}
/** /**
* @return 0 if the block is not found; * @return 0 if the block is not found;
* otherwise, return the replication factor of the block. * otherwise, return the replication factor of the block.

View File

@ -33,8 +33,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature; import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
import org.apache.hadoop.ipc.RetriableException;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -126,11 +128,18 @@ final class FSDirAppendOp {
final BlockInfo lastBlock = file.getLastBlock(); final BlockInfo lastBlock = file.getLastBlock();
// Check that the block has at least minimum replication. // Check that the block has at least minimum replication.
if (lastBlock != null && lastBlock.isComplete() if (lastBlock != null) {
if (lastBlock.getBlockUCState() == BlockUCState.COMMITTED) {
throw new RetriableException(
new NotReplicatedYetException("append: lastBlock="
+ lastBlock + " of src=" + path
+ " is COMMITTED but not yet COMPLETE."));
} else if (lastBlock.isComplete()
&& !blockManager.isSufficientlyReplicated(lastBlock)) { && !blockManager.isSufficientlyReplicated(lastBlock)) {
throw new IOException("append: lastBlock=" + lastBlock + " of src=" throw new IOException("append: lastBlock=" + lastBlock + " of src="
+ path + " is not sufficiently replicated yet."); + path + " is not sufficiently replicated yet.");
} }
}
lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock, lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock,
true, logRetryCache); true, logRetryCache);
} catch (IOException ie) { } catch (IOException ie) {

View File

@ -789,8 +789,10 @@ class FSDirWriteFileOp {
return false; return false;
} }
fsn.addCommittedBlocksToPending(pendingFile);
fsn.finalizeINodeFileUnderConstruction(src, pendingFile, fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
Snapshot.CURRENT_STATE_ID); Snapshot.CURRENT_STATE_ID, true);
return true; return true;
} }

View File

@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade; import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
import static org.apache.hadoop.util.Time.monotonicNow; import static org.apache.hadoop.util.Time.monotonicNow;
@ -29,7 +28,6 @@ import java.util.EnumMap;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -94,6 +92,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@ -457,8 +456,9 @@ public class FSEditLogLoader {
// One might expect that you could use removeLease(holder, path) here, // One might expect that you could use removeLease(holder, path) here,
// but OP_CLOSE doesn't serialize the holder. So, remove the inode. // but OP_CLOSE doesn't serialize the holder. So, remove the inode.
if (file.isUnderConstruction()) { if (file.isUnderConstruction()) {
fsNamesys.leaseManager.removeLeases(Lists.newArrayList(file.getId())); fsNamesys.getLeaseManager().removeLease(file.getId());
file.toCompleteFile(file.getModificationTime()); file.toCompleteFile(file.getModificationTime(), 0,
fsNamesys.getBlockManager().getMinReplication());
} }
break; break;
} }

View File

@ -459,6 +459,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private final long minBlockSize; // minimum block size private final long minBlockSize; // minimum block size
final long maxBlocksPerFile; // maximum # of blocks per file final long maxBlocksPerFile; // maximum # of blocks per file
private final int numCommittedAllowed;
/** Lock to protect FSNamesystem. */ /** Lock to protect FSNamesystem. */
private final FSNamesystemLock fsLock; private final FSNamesystemLock fsLock;
@ -756,6 +757,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT); DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY, this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT); DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
this.numCommittedAllowed = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
@ -2594,17 +2598,36 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean checkFileProgress(String src, INodeFile v, boolean checkall) { boolean checkFileProgress(String src, INodeFile v, boolean checkall) {
assert hasReadLock(); assert hasReadLock();
if (checkall) { if (checkall) {
return blockManager.checkBlocksProperlyReplicated(src, v return checkBlocksComplete(src, true, v.getBlocks());
.getBlocks());
} else { } else {
// check the penultimate block of this file final BlockInfo[] blocks = v.getBlocks();
BlockInfo b = v.getPenultimateBlock(); final int i = blocks.length - numCommittedAllowed - 2;
return b == null || return i < 0 || blocks[i] == null
blockManager.checkBlocksProperlyReplicated( || checkBlocksComplete(src, false, blocks[i]);
src, new BlockInfo[] { b });
} }
} }
/**
* Check if the blocks are COMPLETE;
* it may allow the last block to be COMMITTED.
*/
private boolean checkBlocksComplete(String src, boolean allowCommittedBlock,
BlockInfo... blocks) {
final int n = allowCommittedBlock? numCommittedAllowed: 0;
for(int i = 0; i < blocks.length; i++) {
final short min = blockManager.getMinStorageNum(blocks[i]);
final String err = INodeFile.checkBlockComplete(blocks, i, n, min);
if (err != null) {
final int numNodes = blocks[i].numNodes();
LOG.info("BLOCK* " + err + "(numNodes= " + numNodes
+ (numNodes < min ? " < " : " >= ")
+ " minimum = " + min + ") in file " + src);
return false;
}
}
return true;
}
/** /**
* Change the indicated filename. * Change the indicated filename.
* @deprecated Use {@link #renameTo(String, String, boolean, * @deprecated Use {@link #renameTo(String, String, boolean,
@ -2735,7 +2758,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
List<INode> removedINodes, List<INode> removedINodes,
final boolean acquireINodeMapLock) { final boolean acquireINodeMapLock) {
assert hasWriteLock(); assert hasWriteLock();
leaseManager.removeLeases(removedUCFiles); for(long i : removedUCFiles) {
leaseManager.removeLease(i);
}
// remove inodes from inodesMap // remove inodes from inodesMap
if (removedINodes != null) { if (removedINodes != null) {
if (acquireINodeMapLock) { if (acquireINodeMapLock) {
@ -2994,7 +3019,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// then reap lease immediately and close the file. // then reap lease immediately and close the file.
if(nrCompleteBlocks == nrBlocks) { if(nrCompleteBlocks == nrBlocks) {
finalizeINodeFileUnderConstruction(src, pendingFile, finalizeINodeFileUnderConstruction(src, pendingFile,
iip.getLatestSnapshotId()); iip.getLatestSnapshotId(), false);
NameNode.stateChangeLog.warn("BLOCK*" NameNode.stateChangeLog.warn("BLOCK*"
+ " internalReleaseLease: All existing blocks are COMPLETE," + " internalReleaseLease: All existing blocks are COMPLETE,"
+ " lease removed, file closed."); + " lease removed, file closed.");
@ -3033,7 +3058,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if(penultimateBlockMinStorage && if(penultimateBlockMinStorage &&
blockManager.hasMinStorage(lastBlock)) { blockManager.hasMinStorage(lastBlock)) {
finalizeINodeFileUnderConstruction(src, pendingFile, finalizeINodeFileUnderConstruction(src, pendingFile,
iip.getLatestSnapshotId()); iip.getLatestSnapshotId(), false);
NameNode.stateChangeLog.warn("BLOCK*" NameNode.stateChangeLog.warn("BLOCK*"
+ " internalReleaseLease: Committed blocks are minimally replicated," + " internalReleaseLease: Committed blocks are minimally replicated,"
+ " lease removed, file closed."); + " lease removed, file closed.");
@ -3077,7 +3102,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// We can remove this block and close the file. // We can remove this block and close the file.
pendingFile.removeLastBlock(lastBlock); pendingFile.removeLastBlock(lastBlock);
finalizeINodeFileUnderConstruction(src, pendingFile, finalizeINodeFileUnderConstruction(src, pendingFile,
iip.getLatestSnapshotId()); iip.getLatestSnapshotId(), false);
NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: " NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: "
+ "Removed empty last block and closed file."); + "Removed empty last block and closed file.");
return true; return true;
@ -3163,8 +3188,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} }
} }
void finalizeINodeFileUnderConstruction( void addCommittedBlocksToPending(final INodeFile pendingFile) {
String src, INodeFile pendingFile, int latestSnapshot) throws IOException { final BlockInfo[] blocks = pendingFile.getBlocks();
int i = blocks.length - numCommittedAllowed;
if (i < 0) {
i = 0;
}
for(; i < blocks.length; i++) {
final BlockInfo b = blocks[i];
if (b != null && b.getBlockUCState() == BlockUCState.COMMITTED) {
// b is COMMITTED but not yet COMPLETE, add it to pending replication.
blockManager.addExpectedReplicasToPending(b, pendingFile);
}
}
}
void finalizeINodeFileUnderConstruction(String src, INodeFile pendingFile,
int latestSnapshot, boolean allowCommittedBlock) throws IOException {
assert hasWriteLock(); assert hasWriteLock();
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature(); FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
@ -3179,7 +3219,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// The file is no longer pending. // The file is no longer pending.
// Create permanent INode, update blocks. No need to replace the inode here // Create permanent INode, update blocks. No need to replace the inode here
// since we just remove the uc feature from pendingFile // since we just remove the uc feature from pendingFile
pendingFile.toCompleteFile(now()); pendingFile.toCompleteFile(now(),
allowCommittedBlock? numCommittedAllowed: 0,
blockManager.getMinReplication());
// close file and persist block allocations for this file // close file and persist block allocations for this file
closeFile(src, pendingFile); closeFile(src, pendingFile);
@ -3412,8 +3454,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
commitOrCompleteLastBlock(pendingFile, iip, storedBlock); commitOrCompleteLastBlock(pendingFile, iip, storedBlock);
//remove lease, close file //remove lease, close file
finalizeINodeFileUnderConstruction(src, pendingFile, int s = Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID);
Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID)); finalizeINodeFileUnderConstruction(src, pendingFile, s, false);
} }
/** /**

View File

@ -224,30 +224,58 @@ public class INodeFile extends INodeWithAdditionalFields
* Convert the file to a complete file, i.e., to remove the Under-Construction * Convert the file to a complete file, i.e., to remove the Under-Construction
* feature. * feature.
*/ */
public INodeFile toCompleteFile(long mtime) { void toCompleteFile(long mtime, int numCommittedAllowed, short minReplication) {
Preconditions.checkState(isUnderConstruction(), final FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
"file is no longer under construction"); Preconditions.checkNotNull(uc, "File %s is not under construction", this);
FileUnderConstructionFeature uc = getFileUnderConstructionFeature(); assertAllBlocksComplete(numCommittedAllowed, minReplication);
if (uc != null) {
assertAllBlocksComplete();
removeFeature(uc); removeFeature(uc);
this.setModificationTime(mtime); setModificationTime(mtime);
}
return this;
} }
/** Assert all blocks are complete. */ /** Assert all blocks are complete. */
private void assertAllBlocksComplete() { private void assertAllBlocksComplete(int numCommittedAllowed,
short minReplication) {
if (blocks == null) { if (blocks == null) {
return; return;
} }
for (int i = 0; i < blocks.length; i++) { for (int i = 0; i < blocks.length; i++) {
Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize" final String err = checkBlockComplete(blocks, i, numCommittedAllowed,
+ " %s %s since blocks[%s] is non-complete, where blocks=%s.", minReplication);
getClass().getSimpleName(), this, i, Arrays.asList(blocks)); Preconditions.checkState(err == null,
"Unexpected block state: %s, file=%s (%s), blocks=%s (i=%s)",
err, this, getClass().getSimpleName(), Arrays.asList(blocks), i);
} }
} }
/**
* Check if the i-th block is COMPLETE;
* when the i-th block is the last block, it may be allowed to be COMMITTED.
*
* @return null if the block passes the check;
* otherwise, return an error message.
*/
static String checkBlockComplete(BlockInfo[] blocks, int i,
int numCommittedAllowed, short minReplication) {
final BlockInfo b = blocks[i];
final BlockUCState state = b.getBlockUCState();
if (state == BlockUCState.COMPLETE) {
return null;
}
if (b.isStriped() || i < blocks.length - numCommittedAllowed) {
return b + " is " + state + " but not COMPLETE";
}
if (state != BlockUCState.COMMITTED) {
return b + " is " + state + " but neither COMPLETE nor COMMITTED";
}
final int numExpectedLocations
= b.getUnderConstructionFeature().getNumExpectedLocations();
if (numExpectedLocations <= minReplication) {
return b + " is " + state + " but numExpectedLocations = "
+ numExpectedLocations + " <= minReplication = " + minReplication;
}
return null;
}
@Override // BlockCollection @Override // BlockCollection
public void setBlock(int index, BlockInfo blk) { public void setBlock(int index, BlockInfo blk) {
Preconditions.checkArgument(blk.isStriped() == this.isStriped()); Preconditions.checkArgument(blk.isStriped() == this.isStriped());

View File

@ -160,6 +160,13 @@ public class LeaseManager {
return lease; return lease;
} }
synchronized void removeLease(long inodeId) {
final Lease lease = leasesById.get(inodeId);
if (lease != null) {
removeLease(lease, inodeId);
}
}
/** /**
* Remove the specified lease and src. * Remove the specified lease and src.
*/ */
@ -298,16 +305,6 @@ public class LeaseManager {
} }
} }
@VisibleForTesting
synchronized void removeLeases(Collection<Long> inodes) {
for (long inode : inodes) {
Lease lease = leasesById.get(inode);
if (lease != null) {
removeLease(lease, inode);
}
}
}
public void setLeasePeriod(long softLimit, long hardLimit) { public void setLeasePeriod(long softLimit, long hardLimit) {
this.softLimit = softLimit; this.softLimit = softLimit;
this.hardLimit = hardLimit; this.hardLimit = hardLimit;

View File

@ -27,10 +27,12 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.fs.HardLink;
@ -41,12 +43,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -55,6 +57,8 @@ import org.junit.Test;
* support HDFS appends. * support HDFS appends.
*/ */
public class TestFileAppend{ public class TestFileAppend{
private static final long RANDOM_TEST_RUNTIME = 10000;
final boolean simulatedStorage = false; final boolean simulatedStorage = false;
private static byte[] fileContents = null; private static byte[] fileContents = null;
@ -381,6 +385,56 @@ public class TestFileAppend{
} }
} }
@Test
public void testMultipleAppends() throws Exception {
final long startTime = Time.monotonicNow();
final Configuration conf = new HdfsConfiguration();
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(4).build();
final DistributedFileSystem fs = cluster.getFileSystem();
try {
final Path p = new Path("/testMultipleAppend/foo");
final int blockSize = 1 << 16;
final byte[] data = AppendTestUtil.initBuffer(blockSize);
// create an empty file.
fs.create(p, true, 4096, (short)3, blockSize).close();
int fileLen = 0;
for(int i = 0;
i < 10 || Time.monotonicNow() - startTime < RANDOM_TEST_RUNTIME;
i++) {
int appendLen = ThreadLocalRandom.current().nextInt(100) + 1;
if (fileLen + appendLen > data.length) {
break;
}
AppendTestUtil.LOG.info(i + ") fileLen=" + fileLen
+ ", appendLen=" + appendLen);
final FSDataOutputStream out = fs.append(p);
out.write(data, fileLen, appendLen);
out.close();
fileLen += appendLen;
}
Assert.assertEquals(fileLen, fs.getFileStatus(p).getLen());
final byte[] actual = new byte[fileLen];
final FSDataInputStream in = fs.open(p);
in.readFully(actual);
in.close();
for(int i = 0; i < fileLen; i++) {
Assert.assertEquals(data[i], actual[i]);
}
} finally {
fs.close();
cluster.shutdown();
}
}
/** Tests appending after soft-limit expires. */ /** Tests appending after soft-limit expires. */
@Test @Test
public void testAppendAfterSoftLimit() public void testAppendAfterSoftLimit()

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -42,20 +41,19 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
@ -473,7 +471,7 @@ public class TestFSEditLogLoader {
file.toUnderConstruction(clientName, clientMachine); file.toUnderConstruction(clientName, clientMachine);
file.addBlock(stripedBlk); file.addBlock(stripedBlk);
fns.getEditLog().logAddBlock(testFilePath, file); fns.getEditLog().logAddBlock(testFilePath, file);
file.toCompleteFile(System.currentTimeMillis()); TestINodeFile.toCompleteFile(file);
//If the block by loaded is the same as above it means that //If the block by loaded is the same as above it means that
//we have successfully applied the edit log to the fsimage. //we have successfully applied the edit log to the fsimage.
@ -539,7 +537,7 @@ public class TestFSEditLogLoader {
file.toUnderConstruction(clientName, clientMachine); file.toUnderConstruction(clientName, clientMachine);
file.addBlock(stripedBlk); file.addBlock(stripedBlk);
fns.getEditLog().logAddBlock(testFilePath, file); fns.getEditLog().logAddBlock(testFilePath, file);
file.toCompleteFile(System.currentTimeMillis()); TestINodeFile.toCompleteFile(file);
fns.enterSafeMode(false); fns.enterSafeMode(false);
fns.saveNamespace(0, 0); fns.saveNamespace(0, 0);
fns.leaveSafeMode(false); fns.leaveSafeMode(false);
@ -551,7 +549,7 @@ public class TestFSEditLogLoader {
file.getLastBlock().setNumBytes(newBlkNumBytes); file.getLastBlock().setNumBytes(newBlkNumBytes);
file.getLastBlock().setGenerationStamp(newTimestamp); file.getLastBlock().setGenerationStamp(newTimestamp);
fns.getEditLog().logUpdateBlocks(testFilePath, file, true); fns.getEditLog().logUpdateBlocks(testFilePath, file, true);
file.toCompleteFile(System.currentTimeMillis()); TestINodeFile.toCompleteFile(file);
//After the namenode restarts if the block by loaded is the same as above //After the namenode restarts if the block by loaded is the same as above
//(new block size and timestamp) it means that we have successfully //(new block size and timestamp) it means that we have successfully
@ -616,7 +614,7 @@ public class TestFSEditLogLoader {
file.toUnderConstruction(clientName, clientMachine); file.toUnderConstruction(clientName, clientMachine);
file.addBlock(cBlk); file.addBlock(cBlk);
fns.getEditLog().logAddBlock(testFilePath, file); fns.getEditLog().logAddBlock(testFilePath, file);
file.toCompleteFile(System.currentTimeMillis()); TestINodeFile.toCompleteFile(file);
cluster.restartNameNodes(); cluster.restartNameNodes();
cluster.waitActive(); cluster.waitActive();
fns = cluster.getNamesystem(); fns = cluster.getNamesystem();
@ -662,7 +660,7 @@ public class TestFSEditLogLoader {
INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath);
file.toUnderConstruction(clientName, clientMachine); file.toUnderConstruction(clientName, clientMachine);
file.addBlock(cBlk); file.addBlock(cBlk);
file.toCompleteFile(System.currentTimeMillis()); TestINodeFile.toCompleteFile(file);
long newBlkNumBytes = 1024*8; long newBlkNumBytes = 1024*8;
long newTimestamp = 1426222918+3600; long newTimestamp = 1426222918+3600;
@ -671,7 +669,7 @@ public class TestFSEditLogLoader {
file.getLastBlock().setNumBytes(newBlkNumBytes); file.getLastBlock().setNumBytes(newBlkNumBytes);
file.getLastBlock().setGenerationStamp(newTimestamp); file.getLastBlock().setGenerationStamp(newTimestamp);
fns.getEditLog().logUpdateBlocks(testFilePath, file, true); fns.getEditLog().logUpdateBlocks(testFilePath, file, true);
file.toCompleteFile(System.currentTimeMillis()); TestINodeFile.toCompleteFile(file);
cluster.restartNameNodes(); cluster.restartNameNodes();
cluster.waitActive(); cluster.waitActive();
fns = cluster.getNamesystem(); fns = cluster.getNamesystem();
@ -685,5 +683,4 @@ public class TestFSEditLogLoader {
} }
} }
} }
} }

View File

@ -509,7 +509,7 @@ public class TestFSImage {
INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath);
file.toUnderConstruction(clientName, clientMachine); file.toUnderConstruction(clientName, clientMachine);
file.addBlock(cBlk); file.addBlock(cBlk);
file.toCompleteFile(System.currentTimeMillis()); TestINodeFile.toCompleteFile(file);
fns.enterSafeMode(false); fns.enterSafeMode(false);
fns.saveNamespace(0, 0); fns.saveNamespace(0, 0);
cluster.restartNameNodes(); cluster.restartNameNodes();
@ -617,7 +617,7 @@ public class TestFSImage {
INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath);
file.toUnderConstruction(clientName, clientMachine); file.toUnderConstruction(clientName, clientMachine);
file.addBlock(cBlk); file.addBlock(cBlk);
file.toCompleteFile(System.currentTimeMillis()); TestINodeFile.toCompleteFile(file);
fs.createSnapshot(d,"testHasNonEcBlockUsingStripeID"); fs.createSnapshot(d,"testHasNonEcBlockUsingStripeID");
fs.truncate(p,0); fs.truncate(p,0);

View File

@ -94,6 +94,10 @@ public class TestINodeFile {
(short)3, 1024L); (short)3, 1024L);
} }
static void toCompleteFile(INodeFile file) {
file.toCompleteFile(Time.now(), 0, (short)1);
}
INodeFile createINodeFile(short replication, long preferredBlockSize) { INodeFile createINodeFile(short replication, long preferredBlockSize) {
return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
null, replication, preferredBlockSize); null, replication, preferredBlockSize);
@ -1130,7 +1134,7 @@ public class TestINodeFile {
assertEquals(clientName, uc.getClientName()); assertEquals(clientName, uc.getClientName());
assertEquals(clientMachine, uc.getClientMachine()); assertEquals(clientMachine, uc.getClientMachine());
file.toCompleteFile(Time.now()); toCompleteFile(file);
assertFalse(file.isUnderConstruction()); assertFalse(file.isUnderConstruction());
} }

View File

@ -51,8 +51,8 @@ public class TestLeaseManager {
} }
assertEquals(4, lm.getINodeIdWithLeases().size()); assertEquals(4, lm.getINodeIdWithLeases().size());
synchronized (lm) { for (long id : ids) {
lm.removeLeases(ids); lm.removeLease(id);
} }
assertEquals(0, lm.getINodeIdWithLeases().size()); assertEquals(0, lm.getINodeIdWithLeases().size());
} }