HDFS-7056. Snapshot support for truncate. Contributed by Konstantin Shvachko and Plamen Jeliazkov.

Konstantin V Shvachko 2015-01-13 00:24:23 -08:00
parent 7e9358feb3
commit 08ac06283a
39 changed files with 1350 additions and 391 deletions

View File

@ -20,6 +20,8 @@ Trunk (Unreleased)
HDFS-3107. Introduce truncate. (Plamen Jeliazkov via shv)
HDFS-7056. Snapshot support for truncate. (Plamen Jeliazkov and shv)
IMPROVEMENTS
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.

View File

@ -537,7 +537,7 @@ public void rename2(String src, String dst, Options.Rename... options)
* @param src existing file
* @param newLength the target size
*
* @return true if and client does not need to wait for block recovery,
* @return true if client does not need to wait for block recovery,
* false if client needs to wait for block recovery.
*
* @throws AccessControlException If access is denied
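By way of illustration (not part of this change), a minimal client-side sketch of that contract, assuming a running HDFS cluster and a hypothetical file path; if truncate() returns false, the client polls until block recovery closes the file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class TruncateClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/data.log");            // hypothetical path
    DistributedFileSystem dfs =
        (DistributedFileSystem) file.getFileSystem(conf);
    long newLength = 1024L;                           // assume mid-block
    boolean done = dfs.truncate(file, newLength);
    // false means the last block needs recovery; poll until the file closes.
    while (!done && !dfs.isFileClosed(file)) {
      Thread.sleep(100);
    }
  }
}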

View File

@ -76,8 +76,8 @@ public UpdateReplicaUnderRecoveryResponseProto updateReplicaUnderRecovery(
final String storageID;
try {
storageID = impl.updateReplicaUnderRecovery(
PBHelper.convert(request.getBlock()),
request.getRecoveryId(), request.getNewLength());
PBHelper.convert(request.getBlock()), request.getRecoveryId(),
request.getNewBlockId(), request.getNewLength());
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -102,11 +102,12 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
@Override
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newLength) throws IOException {
long recoveryId, long newBlockId, long newLength) throws IOException {
UpdateReplicaUnderRecoveryRequestProto req =
UpdateReplicaUnderRecoveryRequestProto.newBuilder()
.setBlock(PBHelper.convert(oldBlock))
.setNewLength(newLength).setRecoveryId(recoveryId).build();
.setNewLength(newLength).setNewBlockId(newBlockId)
.setRecoveryId(recoveryId).build();
try {
return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req
).getStorageUuid();

View File

@ -607,16 +607,19 @@ public static RecoveringBlockProto convert(RecoveringBlock b) {
return null;
}
LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
return RecoveringBlockProto.newBuilder().setBlock(lb)
.setNewGenStamp(b.getNewGenerationStamp())
.setTruncateFlag(b.getTruncateFlag()).build();
RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder();
builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp());
if(b.getNewBlock() != null)
builder.setTruncateBlock(PBHelper.convert(b.getNewBlock()));
return builder.build();
}
public static RecoveringBlock convert(RecoveringBlockProto b) {
ExtendedBlock block = convert(b.getBlock().getB());
DatanodeInfo[] locs = convert(b.getBlock().getLocsList());
return new RecoveringBlock(block, locs, b.getNewGenStamp(),
b.getTruncateFlag());
return (b.hasTruncateBlock()) ?
new RecoveringBlock(block, locs, PBHelper.convert(b.getTruncateBlock())) :
new RecoveringBlock(block, locs, b.getNewGenStamp());
}
public static DatanodeInfoProto.AdminState convert(
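As a sanity check, a hedged round-trip sketch in the spirit of TestPBHelper (ids, lengths and genstamps invented), showing the new truncateBlock field surviving conversion:

Block truncateBlock = new Block(5678L, 512L, 2001L);
RecoveringBlock rb = new RecoveringBlock(
    new ExtendedBlock("bp-1", new Block(1234L, 1024L, 2000L)),
    new DatanodeInfo[0], truncateBlock);
RecoveringBlock back = PBHelper.convert(PBHelper.convert(rb));
assert back.getNewBlock().getBlockId() == 5678L;
assert back.getNewGenerationStamp() == 2001L;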

View File

@ -54,6 +54,11 @@ public class BlockInfoUnderConstruction extends BlockInfo {
*/
private long blockRecoveryId = 0;
/**
* The block source to use in the event of copy-on-write truncate.
*/
private Block truncateBlock;
/**
* ReplicaUnderConstruction contains information about replicas while
* they are under construction.
@ -229,6 +234,15 @@ public long getBlockRecoveryId() {
return blockRecoveryId;
}
/** Get the truncate recovery block. */
public Block getTruncateBlock() {
return truncateBlock;
}
public void setTruncateBlock(Block recoveryBlock) {
this.truncateBlock = recoveryBlock;
}
/**
* Process the recorded replicas. When about to commit or finish the
* pipeline recovery sort out bad replicas.
@ -273,11 +287,7 @@ void commitBlock(Block block) throws IOException {
* make it primary.
*/
public void initializeBlockRecovery(long recoveryId) {
initializeBlockRecovery(BlockUCState.UNDER_RECOVERY, recoveryId);
}
public void initializeBlockRecovery(BlockUCState s, long recoveryId) {
setBlockUCState(s);
setBlockUCState(BlockUCState.UNDER_RECOVERY);
blockRecoveryId = recoveryId;
if (replicas.size() == 0) {
NameNode.blockStateChangeLog.warn("BLOCK*"

View File

@ -700,13 +700,14 @@ public BlockInfo forceCompleteBlock(final BlockCollection bc,
* The client is supposed to allocate a new block with the next call.
*
* @param bc file
* @param bytesToRemove number of bytes to remove from the block
* @return the last block locations if the block is partial or null otherwise
*/
public LocatedBlock convertLastBlockToUnderConstruction(
BlockCollection bc) throws IOException {
BlockCollection bc, long bytesToRemove) throws IOException {
BlockInfo oldBlock = bc.getLastBlock();
if(oldBlock == null ||
bc.getPreferredBlockSize() == oldBlock.getNumBytes())
bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove)
return null;
assert oldBlock == getStoredBlock(oldBlock) :
"last block of the file is not in blocksMap";

View File

@ -32,7 +32,6 @@
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -1433,25 +1432,36 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
recoveryLocations.add(storages[i]);
}
}
// If we are performing a truncate recovery, then set the recovery fields
// to the old block.
boolean truncateRecovery = b.getTruncateBlock() != null;
boolean copyOnTruncateRecovery = truncateRecovery &&
b.getTruncateBlock().getBlockId() != b.getBlockId();
ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
new ExtendedBlock(blockPoolId, b.getTruncateBlock()) :
new ExtendedBlock(blockPoolId, b);
// If we only get 1 replica after eliminating stale nodes, then choose all
// replicas for recovery and let the primary data node handle failures.
DatanodeInfo[] recoveryInfos;
if (recoveryLocations.size() > 1) {
if (recoveryLocations.size() != storages.length) {
LOG.info("Skipped stale nodes for recovery : " +
(storages.length - recoveryLocations.size()));
}
boolean isTruncate = b.getBlockUCState().equals(
HdfsServerConstants.BlockUCState.BEING_TRUNCATED);
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
b.getBlockRecoveryId(), isTruncate));
recoveryInfos =
DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
DatanodeStorageInfo.toDatanodeInfos(storages),
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
}
if(truncateRecovery) {
Block recoveryBlock = (copyOnTruncateRecovery) ? b :
b.getTruncateBlock();
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
recoveryBlock));
} else {
brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
b.getBlockRecoveryId()));
}
}
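To make the two branches concrete, hedged examples of the resulting commands (ids, lengths and genstamps invented):

// In-place truncate: b is blk_1234 and b.getTruncateBlock() shares its id.
// Replicas of blk_1234 are truncated to 512 bytes under the new genstamp.
new RecoveringBlock(
    new ExtendedBlock(blockPoolId, new Block(1234L, 1024L, 2000L)),
    recoveryInfos, new Block(1234L, 512L, 2001L));

// Copy-on-truncate: b is the new blk_5678, its truncateBlock the source.
// Replicas of blk_1234 are copied into blk_5678 at 512 bytes; the
// original replicas stay intact for the snapshot.
new RecoveringBlock(
    new ExtendedBlock(blockPoolId, new Block(1234L, 1024L, 2000L)),
    recoveryInfos, new Block(5678L, 512L, 2001L));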

View File

@ -299,13 +299,6 @@ static public enum BlockUCState {
* which synchronizes the existing replicas contents.
*/
UNDER_RECOVERY,
/**
* The block is being truncated.<br>
* When a file is truncated its last block may need to be truncated
* and needs to go through a recovery procedure,
* which synchronizes the existing replicas contents.
*/
BEING_TRUNCATED,
/**
* The block is committed.<br>
* The client reported that all bytes are written to data-nodes

View File

@ -2530,14 +2530,16 @@ private static ReplicaRecoveryInfo callInitReplicaRecovery(
*/
@Override // InterDatanodeProtocol
public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
final long recoveryId, final long newLength) throws IOException {
final long recoveryId, final long newBlockId, final long newLength)
throws IOException {
final String storageID = data.updateReplicaUnderRecovery(oldBlock,
recoveryId, newLength);
recoveryId, newBlockId, newLength);
// Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the
// block locations until the next block report.
ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
newBlock.setGenerationStamp(recoveryId);
newBlock.setBlockId(newBlockId);
newBlock.setNumBytes(newLength);
notifyNamenodeReceivedBlock(newBlock, "", storageID);
return storageID;
@ -2559,10 +2561,12 @@ static class BlockRecord {
this.rInfo = rInfo;
}
void updateReplicaUnderRecovery(String bpid, long recoveryId, long newLength
) throws IOException {
void updateReplicaUnderRecovery(String bpid, long recoveryId,
long newBlockId, long newLength)
throws IOException {
final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newLength);
storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId,
newLength);
}
@Override
@ -2646,6 +2650,10 @@ void syncBlock(RecoveringBlock rBlock,
getActiveNamenodeForBP(block.getBlockPoolId());
long recoveryId = rBlock.getNewGenerationStamp();
boolean isTruncateRecovery = rBlock.getNewBlock() != null;
long blockId = (isTruncateRecovery) ?
rBlock.getNewBlock().getBlockId() : block.getBlockId();
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+ "), syncList=" + syncList);
@ -2679,7 +2687,7 @@ void syncBlock(RecoveringBlock rBlock,
// Calculate list of nodes that will participate in the recovery
// and the new block size
List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
final ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId,
-1, recoveryId);
switch(bestState) {
case FINALIZED:
@ -2691,9 +2699,6 @@ void syncBlock(RecoveringBlock rBlock,
r.rInfo.getNumBytes() == finalizedLength)
participatingList.add(r);
}
if(rBlock.getTruncateFlag())
newBlock.setNumBytes(rBlock.getBlock().getNumBytes());
else
newBlock.setNumBytes(finalizedLength);
break;
case RBW:
@ -2706,21 +2711,21 @@ void syncBlock(RecoveringBlock rBlock,
participatingList.add(r);
}
}
if(rBlock.getTruncateFlag())
newBlock.setNumBytes(rBlock.getBlock().getNumBytes());
else
newBlock.setNumBytes(minLength);
break;
case RUR:
case TEMPORARY:
assert false : "bad replica state: " + bestState;
}
if(isTruncateRecovery)
newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes());
List<DatanodeID> failedList = new ArrayList<DatanodeID>();
final List<BlockRecord> successList = new ArrayList<BlockRecord>();
for(BlockRecord r : participatingList) {
try {
r.updateReplicaUnderRecovery(bpid, recoveryId, newBlock.getNumBytes());
r.updateReplicaUnderRecovery(bpid, recoveryId, blockId,
newBlock.getNumBytes());
successList.add(r);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="

View File

@ -418,7 +418,7 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock
* @return the ID of storage that stores the block
*/
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newLength) throws IOException;
long recoveryId, long newBlockId, long newLength) throws IOException;
/**
* add new block pool ID

View File

@ -670,6 +670,12 @@ static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
final File dstFile = new File(destDir, srcFile.getName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum);
}
static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta,
File dstFile, boolean calculateChecksum)
throws IOException {
if (calculateChecksum) {
computeChecksum(srcMeta, dstMeta, srcFile);
} else {
@ -2157,6 +2163,7 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
public synchronized String updateReplicaUnderRecovery(
final ExtendedBlock oldBlock,
final long recoveryId,
final long newBlockId,
final long newlength) throws IOException {
//get replica
final String bpid = oldBlock.getBlockPoolId();
@ -2189,13 +2196,26 @@ public synchronized String updateReplicaUnderRecovery(
//update replica
final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
.getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength);
.getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
newBlockId, newlength);
boolean copyTruncate = newBlockId != oldBlock.getBlockId();
if(!copyTruncate) {
assert finalized.getBlockId() == oldBlock.getBlockId()
&& finalized.getGenerationStamp() == recoveryId
&& finalized.getNumBytes() == newlength
: "Replica information mismatched: oldBlock=" + oldBlock
+ ", recoveryId=" + recoveryId + ", newlength=" + newlength
+ ", newBlockId=" + newBlockId + ", finalized=" + finalized;
} else {
assert finalized.getBlockId() == oldBlock.getBlockId()
&& finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
&& finalized.getNumBytes() == oldBlock.getNumBytes()
: "Finalized and old information mismatched: oldBlock=" + oldBlock
+ ", genStamp=" + oldBlock.getGenerationStamp()
+ ", len=" + oldBlock.getNumBytes()
+ ", finalized=" + finalized;
}
//check replica files after update
checkReplicaFiles(finalized);
@ -2208,6 +2228,7 @@ private FinalizedReplica updateReplicaUnderRecovery(
String bpid,
ReplicaUnderRecovery rur,
long recoveryId,
long newBlockId,
long newlength) throws IOException {
//check recovery id
if (rur.getRecoveryID() != recoveryId) {
@ -2215,26 +2236,62 @@ private FinalizedReplica updateReplicaUnderRecovery(
+ ", rur=" + rur);
}
boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId;
File blockFile;
File metaFile;
// bump rur's GS to be recovery id
if(!copyOnTruncate) {
bumpReplicaGS(rur, recoveryId);
blockFile = rur.getBlockFile();
metaFile = rur.getMetaFile();
} else {
File[] copiedReplicaFiles =
copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
blockFile = copiedReplicaFiles[1];
metaFile = copiedReplicaFiles[0];
}
//update length
final File replicafile = rur.getBlockFile();
if (rur.getNumBytes() < newlength) {
throw new IOException("rur.getNumBytes() < newlength = " + newlength
+ ", rur=" + rur);
}
if (rur.getNumBytes() > newlength) {
rur.unlinkBlock(1);
truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength);
truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
if(!copyOnTruncate) {
// update RUR with the new length
rur.setNumBytes(newlength);
} else {
// Copying block to a new block with new blockId.
// Not truncating original block.
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
newBlockId, recoveryId, rur.getVolume(), blockFile.getParentFile(),
newlength);
newReplicaInfo.setNumBytes(newlength);
volumeMap.add(bpid, newReplicaInfo);
finalizeReplica(bpid, newReplicaInfo);
}
}
// finalize the block
return finalizeReplica(bpid, rur);
}
private File[] copyReplicaWithNewBlockIdAndGS(
ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS)
throws IOException {
String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId;
FsVolumeImpl v = volumes.getNextVolume(
replicaInfo.getVolume().getStorageType(), replicaInfo.getNumBytes());
final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir();
final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
final File dstBlockFile = new File(destDir, blockFileName);
final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
dstMetaFile, dstBlockFile, true);
}
@Override // FsDatasetSpi
public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {

View File

@ -424,7 +424,7 @@ private static HdfsLocatedFileStatus createLocatedFileStatus(
fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
fileNode.getBlocks(), fileSize, isUc, 0L, size, false,
fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
inSnapshot, feInfo);
if (loc == null) {
loc = new LocatedBlocks();

View File

@ -1093,18 +1093,31 @@ public INodeMap getINodeMap() {
* Unlike FSNamesystem.truncate, this will not schedule block recovery.
*/
void unprotectedTruncate(String src, String clientName, String clientMachine,
long newLength, long mtime)
long newLength, long mtime, Block truncateBlock)
throws UnresolvedLinkException, QuotaExceededException,
SnapshotAccessControlException, IOException {
INodesInPath iip = getINodesInPath(src, true);
INodeFile file = iip.getLastINode().asFile();
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
boolean onBlockBoundary =
unprotectedTruncate(iip, newLength, collectedBlocks, mtime);
if(! onBlockBoundary) {
getFSNamesystem().prepareFileForWrite(src,
iip, clientName, clientMachine, false, false);
BlockInfo oldBlock = file.getLastBlock();
Block tBlk =
getFSNamesystem().prepareFileForTruncate(iip,
clientName, clientMachine, file.computeFileSize() - newLength,
truncateBlock);
assert Block.matchingIdAndGenStamp(tBlk, truncateBlock) &&
tBlk.getNumBytes() == truncateBlock.getNumBytes() :
"Should be the same block.";
if(oldBlock.getBlockId() != tBlk.getBlockId() &&
!file.isBlockInLatestSnapshot(oldBlock)) {
getBlockManager().removeBlockFromMap(oldBlock);
}
}
assert onBlockBoundary == (truncateBlock == null) :
"truncateBlock is null iff on block boundary: " + truncateBlock;
getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
}
@ -1123,7 +1136,8 @@ boolean truncate(INodesInPath iip, long newLength,
/**
* Truncate has the following properties:
* 1.) Any block deletions occur now.
* 2.) INode length is truncated now clients can only read up to new length.
* 2.) INode length is truncated now new clients can only read up to
* the truncated length.
* 3.) INode will be set to UC and lastBlock set to UNDER_RECOVERY.
* 4.) NN will trigger DN truncation recovery and waits for DNs to report.
* 5.) File is considered UNDER_RECOVERY until truncation recovery completes.
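To make properties 1, 2 and 5 concrete, a hedged client-side view (dfs, file and newLength are hypothetical, as in the earlier sketch):

boolean done = dfs.truncate(file, newLength);
// Property 2: the new length is visible immediately, even when done == false.
assert dfs.getFileStatus(file).getLen() == newLength;
// Property 5: while the last block is UNDER_RECOVERY, reopening the file
// (e.g. for append) fails with RecoveryInProgressException.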
@ -1136,20 +1150,16 @@ boolean unprotectedTruncate(INodesInPath iip, long newLength,
long mtime) throws IOException {
assert hasWriteLock();
INodeFile file = iip.getLastINode().asFile();
int latestSnapshot = iip.getLatestSnapshotId();
file.recordModification(latestSnapshot, true);
long oldDiskspace = file.diskspaceConsumed();
long remainingLength =
file.collectBlocksBeyondMax(newLength, collectedBlocks);
file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks);
file.setModificationTime(mtime);
updateCount(iip, 0, file.diskspaceConsumed() - oldDiskspace, true);
// If on block boundary, then return
long lastBlockDelta = remainingLength - newLength;
if(lastBlockDelta == 0)
return true;
// Set new last block length
BlockInfo lastBlock = file.getLastBlock();
assert lastBlock.getNumBytes() - lastBlockDelta > 0 : "wrong block size";
lastBlock.setNumBytes(lastBlock.getNumBytes() - lastBlockDelta);
return false;
// return whether on a block boundary
return (remainingLength - newLength) == 0;
}
/**

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -902,13 +903,14 @@ void logDelete(String src, long timestamp, boolean toLogRpcIds) {
* Add truncate file record to edit log
*/
void logTruncate(String src, String clientName, String clientMachine,
long size, long timestamp) {
long size, long timestamp, Block truncateBlock) {
TruncateOp op = TruncateOp.getInstance(cache.get())
.setPath(src)
.setClientName(clientName)
.setClientMachine(clientMachine)
.setNewLength(size)
.setTimestamp(timestamp);
.setTimestamp(timestamp)
.setTruncateBlock(truncateBlock);
logEdit(op);
}

View File

@ -857,7 +857,8 @@ fsDir, renameReservedPathsOnUpgrade(timesOp.path, logVersion),
case OP_TRUNCATE: {
TruncateOp truncateOp = (TruncateOp) op;
fsDir.unprotectedTruncate(truncateOp.src, truncateOp.clientName,
truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp);
truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp,
truncateOp.truncateBlock);
break;
}
case OP_SET_STORAGE_POLICY: {

View File

@ -2611,6 +2611,7 @@ static class TruncateOp extends FSEditLogOp {
String clientMachine;
long newLength;
long timestamp;
Block truncateBlock;
private TruncateOp() {
super(OP_TRUNCATE);
@ -2654,6 +2655,11 @@ TruncateOp setTimestamp(long timestamp) {
return this;
}
TruncateOp setTruncateBlock(Block truncateBlock) {
this.truncateBlock = truncateBlock;
return this;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
src = FSImageSerialization.readString(in);
@ -2661,6 +2667,10 @@ void readFields(DataInputStream in, int logVersion) throws IOException {
clientMachine = FSImageSerialization.readString(in);
newLength = FSImageSerialization.readLong(in);
timestamp = FSImageSerialization.readLong(in);
Block[] blocks =
FSImageSerialization.readCompactBlockArray(in, logVersion);
assert blocks.length <= 1 : "Truncate op should have 1 or 0 blocks";
truncateBlock = (blocks.length == 0) ? null : blocks[0];
}
@Override
@ -2670,6 +2680,12 @@ public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(clientMachine, out);
FSImageSerialization.writeLong(newLength, out);
FSImageSerialization.writeLong(timestamp, out);
int size = truncateBlock != null ? 1 : 0;
Block[] blocks = new Block[size];
if (truncateBlock != null) {
blocks[0] = truncateBlock;
}
FSImageSerialization.writeCompactBlockArray(blocks, out);
}
@Override
@ -2681,6 +2697,8 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
Long.toString(newLength));
XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
Long.toString(timestamp));
if(truncateBlock != null)
FSEditLogOp.blockToXml(contentHandler, truncateBlock);
}
@Override
@ -2690,6 +2708,8 @@ void fromXml(Stanza st) throws InvalidXmlException {
this.clientMachine = st.getValue("CLIENTMACHINE");
this.newLength = Long.parseLong(st.getValue("NEWLENGTH"));
this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
if (st.hasChildren("BLOCK"))
this.truncateBlock = FSEditLogOp.blockFromXml(st);
}
@Override
@ -2705,6 +2725,8 @@ public String toString() {
builder.append(newLength);
builder.append(", timestamp=");
builder.append(timestamp);
builder.append(", truncateBlock=");
builder.append(truncateBlock);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");

View File

@ -1837,8 +1837,8 @@ private GetBlockLocationsResult getBlockLocationsInt(
: dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
final LocatedBlocks blocks = blockManager.createLocatedBlocks(
inode.getBlocks(), fileSize, isUc, offset, length, needBlockToken,
iip.isSnapshot(), feInfo);
inode.getBlocks(iip.getPathSnapshotId()), fileSize,
isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
// Set caching information for the located blocks.
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
@ -1912,7 +1912,7 @@ void setTimes(String src, long mtime, long atime) throws IOException {
* Truncation at block boundary is atomic, otherwise it requires
* block recovery to truncate the last block of the file.
*
* @return true if and client does not need to wait for block recovery,
* @return true if client does not need to wait for block recovery,
* false if client needs to wait for block recovery.
*/
boolean truncate(String src, long newLength,
@ -1974,44 +1974,119 @@ boolean truncateInternal(String src, long newLength,
dir.checkPathAccess(pc, iip, FsAction.WRITE);
}
INodeFile file = iip.getLastINode().asFile();
// Data will be lost after truncate occurs so it cannot support snapshots.
if(file.isInLatestSnapshot(iip.getLatestSnapshotId()))
throw new HadoopIllegalArgumentException(
"Cannot truncate file with snapshot.");
// Opening an existing file for write. May need lease recovery.
recoverLeaseInternal(iip, src, clientName, clientMachine, false);
// Refresh INode as the file could have been closed
iip = dir.getINodesInPath4Write(src, true);
file = INodeFile.valueOf(iip.getLastINode(), src);
// Truncate length check.
long oldLength = file.computeFileSize();
if(oldLength == newLength)
if(oldLength == newLength) {
return true;
if(oldLength < newLength)
}
if(oldLength < newLength) {
throw new HadoopIllegalArgumentException(
"Cannot truncate to a larger file size. Current size: " + oldLength +
", truncate size: " + newLength + ".");
}
// Perform INodeFile truncation.
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
boolean onBlockBoundary = dir.truncate(iip, newLength,
collectedBlocks, mtime);
Block truncateBlock = null;
if(! onBlockBoundary) {
// Open file for write, but don't log into edits
prepareFileForWrite(src, iip, clientName, clientMachine, false, false);
file = INodeFile.valueOf(dir.getINode4Write(src), src);
initializeBlockRecovery(file);
long lastBlockDelta = file.computeFileSize() - newLength;
assert lastBlockDelta > 0 : "delta is 0 only if on block boundary";
truncateBlock = prepareFileForTruncate(iip, clientName, clientMachine,
lastBlockDelta, null);
}
getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime);
getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime,
truncateBlock);
removeBlocks(collectedBlocks);
return onBlockBoundary;
}
void initializeBlockRecovery(INodeFile inodeFile) throws IOException {
BlockInfo lastBlock = inodeFile.getLastBlock();
long recoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(lastBlock));
((BlockInfoUnderConstruction)lastBlock).initializeBlockRecovery(
BlockUCState.BEING_TRUNCATED, recoveryId);
/**
* Convert current INode to UnderConstruction.
* Recreate lease.
* Create new block for the truncated copy.
* Schedule truncation of the replicas.
*
* @return the new block; it is written to the editLog and passed back into
* this method upon loading.
*/
Block prepareFileForTruncate(INodesInPath iip,
String leaseHolder,
String clientMachine,
long lastBlockDelta,
Block newBlock)
throws IOException {
INodeFile file = iip.getLastINode().asFile();
String src = iip.getPath();
file.recordModification(iip.getLatestSnapshotId());
file.toUnderConstruction(leaseHolder, clientMachine);
assert file.isUnderConstruction() : "inode should be under construction.";
leaseManager.addLease(
file.getFileUnderConstructionFeature().getClientName(), src);
boolean shouldRecoverNow = (newBlock == null);
BlockInfo oldBlock = file.getLastBlock();
boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file, oldBlock);
if(newBlock == null) {
newBlock = (shouldCopyOnTruncate) ? createNewBlock() :
new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(),
nextGenerationStamp(blockIdManager.isLegacyBlock(oldBlock)));
}
BlockInfoUnderConstruction truncatedBlockUC;
if(shouldCopyOnTruncate) {
// Add new truncateBlock into blocksMap and
// use oldBlock as a source for copy-on-truncate recovery
truncatedBlockUC = new BlockInfoUnderConstruction(newBlock,
file.getBlockReplication());
truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
truncatedBlockUC.setTruncateBlock(oldBlock);
file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
getBlockManager().addBlockCollection(truncatedBlockUC, file);
NameNode.stateChangeLog.info("BLOCK* prepareFileForTruncate: "
+ "Scheduling copy-on-truncate to new size "
+ truncatedBlockUC.getNumBytes() + " new block " + newBlock
+ " old block " + truncatedBlockUC.getTruncateBlock());
} else {
// Use new generation stamp for in-place truncate recovery
blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
oldBlock = file.getLastBlock();
assert !oldBlock.isComplete() : "oldBlock should be under construction";
truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock;
truncatedBlockUC.setTruncateBlock(new Block(oldBlock));
truncatedBlockUC.getTruncateBlock().setNumBytes(
oldBlock.getNumBytes() - lastBlockDelta);
truncatedBlockUC.getTruncateBlock().setGenerationStamp(
newBlock.getGenerationStamp());
NameNode.stateChangeLog.debug("BLOCK* prepareFileForTruncate: "
+ "Scheduling in-place block truncate to new size "
+ truncatedBlockUC.getTruncateBlock().getNumBytes()
+ " block=" + truncatedBlockUC);
}
if(shouldRecoverNow)
truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp());
// update the quota: use the preferred block size for UC block
final long diff =
file.getPreferredBlockSize() - truncatedBlockUC.getNumBytes();
dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
return newBlock;
}
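In summary, with invented numbers (oldBlock = blk_1234 @ 128 MB, GS 2000; lastBlockDelta = 64 MB):

// In-place path (block not in the latest snapshot, upgrade finalized):
//   newBlock = blk_1234 @ GS 2001 (same id, bumped genstamp); the UC last
//   block keeps id 1234 and its truncateBlock is a 64 MB copy @ GS 2001.
// Copy-on-truncate path (block is in the latest snapshot):
//   newBlock = blk_5678 (fresh id from createNewBlock()); the UC last
//   block becomes blk_5678 @ 64 MB with truncateBlock = source blk_1234.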
/**
* Defines if a replica needs to be copied on truncate or
* can be truncated in place.
*/
boolean shouldCopyOnTruncate(INodeFile file, BlockInfo blk) {
if(!isUpgradeFinalized()) {
return true;
}
return file.isBlockInLatestSnapshot(blk);
}
/**
@ -2598,7 +2673,8 @@ LocatedBlock prepareFileForWrite(String src, INodesInPath iip,
leaseManager.addLease(
file.getFileUnderConstructionFeature().getClientName(), src);
LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(file);
LocatedBlock ret =
blockManager.convertLastBlockToUnderConstruction(file, 0);
if (ret != null) {
// update the quota: use the preferred block size for UC block
final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
@ -2661,7 +2737,7 @@ boolean recoverLease(String src, String holder, String clientMachine)
return false;
}
private void recoverLeaseInternal(INodesInPath iip,
void recoverLeaseInternal(INodesInPath iip,
String src, String holder, String clientMachine, boolean force)
throws IOException {
assert hasWriteLock();
@ -2723,8 +2799,7 @@ private void recoverLeaseInternal(INodesInPath iip,
} else {
final BlockInfo lastBlock = file.getLastBlock();
if (lastBlock != null
&& (lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY ||
lastBlock.getBlockUCState() == BlockUCState.BEING_TRUNCATED)) {
&& lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
throw new RecoveryInProgressException("Recovery in progress, file ["
+ src + "], " + "lease owner [" + lease.getHolder() + "]");
} else {
@ -3942,8 +4017,18 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
throw new AlreadyBeingCreatedException(message);
case UNDER_CONSTRUCTION:
case UNDER_RECOVERY:
case BEING_TRUNCATED:
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)lastBlock;
// determine if last block was intended to be truncated
Block recoveryBlock = uc.getTruncateBlock();
boolean truncateRecovery = recoveryBlock != null;
boolean copyOnTruncate = truncateRecovery &&
recoveryBlock.getBlockId() != uc.getBlockId();
assert !copyOnTruncate ||
recoveryBlock.getBlockId() < uc.getBlockId() &&
recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() &&
recoveryBlock.getNumBytes() > uc.getNumBytes() :
"wrong recoveryBlock";
// setup the last block locations from the blockManager if not known
if (uc.getNumExpectedLocations() == 0) {
uc.setExpectedLocations(blockManager.getStorages(lastBlock));
@ -3964,9 +4049,12 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
// start recovery of the last block for this file
long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
if (uc.getBlockUCState() != BlockUCState.BEING_TRUNCATED) {
uc.initializeBlockRecovery(blockRecoveryId);
if(copyOnTruncate) {
uc.setGenerationStamp(blockRecoveryId);
} else if(truncateRecovery) {
recoveryBlock.setGenerationStamp(blockRecoveryId);
}
uc.initializeBlockRecovery(blockRecoveryId);
leaseManager.renewLease(lease);
// Cannot close file right now, since the last block requires recovery.
// This may potentially cause infinite loop in lease recovery
@ -4076,11 +4164,11 @@ public boolean isInSnapshot(BlockInfoUnderConstruction blockUC) {
return true;
}
void commitBlockSynchronization(ExtendedBlock lastblock,
void commitBlockSynchronization(ExtendedBlock oldBlock,
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
String[] newtargetstorages) throws IOException {
LOG.info("commitBlockSynchronization(lastblock=" + lastblock
LOG.info("commitBlockSynchronization(oldBlock=" + oldBlock
+ ", newgenerationstamp=" + newgenerationstamp
+ ", newlength=" + newlength
+ ", newtargets=" + Arrays.asList(newtargets)
@ -4099,17 +4187,17 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
checkNameNodeSafeMode(
"Cannot commitBlockSynchronization while in safe mode");
final BlockInfo storedBlock = getStoredBlock(
ExtendedBlock.getLocalBlock(lastblock));
ExtendedBlock.getLocalBlock(oldBlock));
if (storedBlock == null) {
if (deleteblock) {
// This may be a retry attempt so ignore the failure
// to locate the block.
if (LOG.isDebugEnabled()) {
LOG.debug("Block (=" + lastblock + ") not found");
LOG.debug("Block (=" + oldBlock + ") not found");
}
return;
} else {
throw new IOException("Block (=" + lastblock + ") not found");
throw new IOException("Block (=" + oldBlock + ") not found");
}
}
//
@ -4136,34 +4224,40 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
+ iFile.getFullPathName() + ", likely due to delayed block"
+ " removal");
}
if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
if ((!iFile.isUnderConstruction() || storedBlock.isComplete()) &&
iFile.getLastBlock().isComplete()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected block (=" + lastblock
LOG.debug("Unexpected block (=" + oldBlock
+ ") since the file (=" + iFile.getLocalName()
+ ") is not under construction");
}
return;
}
long recoveryId =
((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
BlockInfoUnderConstruction truncatedBlock =
(BlockInfoUnderConstruction) iFile.getLastBlock();
long recoveryId = truncatedBlock.getBlockRecoveryId();
boolean copyTruncate =
truncatedBlock.getBlockId() != storedBlock.getBlockId();
if(recoveryId != newgenerationstamp) {
throw new IOException("The recovery id " + newgenerationstamp
+ " does not match current recovery id "
+ recoveryId + " for block " + lastblock);
+ recoveryId + " for block " + oldBlock);
}
if (deleteblock) {
Block blockToDel = ExtendedBlock.getLocalBlock(lastblock);
Block blockToDel = ExtendedBlock.getLocalBlock(oldBlock);
boolean remove = iFile.removeLastBlock(blockToDel);
if (remove) {
blockManager.removeBlockFromMap(storedBlock);
blockManager.removeBlock(storedBlock);
}
}
else {
// update last block
if(!copyTruncate) {
storedBlock.setGenerationStamp(newgenerationstamp);
storedBlock.setNumBytes(newlength);
}
// find the DatanodeDescriptor objects
// There should be no locations in the blockManager till now because the
@ -4193,21 +4287,36 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
DatanodeStorageInfo storageInfo =
trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
if (storageInfo != null) {
if(copyTruncate) {
storageInfo.addBlock(truncatedBlock);
} else {
storageInfo.addBlock(storedBlock);
}
}
}
}
// add pipeline locations into the INodeUnderConstruction
DatanodeStorageInfo[] trimmedStorageInfos =
blockManager.getDatanodeManager().getDatanodeStorageInfos(
trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
trimmedStorages.toArray(new String[trimmedStorages.size()]));
if(copyTruncate) {
iFile.setLastBlock(truncatedBlock, trimmedStorageInfos);
} else {
iFile.setLastBlock(storedBlock, trimmedStorageInfos);
}
}
if (closeFile) {
if(copyTruncate) {
src = closeFileCommitBlocks(iFile, truncatedBlock);
if(!iFile.isBlockInLatestSnapshot(storedBlock)) {
blockManager.removeBlock(storedBlock);
}
} else {
src = closeFileCommitBlocks(iFile, storedBlock);
}
} else {
// If this commit does not want to close the file, persist blocks
src = iFile.getFullPathName();
@ -4218,13 +4327,13 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
}
getEditLog().logSync();
if (closeFile) {
LOG.info("commitBlockSynchronization(newblock=" + lastblock
LOG.info("commitBlockSynchronization(oldBlock=" + oldBlock
+ ", file=" + src
+ ", newgenerationstamp=" + newgenerationstamp
+ ", newlength=" + newlength
+ ", newtargets=" + Arrays.asList(newtargets) + ") successful");
} else {
LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
LOG.info("commitBlockSynchronization(" + oldBlock + ") successful");
}
}

View File

@ -228,7 +228,8 @@ public INodeAttributes getSnapshotINode(final int snapshotId) {
/** Is this inode in the latest snapshot? */
public final boolean isInLatestSnapshot(final int latestSnapshotId) {
if (latestSnapshotId == Snapshot.CURRENT_STATE_ID) {
if (latestSnapshotId == Snapshot.CURRENT_STATE_ID ||
latestSnapshotId == Snapshot.NO_SNAPSHOT_ID) {
return false;
}
// if parent is a reference node, parent must be a renamed node. We can
@ -817,9 +818,13 @@ public List<Block> getToDeleteList() {
* @param toDelete the to-be-deleted block
*/
public void addDeleteBlock(Block toDelete) {
if (toDelete != null) {
assert toDelete != null : "toDelete is null";
toDeleteList.add(toDelete);
}
public void removeDeleteBlock(Block block) {
assert block != null : "block is null";
toDeleteList.remove(block);
}
/**

View File

@ -24,7 +24,9 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.PermissionStatus;
@ -304,6 +306,11 @@ public INodeFileAttributes getSnapshotINode(final int snapshotId) {
@Override
public void recordModification(final int latestSnapshotId)
throws QuotaExceededException {
recordModification(latestSnapshotId, false);
}
public void recordModification(final int latestSnapshotId, boolean withBlocks)
throws QuotaExceededException {
if (isInLatestSnapshot(latestSnapshotId)
&& !shouldRecordInSrcSnapshot(latestSnapshotId)) {
// the file is in snapshot, create a snapshot feature if it does not have
@ -312,7 +319,7 @@ public void recordModification(final int latestSnapshotId)
sf = addSnapshotFeature(null);
}
// record self in the diff list if necessary
sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null);
sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null, withBlocks);
}
}
@ -415,6 +422,20 @@ public BlockInfo[] getBlocks() {
return this.blocks;
}
/** @return blocks of the file corresponding to the snapshot. */
public BlockInfo[] getBlocks(int snapshot) {
if(snapshot == CURRENT_STATE_ID || getDiffs() == null)
return getBlocks();
FileDiff diff = getDiffs().getDiffById(snapshot);
BlockInfo[] snapshotBlocks = diff == null ? getBlocks() : diff.getBlocks();
if(snapshotBlocks != null)
return snapshotBlocks;
// Blocks are not in the current snapshot
// Find next snapshot with blocks present or return current file blocks
snapshotBlocks = getDiffs().findLaterSnapshotBlocks(diff.getSnapshotId());
return (snapshotBlocks == null) ? getBlocks() : snapshotBlocks;
}
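A lookup sketch with hypothetical diffs: diffs = [s1 {blocks = [b1]}, s2 {blocks = null}], current blocks = [b1, b2].

// getBlocks(s1) -> [b1], the list recorded when s1 was first modified.
// getBlocks(s2) -> s2 stores no list and no later diff has one, so fall
//   back to the current blocks [b1, b2].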
void updateBlockCollection() {
if (blocks != null) {
for(BlockInfo b : blocks) {
@ -509,9 +530,9 @@ public void destroyAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks,
}
clear();
removedINodes.add(this);
FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
if (sf != null) {
sf.getDiffs().destroyAndCollectSnapshotBlocks(collectedBlocks);
sf.clearDiffs();
}
}
@ -554,39 +575,23 @@ public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
@Override
public final ContentSummaryComputationContext computeContentSummary(
final ContentSummaryComputationContext summary) {
computeContentSummary4Snapshot(summary.getCounts());
computeContentSummary4Current(summary.getCounts());
return summary;
}
private void computeContentSummary4Snapshot(final Content.Counts counts) {
// file length and diskspace only counted for the latest state of the file
// i.e. either the current state or the last snapshot
final Content.Counts counts = summary.getCounts();
FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
if (sf != null) {
if (sf == null) {
counts.add(Content.LENGTH, computeFileSize());
counts.add(Content.FILE, 1);
} else {
final FileDiffList diffs = sf.getDiffs();
final int n = diffs.asList().size();
counts.add(Content.FILE, n);
if (n > 0 && sf.isCurrentFileDeleted()) {
counts.add(Content.LENGTH, diffs.getLast().getFileSize());
}
if (sf.isCurrentFileDeleted()) {
final long lastFileSize = diffs.getLast().getFileSize();
counts.add(Content.DISKSPACE, lastFileSize * getBlockReplication());
}
}
}
private void computeContentSummary4Current(final Content.Counts counts) {
FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature();
if (sf != null && sf.isCurrentFileDeleted()) {
return;
}
} else {
counts.add(Content.LENGTH, computeFileSize());
counts.add(Content.FILE, 1);
}
}
counts.add(Content.DISKSPACE, diskspaceConsumed());
return summary;
}
/** The same as computeFileSize(null). */
@ -651,11 +656,38 @@ public final long computeFileSize(boolean includesLastUcBlock,
return size;
}
/**
* Compute size consumed by all blocks of the current file,
* including blocks in its snapshots.
* Use preferred block size for the last block if it is under construction.
*/
public final long diskspaceConsumed() {
// use preferred block size for the last block if it is under construction
FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
if(sf == null) {
return computeFileSize(true, true) * getBlockReplication();
}
// Collect all distinct blocks
long size = 0;
Set<Block> allBlocks = new HashSet<Block>(Arrays.asList(getBlocks()));
List<FileDiff> diffs = sf.getDiffs().asList();
for(FileDiff diff : diffs) {
BlockInfo[] diffBlocks = diff.getBlocks();
if (diffBlocks != null) {
allBlocks.addAll(Arrays.asList(diffBlocks));
}
}
for(Block block : allBlocks) {
size += block.getNumBytes();
}
// check if the last block is under construction
BlockInfo lastBlock = getLastBlock();
if(lastBlock != null && lastBlock instanceof BlockInfoUnderConstruction) {
size += getPreferredBlockSize() - lastBlock.getNumBytes();
}
return size * getBlockReplication();
}
public final long diskspaceConsumed(int lastSnapshotId) {
if (lastSnapshotId != CURRENT_STATE_ID) {
return computeFileSize(lastSnapshotId)
@ -706,7 +738,7 @@ public long collectBlocksBeyondMax(final long max,
final BlockInfo[] oldBlocks = getBlocks();
if (oldBlocks == null)
return 0;
//find the minimum n such that the size of the first n blocks > max
// find the minimum n such that the size of the first n blocks > max
int n = 0;
long size = 0;
for(; n < oldBlocks.length && max > size; n++) {
@ -717,15 +749,7 @@ public long collectBlocksBeyondMax(final long max,
// starting from block n, the data is beyond max.
// resize the array.
final BlockInfo[] newBlocks;
if (n == 0) {
newBlocks = BlockInfo.EMPTY_ARRAY;
} else {
newBlocks = new BlockInfo[n];
System.arraycopy(oldBlocks, 0, newBlocks, 0, n);
}
// set new blocks
setBlocks(newBlocks);
truncateBlocksTo(n);
// collect the blocks beyond max
if (collectedBlocks != null) {
@ -735,4 +759,67 @@ public long collectBlocksBeyondMax(final long max,
}
return size;
}
void truncateBlocksTo(int n) {
final BlockInfo[] newBlocks;
if (n == 0) {
newBlocks = BlockInfo.EMPTY_ARRAY;
} else {
newBlocks = new BlockInfo[n];
System.arraycopy(getBlocks(), 0, newBlocks, 0, n);
}
// set new blocks
setBlocks(newBlocks);
}
public void collectBlocksBeyondSnapshot(BlockInfo[] snapshotBlocks,
BlocksMapUpdateInfo collectedBlocks) {
BlockInfo[] oldBlocks = getBlocks();
if(snapshotBlocks == null || oldBlocks == null)
return;
// Skip blocks in common between the file and the snapshot
int n = 0;
while(n < oldBlocks.length && n < snapshotBlocks.length &&
oldBlocks[n] == snapshotBlocks[n]) {
n++;
}
truncateBlocksTo(n);
// Collect the remaining blocks of the file
while(n < oldBlocks.length) {
collectedBlocks.addDeleteBlock(oldBlocks[n++]);
}
}
/** Exclude blocks collected for deletion that belong to a snapshot. */
void excludeSnapshotBlocks(int snapshotId,
BlocksMapUpdateInfo collectedBlocks) {
if(collectedBlocks == null || collectedBlocks.getToDeleteList().isEmpty())
return;
FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
if(sf == null)
return;
BlockInfo[] snapshotBlocks =
getDiffs().findEarlierSnapshotBlocks(snapshotId);
if(snapshotBlocks == null)
return;
List<Block> toDelete = collectedBlocks.getToDeleteList();
for(Block blk : snapshotBlocks) {
if(toDelete.contains(blk))
collectedBlocks.removeDeleteBlock(blk);
}
}
/**
* @return true if the block is contained in a snapshot or false otherwise.
*/
boolean isBlockInLatestSnapshot(BlockInfo block) {
FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature();
if (sf == null || sf.getDiffs() == null)
return false;
BlockInfo[] snapshotBlocks =
getDiffs().findEarlierSnapshotBlocks(getDiffs().getLastSnapshotId());
if(snapshotBlocks == null)
return false;
return Arrays.asList(snapshotBlocks).contains(block);
}
}
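For the snapshot case, a worked diskspaceConsumed() calculation (invented numbers; replication = 3, preferred block size = 128 MB):

// Current file: [b1 = 128 MB, b2new = 64 MB, under construction].
// An older snapshot still holds [b1, b2old = 128 MB], where b2old is the
// pre-truncate copy kept alive by copy-on-truncate.
long distinct  = 128 + 128 + 64;  // b1, b2old, b2new each counted once
long ucPadding = 128 - 64;        // charge the UC last block at preferred size
long consumed  = (distinct + ucPadding) * 3;  // 1152 MB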

View File

@ -69,7 +69,8 @@ public static enum Feature implements LayoutFeature {
CREATE_OVERWRITE(-58, "Use single editlog record for " +
"creating file with overwrite"),
XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"),
BLOCK_STORAGE_POLICY(-60, "Block Storage policy");
BLOCK_STORAGE_POLICY(-60, "Block Storage policy"),
TRUNCATE(-61, "Truncate");
private final FeatureInfo info;

View File

@ -163,9 +163,12 @@ public final int getLastSnapshotId() {
* id, otherwise <=.
* @return The id of the latest snapshot before the given snapshot.
*/
private final int getPrior(int anchorId, boolean exclusive) {
public final int getPrior(int anchorId, boolean exclusive) {
if (anchorId == Snapshot.CURRENT_STATE_ID) {
return getLastSnapshotId();
int last = getLastSnapshotId();
if(exclusive && last == anchorId)
return Snapshot.NO_SNAPSHOT_ID;
return last;
}
final int i = Collections.binarySearch(diffs, anchorId);
if (exclusive) { // must be the one before
@ -290,10 +293,11 @@ final D checkAndAddLatestSnapshotDiff(int latestSnapshotId, N currentINode)
}
/** Save the snapshot copy to the latest snapshot. */
public void saveSelf2Snapshot(int latestSnapshotId, N currentINode,
public D saveSelf2Snapshot(int latestSnapshotId, N currentINode,
A snapshotCopy) throws QuotaExceededException {
D diff = null;
if (latestSnapshotId != Snapshot.CURRENT_STATE_ID) {
D diff = checkAndAddLatestSnapshotDiff(latestSnapshotId, currentINode);
diff = checkAndAddLatestSnapshotDiff(latestSnapshotId, currentINode);
if (diff.snapshotINode == null) {
if (snapshotCopy == null) {
snapshotCopy = createSnapshotCopy(currentINode);
@ -301,6 +305,7 @@ public void saveSelf2Snapshot(int latestSnapshotId, N currentINode,
diff.saveSnapshotCopy(snapshotCopy);
}
}
return diff;
}
@Override

View File

@ -36,6 +36,10 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.AclEntryStatusFormat;
import org.apache.hadoop.hdfs.server.namenode.AclFeature;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@ -229,6 +233,20 @@ private void loadFileDiffList(InputStream in, INodeFile file, int size)
FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null,
pbf.getFileSize());
List<BlockProto> bpl = pbf.getBlocksList();
BlockInfo[] blocks = new BlockInfo[bpl.size()];
for(int j = 0, e = bpl.size(); j < e; ++j) {
Block blk = PBHelper.convert(bpl.get(j));
BlockInfo storedBlock = fsn.getBlockManager().getStoredBlock(blk);
if(storedBlock == null) {
storedBlock = fsn.getBlockManager().addBlockCollection(
new BlockInfo(blk, copy.getFileReplication()), file);
}
blocks[j] = storedBlock;
}
if(blocks.length > 0) {
diff.setBlocks(blocks);
}
diffs.addFirst(diff);
}
file.addSnapshotFeature(diffs);
@ -472,6 +490,11 @@ private void serializeFileDiffList(INodeFile file, OutputStream out)
SnapshotDiffSection.FileDiff.Builder fb = SnapshotDiffSection.FileDiff
.newBuilder().setSnapshotId(diff.getSnapshotId())
.setFileSize(diff.getFileSize());
if(diff.getBlocks() != null) {
for(Block block : diff.getBlocks()) {
fb.addBlocks(PBHelper.convert(block));
}
}
INodeFileAttributes copy = diff.snapshotINode;
if (copy != null) {
fb.setName(ByteString.copyFrom(copy.getLocalNameBytes()))

View File

@ -19,8 +19,10 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@ -37,10 +39,13 @@ public class FileDiff extends
/** The file size at snapshot creation time. */
private final long fileSize;
/** A copy of the INodeFile block list. Used in truncate. */
private BlockInfo[] blocks;
FileDiff(int snapshotId, INodeFile file) {
super(snapshotId, null, null);
fileSize = file.computeFileSize();
blocks = null;
}
/** Constructor used by FSImage loading */
@ -48,6 +53,7 @@ public class FileDiff extends
FileDiff posteriorDiff, long fileSize) {
super(snapshotId, snapshotINode, posteriorDiff);
this.fileSize = fileSize;
blocks = null;
}
/** @return the file size in the snapshot. */
@ -55,13 +61,32 @@ public long getFileSize() {
return fileSize;
}
/**
* Copy block references into the snapshot
* up to the current {@link #fileSize}.
* Should be done only once.
*/
public void setBlocks(BlockInfo[] blocks) {
if(this.blocks != null)
return;
int numBlocks = 0;
for(long s = 0; numBlocks < blocks.length && s < fileSize; numBlocks++)
s += blocks[numBlocks].getNumBytes();
this.blocks = Arrays.copyOf(blocks, numBlocks);
}
public BlockInfo[] getBlocks() {
return blocks;
}
@Override
Quota.Counts combinePosteriorAndCollectBlocks(INodeFile currentINode,
FileDiff posterior, BlocksMapUpdateInfo collectedBlocks,
final List<INode> removedINodes) {
return currentINode.getFileWithSnapshotFeature()
.updateQuotaAndCollectBlocks(currentINode, posterior, collectedBlocks,
removedINodes);
FileWithSnapshotFeature sf = currentINode.getFileWithSnapshotFeature();
assert sf != null : "FileWithSnapshotFeature is null";
return sf.updateQuotaAndCollectBlocks(
currentINode, posterior, collectedBlocks, removedINodes);
}
@Override
@ -91,4 +116,13 @@ Quota.Counts destroyDiffAndCollectBlocks(INodeFile currentINode,
.updateQuotaAndCollectBlocks(currentINode, this, collectedBlocks,
removedINodes);
}
public void destroyAndCollectSnapshotBlocks(
BlocksMapUpdateInfo collectedBlocks) {
if(blocks == null || collectedBlocks == null)
return;
for(BlockInfo blk : blocks)
collectedBlocks.addDeleteBlock(blk);
blocks = null;
}
}

View File

@ -17,6 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
@ -33,4 +40,95 @@ FileDiff createDiff(int snapshotId, INodeFile file) {
INodeFileAttributes createSnapshotCopy(INodeFile currentINode) {
return new INodeFileAttributes.SnapshotCopy(currentINode);
}
public void destroyAndCollectSnapshotBlocks(
BlocksMapUpdateInfo collectedBlocks) {
for(FileDiff d : asList())
d.destroyAndCollectSnapshotBlocks(collectedBlocks);
}
public void saveSelf2Snapshot(int latestSnapshotId, INodeFile iNodeFile,
INodeFileAttributes snapshotCopy, boolean withBlocks)
throws QuotaExceededException {
final FileDiff diff =
super.saveSelf2Snapshot(latestSnapshotId, iNodeFile, snapshotCopy);
if(withBlocks) // Store blocks if this is the first update
diff.setBlocks(iNodeFile.getBlocks());
}
public BlockInfo[] findEarlierSnapshotBlocks(int snapshotId) {
assert snapshotId != Snapshot.NO_SNAPSHOT_ID : "Wrong snapshot id";
if(snapshotId == Snapshot.CURRENT_STATE_ID) {
return null;
}
List<FileDiff> diffs = this.asList();
int i = Collections.binarySearch(diffs, snapshotId);
BlockInfo[] blocks = null;
for(i = i >= 0 ? i : -i; i < diffs.size(); i--) {
blocks = diffs.get(i).getBlocks();
if(blocks != null) {
break;
}
}
return blocks;
}
public BlockInfo[] findLaterSnapshotBlocks(int snapshotId) {
assert snapshotId != Snapshot.NO_SNAPSHOT_ID : "Wrong snapshot id";
if(snapshotId == Snapshot.CURRENT_STATE_ID) {
return null;
}
List<FileDiff> diffs = this.asList();
int i = Collections.binarySearch(diffs, snapshotId);
BlockInfo[] blocks = null;
for(i = i >= 0 ? i+1 : -i-1; i < diffs.size(); i++) {
blocks = diffs.get(i).getBlocks();
if(blocks != null) {
break;
}
}
return blocks;
}
/**
* Copy blocks from the removed snapshot into the previous snapshot
* up to the file length of the latter.
* Collect unused blocks of the removed snapshot.
*/
void combineAndCollectSnapshotBlocks(INodeFile file,
FileDiff removed,
BlocksMapUpdateInfo collectedBlocks,
List<INode> removedINodes) {
BlockInfo[] removedBlocks = removed.getBlocks();
if(removedBlocks == null) {
FileWithSnapshotFeature sf = file.getFileWithSnapshotFeature();
assert sf != null : "FileWithSnapshotFeature is null";
if(sf.isCurrentFileDeleted())
sf.collectBlocksAndClear(file, collectedBlocks, removedINodes);
return;
}
int p = getPrior(removed.getSnapshotId(), true);
FileDiff earlierDiff = p == Snapshot.NO_SNAPSHOT_ID ? null : getDiffById(p);
// Copy blocks to the previous snapshot if not set already
if(earlierDiff != null)
earlierDiff.setBlocks(removedBlocks);
BlockInfo[] earlierBlocks =
(earlierDiff == null ? new BlockInfo[]{} : earlierDiff.getBlocks());
// Find later snapshot (or file itself) with blocks
BlockInfo[] laterBlocks = findLaterSnapshotBlocks(removed.getSnapshotId());
laterBlocks = (laterBlocks==null) ? file.getBlocks() : laterBlocks;
// Skip blocks that belong to either the earlier or the later list
int i = 0;
for(; i < removedBlocks.length; i++) {
if(i < earlierBlocks.length && removedBlocks[i] == earlierBlocks[i])
continue;
if(i < laterBlocks.length && removedBlocks[i] == laterBlocks[i])
continue;
break;
}
// Collect the remaining blocks of the file
while(i < removedBlocks.length) {
collectedBlocks.addDeleteBlock(removedBlocks[i++]);
}
}
}
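A hypothetical deletion scenario for this method: snapshots s1 < s2 with a copy-on-truncate between them, so s1 recorded [b1, b2], s2 recorded [b1, b2t], and the current file holds [b1, b2t].

// Deleting s2: b1 matches the earlier list (s1), b2t matches the later
//   list (the file itself) -> nothing is collected.
// Deleting s1: earlier list is empty; b1 matches the later list (s2),
//   but b2 matches neither -> b2 is added to collectedBlocks.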

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.AclFeature;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@ -155,7 +156,8 @@ public Quota.Counts updateQuotaAndCollectBlocks(INodeFile file,
}
}
collectBlocksAndClear(file, collectedBlocks, removedINodes);
getDiffs().combineAndCollectSnapshotBlocks(
file, removed, collectedBlocks, removedINodes);
long dsDelta = oldDiskspace - file.diskspaceConsumed();
return Quota.Counts.newInstance(0, dsDelta);
@ -165,7 +167,7 @@ public Quota.Counts updateQuotaAndCollectBlocks(INodeFile file,
* If some blocks at the end of the block list no longer belong to
* any inode, collect them and update the block list.
*/
private void collectBlocksAndClear(final INodeFile file,
public void collectBlocksAndClear(final INodeFile file,
final BlocksMapUpdateInfo info, final List<INode> removedINodes) {
// check if everything is deleted.
if (isCurrentFileDeleted() && getDiffs().asList().isEmpty()) {
@ -174,13 +176,19 @@ private void collectBlocksAndClear(final INodeFile file,
}
// find max file size.
final long max;
FileDiff diff = getDiffs().getLast();
if (isCurrentFileDeleted()) {
final FileDiff last = getDiffs().getLast();
max = last == null? 0: last.getFileSize();
max = diff == null? 0: diff.getFileSize();
} else {
max = file.computeFileSize();
}
// Collect blocks that should be deleted
FileDiff last = diffs.getLast();
BlockInfo[] snapshotBlocks = last == null ? null : last.getBlocks();
if(snapshotBlocks == null)
file.collectBlocksBeyondMax(max, info);
else
file.collectBlocksBeyondSnapshot(snapshotBlocks, info);
}
}

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -53,8 +54,8 @@ public class BlockRecoveryCommand extends DatanodeCommand {
@InterfaceAudience.Private
@InterfaceStability.Evolving
public static class RecoveringBlock extends LocatedBlock {
private boolean truncate;
private final long newGenerationStamp;
private final Block recoveryBlock;
/**
* Create RecoveringBlock.
@ -62,15 +63,17 @@ public static class RecoveringBlock extends LocatedBlock {
public RecoveringBlock(ExtendedBlock b, DatanodeInfo[] locs, long newGS) {
super(b, locs, -1, false); // startOffset is unknown
this.newGenerationStamp = newGS;
this.recoveryBlock = null;
}
/**
* RecoveryingBlock with truncate option.
* Create RecoveringBlock with copy-on-truncate option.
*/
public RecoveringBlock(ExtendedBlock b, DatanodeInfo[] locs, long newGS,
boolean truncate) {
this(b, locs, newGS);
this.truncate = truncate;
public RecoveringBlock(ExtendedBlock b, DatanodeInfo[] locs,
Block recoveryBlock) {
super(b, locs, -1, false); // startOffset is unknown
this.newGenerationStamp = recoveryBlock.getGenerationStamp();
this.recoveryBlock = recoveryBlock;
}
/**
@ -82,10 +85,10 @@ public long getNewGenerationStamp() {
}
/**
* Return whether to truncate the block to the ExtendedBlock's length.
* Return the new block.
*/
public boolean getTruncateFlag() {
return truncate;
public Block getNewBlock() {
return recoveryBlock;
}
}
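With the boolean flag replaced by a Block, a recovery worker can tell the two truncate modes apart from the RecoveringBlock itself. A hedged sketch (the helper name is illustrative and not part of this patch):

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;

class TruncateModeSketch {
  // In-place truncate reuses the old block id with a new genstamp;
  // copy-on-truncate carries a different block id in getNewBlock().
  static boolean isCopyOnTruncate(RecoveringBlock rb) {
    Block newBlock = rb.getNewBlock();
    return newBlock != null
        && rb.getBlock().getBlockId() != newBlock.getBlockId();
  }
}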

View File

@ -67,5 +67,6 @@ ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
* Update replica with the new generation stamp, block id, and length.
*/
String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId,
long newLength) throws IOException;
long newBlockId, long newLength)
throws IOException;
}

View File

@ -59,6 +59,8 @@ message UpdateReplicaUnderRecoveryRequestProto {
required ExtendedBlockProto block = 1; // Block identifier
required uint64 recoveryId = 2; // New genstamp of the replica
required uint64 newLength = 3; // New length of the replica
// New blockId for copy (truncate), default is 0.
optional uint64 newBlockId = 4 [default = 0];
}
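As a rough guide to the new field (a sketch under stated assumptions, not a definitive reading of the patch): the caller passes the id of the block the replica should end up with, which equals the old id for in-place truncate and is the copy's id for copy-on-truncate; 0 is only the wire default seen from older clients. The locals below (copyOnTruncate, newBlock, oldBlock, recoveryId, newLength) are assumed for illustration:

// Illustrative caller-side choice of newBlockId.
long newBlockId = copyOnTruncate ? newBlock.getBlockId()
                                 : oldBlock.getBlockId();
datanode.updateReplicaUnderRecovery(oldBlock, recoveryId, newBlockId,
    newLength);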
/**

View File

@ -270,6 +270,7 @@ message SnapshotDiffSection {
optional uint64 fileSize = 2;
optional bytes name = 3;
optional INodeSection.INodeFile snapshotCopy = 4;
repeated BlockProto blocks = 5;
}
message DiffEntry {

View File

@ -556,7 +556,7 @@ enum ReplicaStateProto {
message RecoveringBlockProto {
required uint64 newGenStamp = 1; // New genstamp post recovery
required LocatedBlockProto block = 2; // Block to be recovered
optional bool truncateFlag = 3; // Block needs to be truncated
optional BlockProto truncateBlock = 3; // New block for recovery (truncate)
}
/**

View File

@ -1239,7 +1239,7 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
.thenReturn(ucBlock);
bm.convertLastBlockToUnderConstruction(mbc);
bm.convertLastBlockToUnderConstruction(mbc, 0L);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.

View File

@ -1106,6 +1106,7 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
@Override // FsDatasetSpi
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newBlockId,
long newlength) {
// Caller does not care about the exact Storage UUID returned.
return datanodeUuid;

View File

@ -56,7 +56,6 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -220,9 +219,9 @@ private void testSyncReplicas(ReplicaRecoveryInfo replica1,
syncList.add(record2);
when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong())).thenReturn("storage1");
anyLong(), anyLong())).thenReturn("storage1");
when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong())).thenReturn("storage2");
anyLong(), anyLong())).thenReturn("storage2");
dn.syncBlock(rBlock, syncList);
}
@ -245,8 +244,10 @@ public void testFinalizedReplicas () throws IOException {
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
// two finalized replicas have different length
replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
@ -284,8 +285,10 @@ public void testFinalizedRbwReplicas() throws IOException {
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
// rbw replica has a different length from the finalized one
replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
@ -297,9 +300,10 @@ public void testFinalizedRbwReplicas() throws IOException {
dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, REPLICA_LEN1);
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
}
/**
@ -323,9 +327,10 @@ public void testFinalizedRwrReplicas() throws IOException {
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, REPLICA_LEN1);
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
// rbw replica has a different length from the finalized one
replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
@ -337,9 +342,10 @@ public void testFinalizedRwrReplicas() throws IOException {
dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, REPLICA_LEN1);
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
}
/**
@ -362,8 +368,8 @@ public void testRBWReplicas() throws IOException {
long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
}
/**
@ -385,9 +391,9 @@ public void testRBW_RWRReplicas() throws IOException {
InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
verify(dn2, never()).updateReplicaUnderRecovery(
block, RECOVERY_ID, REPLICA_LEN1);
block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
}
/**
@ -411,8 +417,8 @@ public void testRWRReplicas() throws IOException {
long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
}
private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
@ -513,7 +519,7 @@ public void testFailedReplicaUpdate() throws IOException {
}
DataNode spyDN = spy(dn);
doThrow(new IOException()).when(spyDN).updateReplicaUnderRecovery(
block, RECOVERY_ID, block.getNumBytes());
block, RECOVERY_ID, BLOCK_ID, block.getNumBytes());
try {
spyDN.syncBlock(rBlock, initBlockRecords(spyDN));
fail("Sync should fail");
@ -634,7 +640,8 @@ public void run() {
recoveryInitResult.get());
dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
.getGenerationStamp() + 1, block.getBlockSize());
.getGenerationStamp() + 1, block.getBlock().getBlockId(),
block.getBlockSize());
} finally {
if (null != cluster) {
cluster.shutdown();

View File

@ -198,7 +198,8 @@ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
//verify updateBlock
ExtendedBlock newblock = new ExtendedBlock(b.getBlockPoolId(),
b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1);
idp.updateReplicaUnderRecovery(b, recoveryId, newblock.getNumBytes());
idp.updateReplicaUnderRecovery(b, recoveryId, b.getBlockId(),
newblock.getNumBytes());
checkMetaInfo(newblock, datanode);
// Verify correct null response trying to init recovery for a missing block
@ -368,7 +369,8 @@ public void testUpdateReplicaUnderRecovery() throws IOException {
.getBlockId(), rri.getNumBytes() - 1, rri.getGenerationStamp());
try {
//update should fail
fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, newlength);
fsdataset.updateReplicaUnderRecovery(tmp, recoveryid,
tmp.getBlockId(), newlength);
Assert.fail();
} catch(IOException ioe) {
System.out.println("GOOD: getting " + ioe);
@ -377,7 +379,8 @@ public void testUpdateReplicaUnderRecovery() throws IOException {
//update
final String storageID = fsdataset.updateReplicaUnderRecovery(
new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength);
new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid,
rri.getBlockId(), newlength);
assertTrue(storageID != null);
} finally {

View File

@ -71,6 +71,7 @@ private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
doReturn(true).when(file).isUnderConstruction();
doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
doReturn(blockInfo).when(file).getLastBlock();
doReturn("").when(namesystemSpy).closeFileCommitBlocks(
any(INodeFile.class), any(BlockInfo.class));
doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
@ -105,6 +106,7 @@ public void testCommitBlockSynchronization() throws IOException {
completedBlockInfo.setGenerationStamp(genStamp);
doReturn(completedBlockInfo).when(namesystemSpy)
.getStoredBlock(any(Block.class));
doReturn(completedBlockInfo).when(file).getLastBlock();
// Repeat the call to make sure it does not throw
namesystemSpy.commitBlockSynchronization(
@ -176,6 +178,7 @@ public void testCommitBlockSynchronizationWithClose() throws IOException {
completedBlockInfo.setGenerationStamp(genStamp);
doReturn(completedBlockInfo).when(namesystemSpy)
.getStoredBlock(any(Block.class));
doReturn(completedBlockInfo).when(file).getLastBlock();
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, true, false, newTargets, null);

View File

@ -18,14 +18,22 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -39,14 +47,14 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -57,6 +65,7 @@ public class TestFileTruncate {
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
GenericTestUtils.setLogLevel(FSEditLogLoader.LOG, Level.ALL);
}
static final Log LOG = LogFactory.getLog(TestFileTruncate.class);
static final int BLOCK_SIZE = 4;
static final short REPLICATION = 3;
static final int DATANODE_NUM = 3;
@ -129,6 +138,287 @@ public void testBasicTruncate() throws IOException {
fs.delete(parent, true);
}
@Test
public void testSnapshotWithAppendTruncate() throws IOException {
testSnapshotWithAppendTruncate(0, 1, 2);
testSnapshotWithAppendTruncate(0, 2, 1);
testSnapshotWithAppendTruncate(1, 0, 2);
testSnapshotWithAppendTruncate(1, 2, 0);
testSnapshotWithAppendTruncate(2, 0, 1);
testSnapshotWithAppendTruncate(2, 1, 0);
}
/**
* Create snapshots of a file while appending to and truncating it.
* Delete the snapshots in the specified order and verify that
* the remaining snapshots are still readable.
*/
void testSnapshotWithAppendTruncate(int ... deleteOrder) throws IOException {
FSDirectory fsDir = cluster.getNamesystem().getFSDirectory();
Path parent = new Path("/test");
fs.mkdirs(parent);
fs.setQuota(parent, 100, 1000);
fs.allowSnapshot(parent);
String truncateFile = "testSnapshotWithAppendTruncate";
final Path src = new Path(parent, truncateFile);
int[] length = new int[4];
length[0] = 2 * BLOCK_SIZE + BLOCK_SIZE / 2;
DFSTestUtil.createFile(fs, src, 64, length[0], BLOCK_SIZE, REPLICATION, 0L);
Block firstBlk = getLocatedBlocks(src).get(0).getBlock().getLocalBlock();
Path[] snapshotFiles = new Path[4];
// Diskspace consumed should be 10 bytes * 3. [blk 1,2,3]
ContentSummary contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(30L));
// Add file to snapshot and append
String[] ss = new String[] {"ss0", "ss1", "ss2", "ss3"};
Path snapshotDir = fs.createSnapshot(parent, ss[0]);
snapshotFiles[0] = new Path(snapshotDir, truncateFile);
length[1] = length[2] = length[0] + BLOCK_SIZE + 1;
DFSTestUtil.appendFile(fs, src, BLOCK_SIZE + 1);
Block lastBlk = getLocatedBlocks(src).getLastLocatedBlock()
.getBlock().getLocalBlock();
// Diskspace consumed should be 15 bytes * 3. [blk 1,2,3,4]
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(45L));
// Create another snapshot without changes
snapshotDir = fs.createSnapshot(parent, ss[1]);
snapshotFiles[1] = new Path(snapshotDir, truncateFile);
// Create another snapshot and append
snapshotDir = fs.createSnapshot(parent, ss[2]);
snapshotFiles[2] = new Path(snapshotDir, truncateFile);
DFSTestUtil.appendFile(fs, src, BLOCK_SIZE -1 + BLOCK_SIZE / 2);
Block appendedBlk = getLocatedBlocks(src).getLastLocatedBlock()
.getBlock().getLocalBlock();
// Diskspace consumed should be 20 bytes * 3. [blk 1,2,3,4,5]
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(60L));
// Truncate to block boundary
int newLength = length[0] + BLOCK_SIZE / 2;
boolean isReady = fs.truncate(src, newLength);
assertTrue("Recovery is not expected.", isReady);
assertFileLength(snapshotFiles[2], length[2]);
assertFileLength(snapshotFiles[1], length[1]);
assertFileLength(snapshotFiles[0], length[0]);
assertBlockNotPresent(appendedBlk);
// Diskspace consumed should be 16 bytes * 3. [blk 1,2,3 SS:4]
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(48L));
// Truncate full block again
newLength = length[0] - BLOCK_SIZE / 2;
isReady = fs.truncate(src, newLength);
assertTrue("Recovery is not expected.", isReady);
assertFileLength(snapshotFiles[2], length[2]);
assertFileLength(snapshotFiles[1], length[1]);
assertFileLength(snapshotFiles[0], length[0]);
// Diskspace consumed should be 16 bytes * 3. [blk 1,2 SS:3,4]
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(48L));
// Truncate half of the last block
newLength -= BLOCK_SIZE / 2;
isReady = fs.truncate(src, newLength);
assertFalse("Recovery is expected.", isReady);
checkBlockRecovery(src);
assertFileLength(snapshotFiles[2], length[2]);
assertFileLength(snapshotFiles[1], length[1]);
assertFileLength(snapshotFiles[0], length[0]);
Block replacedBlk = getLocatedBlocks(src).getLastLocatedBlock()
.getBlock().getLocalBlock();
// Diskspace consumed should be 18 bytes * 3. [blk 1,6 SS:2,3,4]
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(54L));
snapshotDir = fs.createSnapshot(parent, ss[3]);
snapshotFiles[3] = new Path(snapshotDir, truncateFile);
length[3] = newLength;
// Delete file. Should still be able to read snapshots
int numINodes = fsDir.getInodeMapSize();
isReady = fs.delete(src, false);
assertTrue("Delete failed.", isReady);
assertFileLength(snapshotFiles[3], length[3]);
assertFileLength(snapshotFiles[2], length[2]);
assertFileLength(snapshotFiles[1], length[1]);
assertFileLength(snapshotFiles[0], length[0]);
assertEquals("Number of INodes should not change",
numINodes, fsDir.getInodeMapSize());
fs.deleteSnapshot(parent, ss[3]);
assertBlockExists(firstBlk);
assertBlockExists(lastBlk);
assertBlockNotPresent(replacedBlk);
// Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4]
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(48L));
// Delete snapshots in the specified order
fs.deleteSnapshot(parent, ss[deleteOrder[0]]);
assertFileLength(snapshotFiles[deleteOrder[1]], length[deleteOrder[1]]);
assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]);
assertBlockExists(firstBlk);
assertBlockExists(lastBlk);
assertEquals("Number of INodes should not change",
numINodes, fsDir.getInodeMapSize());
// Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4]
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(48L));
fs.deleteSnapshot(parent, ss[deleteOrder[1]]);
assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]);
assertBlockExists(firstBlk);
contentSummary = fs.getContentSummary(parent);
if(fs.exists(snapshotFiles[0])) {
// Diskspace consumed should be 12 bytes * 3. [SS:1,2,3]
assertBlockNotPresent(lastBlk);
assertThat(contentSummary.getSpaceConsumed(), is(36L));
} else {
// Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4]
assertThat(contentSummary.getSpaceConsumed(), is(48L));
}
assertEquals("Number of INodes should not change",
numINodes, fsDir.getInodeMapSize());
fs.deleteSnapshot(parent, ss[deleteOrder[2]]);
assertBlockNotPresent(firstBlk);
assertBlockNotPresent(lastBlk);
// Diskspace consumed should be 0 bytes * 3. []
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(0L));
assertNotEquals("Number of INodes should change",
numINodes, fsDir.getInodeMapSize());
}
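The ContentSummary checks above all follow one accounting rule, assuming getSpaceConsumed() keeps its usual contract: bytes still referenced by the file or by any snapshot, multiplied by the replication factor. A worked check for the 54L assertion (block sizes taken from the comments above):

// file holds blk1 (4 bytes) plus the 2-byte truncate copy blk6;
// snapshots still reference blk2, blk3 and blk4 (4 bytes each).
long fileBytes = 4 + 2;
long snapshotOnlyBytes = 4 + 4 + 4;
long expected = (fileBytes + snapshotOnlyBytes) * REPLICATION;  // 18 * 3 = 54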
/**
* Create three snapshots of a file that is truncated three times.
* Delete the snapshots in the specified order and verify that
* the remaining snapshots are still readable.
*/
@Test
public void testSnapshotWithTruncates() throws IOException {
testSnapshotWithTruncates(0, 1, 2);
testSnapshotWithTruncates(0, 2, 1);
testSnapshotWithTruncates(1, 0, 2);
testSnapshotWithTruncates(1, 2, 0);
testSnapshotWithTruncates(2, 0, 1);
testSnapshotWithTruncates(2, 1, 0);
}
void testSnapshotWithTruncates(int ... deleteOrder) throws IOException {
Path parent = new Path("/test");
fs.mkdirs(parent);
fs.setQuota(parent, 100, 1000);
fs.allowSnapshot(parent);
String truncateFile = "testSnapshotWithTruncates";
final Path src = new Path(parent, truncateFile);
int[] length = new int[3];
length[0] = 3 * BLOCK_SIZE;
DFSTestUtil.createFile(fs, src, 64, length[0], BLOCK_SIZE, REPLICATION, 0L);
Block firstBlk = getLocatedBlocks(src).get(0).getBlock().getLocalBlock();
Block lastBlk = getLocatedBlocks(src).getLastLocatedBlock()
.getBlock().getLocalBlock();
Path[] snapshotFiles = new Path[3];
// Diskspace consumed should be 12 bytes * 3. [blk 1,2,3]
ContentSummary contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(36L));
// Add file to snapshot and truncate
String[] ss = new String[] {"ss0", "ss1", "ss2"};
Path snapshotDir = fs.createSnapshot(parent, ss[0]);
snapshotFiles[0] = new Path(snapshotDir, truncateFile);
length[1] = 2 * BLOCK_SIZE;
boolean isReady = fs.truncate(src, 2 * BLOCK_SIZE);
assertTrue("Recovery is not expected.", isReady);
// Diskspace consumed should be 12 bytes * 3. [blk 1,2 SS:3]
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(36L));
snapshotDir = fs.createSnapshot(parent, ss[1]);
snapshotFiles[1] = new Path(snapshotDir, truncateFile);
// Create another snapshot with truncate
length[2] = BLOCK_SIZE + BLOCK_SIZE / 2;
isReady = fs.truncate(src, BLOCK_SIZE + BLOCK_SIZE / 2);
assertFalse("Recovery is expected.", isReady);
checkBlockRecovery(src);
snapshotDir = fs.createSnapshot(parent, ss[2]);
snapshotFiles[2] = new Path(snapshotDir, truncateFile);
assertFileLength(snapshotFiles[0], length[0]);
assertBlockExists(lastBlk);
// Diskspace consumed should be 14 bytes * 3. [blk 1,4 SS:2,3]
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(42L));
fs.deleteSnapshot(parent, ss[deleteOrder[0]]);
assertFileLength(snapshotFiles[deleteOrder[1]], length[deleteOrder[1]]);
assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]);
assertFileLength(src, length[2]);
assertBlockExists(firstBlk);
contentSummary = fs.getContentSummary(parent);
if(fs.exists(snapshotFiles[0])) {
// Diskspace consumed should be 14 bytes * 3. [blk 1,4 SS:2,3]
assertThat(contentSummary.getSpaceConsumed(), is(42L));
assertBlockExists(lastBlk);
} else {
// Diskspace consumed should be 10 bytes * 3. [blk 1,4 SS:2]
assertThat(contentSummary.getSpaceConsumed(), is(30L));
assertBlockNotPresent(lastBlk);
}
fs.deleteSnapshot(parent, ss[deleteOrder[1]]);
assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]);
assertFileLength(src, length[2]);
assertBlockExists(firstBlk);
contentSummary = fs.getContentSummary(parent);
if(fs.exists(snapshotFiles[0])) {
// Diskspace consumed should be 14 bytes * 3. [blk 1,4 SS:2,3]
assertThat(contentSummary.getSpaceConsumed(), is(42L));
assertBlockExists(lastBlk);
} else if(fs.exists(snapshotFiles[1])) {
// Diskspace consumed should be 10 bytes * 3. [blk 1,4 SS:2]
assertThat(contentSummary.getSpaceConsumed(), is(30L));
assertBlockNotPresent(lastBlk);
} else {
// Diskspace consumed should be 6 bytes * 3. [blk 1,4 SS:]
assertThat(contentSummary.getSpaceConsumed(), is(18L));
assertBlockNotPresent(lastBlk);
}
fs.deleteSnapshot(parent, ss[deleteOrder[2]]);
assertFileLength(src, length[2]);
assertBlockExists(firstBlk);
// Diskspace consumed should be 6 bytes * 3. [blk 1,4 SS:]
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(18L));
assertThat(contentSummary.getLength(), is(6L));
fs.delete(src, false);
assertBlockNotPresent(firstBlk);
// Diskspace consumed should be 0 bytes * 3. []
contentSummary = fs.getContentSummary(parent);
assertThat(contentSummary.getSpaceConsumed(), is(0L));
}
/**
* Failure / recovery test for truncate.
* In this failure mode the DNs fail to recover the blocks and the NN triggers
@ -159,8 +449,6 @@ public void testTruncateFailure() throws IOException {
boolean isReady = fs.truncate(p, newLength);
assertThat("truncate should have triggered block recovery.",
isReady, is(false));
FileStatus fileStatus = fs.getFileStatus(p);
assertThat(fileStatus.getLen(), is((long) newLength));
boolean recoveryTriggered = false;
for(int i = 0; i < RECOVERY_ATTEMPTS; i++) {
@ -168,8 +456,6 @@ public void testTruncateFailure() throws IOException {
NameNodeAdapter.getLeaseHolderForPath(cluster.getNameNode(),
p.toUri().getPath());
if(leaseHolder.equals(HdfsServerConstants.NAMENODE_LEASE_HOLDER)) {
cluster.startDataNodes(conf, DATANODE_NUM, true,
HdfsServerConstants.StartupOption.REGULAR, null);
recoveryTriggered = true;
break;
}
@ -177,6 +463,9 @@ public void testTruncateFailure() throws IOException {
}
assertThat("lease recovery should have occurred in ~" +
SLEEP * RECOVERY_ATTEMPTS + " ms.", recoveryTriggered, is(true));
cluster.startDataNodes(conf, DATANODE_NUM, true,
StartupOption.REGULAR, null);
cluster.waitActive();
checkBlockRecovery(p);
@ -184,10 +473,10 @@ public void testTruncateFailure() throws IOException {
.setLeasePeriod(HdfsConstants.LEASE_SOFTLIMIT_PERIOD,
HdfsConstants.LEASE_HARDLIMIT_PERIOD);
fileStatus = fs.getFileStatus(p);
FileStatus fileStatus = fs.getFileStatus(p);
assertThat(fileStatus.getLen(), is((long) newLength));
AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString());
checkFullFile(p, newLength, contents);
fs.delete(p, false);
}
@ -198,10 +487,9 @@ public void testTruncateFailure() throws IOException {
public void testTruncateEditLogLoad() throws IOException {
int startingFileSize = 2 * BLOCK_SIZE + BLOCK_SIZE / 2;
int toTruncate = 1;
final String s = "/testTruncateEditLogLoad";
final Path p = new Path(s);
byte[] contents = AppendTestUtil.initBuffer(startingFileSize);
final Path p = new Path("/testTruncateEditLogLoad");
writeContents(contents, startingFileSize, p);
int newLength = startingFileSize - toTruncate;
@ -209,54 +497,183 @@ public void testTruncateEditLogLoad() throws IOException {
assertThat("truncate should have triggered block recovery.",
isReady, is(false));
checkBlockRecovery(p);
cluster.restartNameNode();
String holder = UserGroupInformation.getCurrentUser().getUserName();
cluster.getNamesystem().recoverLease(s, holder, "");
checkBlockRecovery(p);
FileStatus fileStatus = fs.getFileStatus(p);
assertThat(fileStatus.getLen(), is((long) newLength));
AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString());
checkFullFile(p, newLength, contents);
fs.delete(p, false);
}
/**
* Upgrade, RollBack, and restart test for Truncate.
*/
@Test
public void testUpgradeAndRestart() throws IOException {
Path parent = new Path("/test");
fs.mkdirs(parent);
fs.setQuota(parent, 100, 1000);
fs.allowSnapshot(parent);
String truncateFile = "testUpgrade";
final Path p = new Path(parent, truncateFile);
int startingFileSize = 2 * BLOCK_SIZE;
int toTruncate = 1;
byte[] contents = AppendTestUtil.initBuffer(startingFileSize);
writeContents(contents, startingFileSize, p);
Path snapshotDir = fs.createSnapshot(parent, "ss0");
Path snapshotFile = new Path(snapshotDir, truncateFile);
int newLengthBeforeUpgrade = startingFileSize - toTruncate;
boolean isReady = fs.truncate(p, newLengthBeforeUpgrade);
assertThat("truncate should have triggered block recovery.",
isReady, is(false));
checkBlockRecovery(p);
checkFullFile(p, newLengthBeforeUpgrade, contents);
assertFileLength(snapshotFile, startingFileSize);
long totalBlockBefore = cluster.getNamesystem().getBlocksTotal();
restartCluster(StartupOption.UPGRADE);
assertThat("SafeMode should be OFF",
cluster.getNamesystem().isInSafeMode(), is(false));
assertThat("NameNode should be performing upgrade.",
cluster.getNamesystem().isUpgradeFinalized(), is(false));
FileStatus fileStatus = fs.getFileStatus(p);
assertThat(fileStatus.getLen(), is((long) newLengthBeforeUpgrade));
int newLengthAfterUpgrade = newLengthBeforeUpgrade - toTruncate;
Block oldBlk = getLocatedBlocks(p).getLastLocatedBlock()
.getBlock().getLocalBlock();
isReady = fs.truncate(p, newLengthAfterUpgrade);
assertThat("truncate should have triggered block recovery.",
isReady, is(false));
fileStatus = fs.getFileStatus(p);
assertThat(fileStatus.getLen(), is((long) newLengthAfterUpgrade));
assertThat("Should copy on truncate during upgrade",
getLocatedBlocks(p).getLastLocatedBlock().getBlock()
.getLocalBlock().getBlockId(), is(not(equalTo(oldBlk.getBlockId()))));
checkBlockRecovery(p);
checkFullFile(p, newLengthAfterUpgrade, contents);
assertThat("Total block count should be unchanged from copy-on-truncate",
cluster.getNamesystem().getBlocksTotal(), is(totalBlockBefore));
restartCluster(StartupOption.ROLLBACK);
assertThat("File does not exist " + p, fs.exists(p), is(true));
fileStatus = fs.getFileStatus(p);
assertThat(fileStatus.getLen(), is((long) newLengthBeforeUpgrade));
checkFullFile(p, newLengthBeforeUpgrade, contents);
assertThat("Total block count should be unchanged from rolling back",
cluster.getNamesystem().getBlocksTotal(), is(totalBlockBefore));
restartCluster(StartupOption.REGULAR);
assertThat("Total block count should be unchanged from start-up",
cluster.getNamesystem().getBlocksTotal(), is(totalBlockBefore));
checkFullFile(p, newLengthBeforeUpgrade, contents);
assertFileLength(snapshotFile, startingFileSize);
// empty edits and restart
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
cluster.restartNameNode(true);
assertThat("Total block count should be unchanged from start-up",
cluster.getNamesystem().getBlocksTotal(), is(totalBlockBefore));
checkFullFile(p, newLengthBeforeUpgrade, contents);
assertFileLength(snapshotFile, startingFileSize);
fs.deleteSnapshot(parent, "ss0");
fs.delete(parent, true);
assertThat("File " + p + " shouldn't exist", fs.exists(p), is(false));
}
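The "Should copy on truncate during upgrade" assertion reflects a NameNode-side rule; a hedged sketch of the condition (assembled from calls used elsewhere in this test, not copied from the patch): truncate copies the last block, rather than shrinking it in place, whenever an upgrade is not finalized, so that ROLLBACK can restore the old block, or when the block is captured in the latest snapshot.

// Sketch of the assumed copy-on-truncate condition.
boolean copyOnTruncate =
    !cluster.getNamesystem().isUpgradeFinalized()
    || file.isBlockInLatestSnapshot(file.getLastBlock());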
/**
* Check truncate recovery for both in-place and copy-on-truncate cases.
*/
@Test
public void testTruncateLastBlock() throws IOException {
public void testTruncateRecovery() throws IOException {
FSNamesystem fsn = cluster.getNamesystem();
String src = "/file";
String client = "client";
String clientMachine = "clientMachine";
Path parent = new Path("/test");
String src = "/test/testTruncateRecovery";
Path srcPath = new Path(src);
byte[] contents = AppendTestUtil.initBuffer(BLOCK_SIZE);
writeContents(contents, BLOCK_SIZE, srcPath);
INodeFile inode = fsn.getFSDirectory().getINode(src).asFile();
long oldGenstamp = GenerationStamp.LAST_RESERVED_STAMP;
DatanodeDescriptor dn = DFSTestUtil.getLocalDatanodeDescriptor();
DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo(
dn.getDatanodeUuid(), InetAddress.getLocalHost().getHostAddress());
dn.isAlive = true;
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
new Block(0, 1, oldGenstamp), (short) 1,
HdfsServerConstants.BlockUCState.BEING_TRUNCATED,
new DatanodeStorageInfo[] {storage});
inode.setBlocks(new BlockInfo[] {blockInfo});
INodesInPath iip = fsn.getFSDirectory().getINodesInPath4Write(src, true);
INodeFile file = iip.getLastINode().asFile();
long initialGenStamp = file.getLastBlock().getGenerationStamp();
// Test that prepareFileForTruncate sets up in-place truncate.
fsn.writeLock();
try {
fsn.initializeBlockRecovery(inode);
assertThat(inode.getLastBlock().getBlockUCState(),
is(HdfsServerConstants.BlockUCState.BEING_TRUNCATED));
long blockRecoveryId = ((BlockInfoUnderConstruction) inode.getLastBlock())
Block oldBlock = file.getLastBlock();
Block truncateBlock =
fsn.prepareFileForTruncate(iip, client, clientMachine, 1, null);
// In-place truncate uses old block id with new genStamp.
assertThat(truncateBlock.getBlockId(),
is(equalTo(oldBlock.getBlockId())));
assertThat(truncateBlock.getNumBytes(),
is(oldBlock.getNumBytes()));
assertThat(truncateBlock.getGenerationStamp(),
is(fsn.getBlockIdManager().getGenerationStampV2()));
assertThat(file.getLastBlock().getBlockUCState(),
is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
long blockRecoveryId = ((BlockInfoUnderConstruction) file.getLastBlock())
.getBlockRecoveryId();
assertThat(blockRecoveryId, is(oldGenstamp + 2));
assertThat(blockRecoveryId, is(initialGenStamp + 1));
fsn.getEditLog().logTruncate(
src, client, clientMachine, BLOCK_SIZE-1, Time.now(), truncateBlock);
} finally {
fsn.writeUnlock();
}
// Re-create file and ensure we are ready to copy on truncate
writeContents(contents, BLOCK_SIZE, srcPath);
fs.allowSnapshot(parent);
fs.createSnapshot(parent, "ss0");
iip = fsn.getFSDirectory().getINodesInPath(src, true);
file = iip.getLastINode().asFile();
file.recordModification(iip.getLatestSnapshotId(), true);
assertThat(file.isBlockInLatestSnapshot(file.getLastBlock()), is(true));
initialGenStamp = file.getLastBlock().getGenerationStamp();
// Test that prepareFileForTruncate sets up copy-on-write truncate
fsn.writeLock();
try {
Block oldBlock = file.getLastBlock();
Block truncateBlock =
fsn.prepareFileForTruncate(iip, client, clientMachine, 1, null);
// Copy-on-write truncate creates a new block with a new id and genStamp
assertThat(truncateBlock.getBlockId(),
is(not(equalTo(oldBlock.getBlockId()))));
assertThat(truncateBlock.getNumBytes() < oldBlock.getNumBytes(),
is(true));
assertThat(truncateBlock.getGenerationStamp(),
is(fsn.getBlockIdManager().getGenerationStampV2()));
assertThat(file.getLastBlock().getBlockUCState(),
is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
long blockRecoveryId = ((BlockInfoUnderConstruction) file.getLastBlock())
.getBlockRecoveryId();
assertThat(blockRecoveryId, is(initialGenStamp + 1));
fsn.getEditLog().logTruncate(
src, client, clientMachine, BLOCK_SIZE-1, Time.now(), truncateBlock);
} finally {
fsn.writeUnlock();
}
checkBlockRecovery(srcPath);
fs.deleteSnapshot(parent, "ss0");
fs.delete(parent, true);
}
static void writeContents(byte[] contents, int fileLength, Path p)
@ -286,4 +703,38 @@ static void checkBlockRecovery(Path p) throws IOException {
static LocatedBlocks getLocatedBlocks(Path src) throws IOException {
return fs.getClient().getLocatedBlocks(src.toString(), 0, Long.MAX_VALUE);
}
static void assertBlockExists(Block blk) {
assertNotNull("BlocksMap does not contain block: " + blk,
cluster.getNamesystem().getStoredBlock(blk));
}
static void assertBlockNotPresent(Block blk) {
assertNull("BlocksMap should not contain block: " + blk,
cluster.getNamesystem().getStoredBlock(blk));
}
static void assertFileLength(Path file, long length) throws IOException {
byte[] data = DFSTestUtil.readFileBuffer(fs, file);
assertEquals("Wrong data size in snapshot.", length, data.length);
}
static void checkFullFile(Path p, int newLength, byte[] contents)
throws IOException {
AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString());
}
static void restartCluster(StartupOption o)
throws IOException {
cluster.shutdown();
if(StartupOption.ROLLBACK == o)
NameNode.doRollback(conf, false);
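// For ROLLBACK the NameNode is rolled back here and then restarted
// REGULAR, while the DataNodes are started with the ROLLBACK option below.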
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM)
.format(false)
.nameNodePort(NameNode.DEFAULT_PORT)
.startupOption(o==StartupOption.ROLLBACK ? StartupOption.REGULAR : o)
.dnStartupOption(o!=StartupOption.ROLLBACK ? StartupOption.REGULAR : o)
.build();
fs = cluster.getFileSystem();
}
}

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<EDITS>
<EDITS_VERSION>-60</EDITS_VERSION>
<EDITS_VERSION>-61</EDITS_VERSION>
<RECORD>
<OPCODE>OP_START_LOG_SEGMENT</OPCODE>
<DATA>
@ -13,8 +13,8 @@
<TXID>2</TXID>
<DELEGATION_KEY>
<KEY_ID>1</KEY_ID>
<EXPIRY_DATE>1421822547136</EXPIRY_DATE>
<KEY>24319c7d1f7c0828</KEY>
<EXPIRY_DATE>1421826999207</EXPIRY_DATE>
<KEY>ca9a0c8b240570b3</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -24,8 +24,8 @@
<TXID>3</TXID>
<DELEGATION_KEY>
<KEY_ID>2</KEY_ID>
<EXPIRY_DATE>1421822547140</EXPIRY_DATE>
<KEY>254b1207021431f4</KEY>
<EXPIRY_DATE>1421826999210</EXPIRY_DATE>
<KEY>833c25a6fb2b0a6f</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -37,19 +37,19 @@
<INODEID>16386</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131348286</MTIME>
<ATIME>1421131348286</ATIME>
<MTIME>1421135800328</MTIME>
<ATIME>1421135800328</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_526346936_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>6</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>9</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -60,14 +60,14 @@
<INODEID>0</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131348328</MTIME>
<ATIME>1421131348286</ATIME>
<MTIME>1421135800357</MTIME>
<ATIME>1421135800328</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<OVERWRITE>false</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -88,9 +88,9 @@
<LENGTH>0</LENGTH>
<SRC>/file_create</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1421131348343</TIMESTAMP>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>9</RPC_CALLID>
<TIMESTAMP>1421135800368</TIMESTAMP>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>12</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -99,9 +99,9 @@
<TXID>8</TXID>
<LENGTH>0</LENGTH>
<PATH>/file_moved</PATH>
<TIMESTAMP>1421131348353</TIMESTAMP>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>10</RPC_CALLID>
<TIMESTAMP>1421135800377</TIMESTAMP>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>13</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -111,9 +111,9 @@
<LENGTH>0</LENGTH>
<INODEID>16387</INODEID>
<PATH>/directory_mkdir</PATH>
<TIMESTAMP>1421131348366</TIMESTAMP>
<TIMESTAMP>1421135800394</TIMESTAMP>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>493</MODE>
</PERMISSION_STATUS>
@ -146,8 +146,8 @@
<TXID>13</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot1</SNAPSHOTNAME>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>15</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>18</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -157,8 +157,8 @@
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTOLDNAME>snapshot1</SNAPSHOTOLDNAME>
<SNAPSHOTNEWNAME>snapshot2</SNAPSHOTNEWNAME>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>16</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>19</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -167,8 +167,8 @@
<TXID>15</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot2</SNAPSHOTNAME>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>17</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>20</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -179,19 +179,19 @@
<INODEID>16388</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131348401</MTIME>
<ATIME>1421131348401</ATIME>
<MTIME>1421135800442</MTIME>
<ATIME>1421135800442</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_526346936_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>18</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>21</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -202,14 +202,14 @@
<INODEID>0</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131348405</MTIME>
<ATIME>1421131348401</ATIME>
<MTIME>1421135800445</MTIME>
<ATIME>1421135800442</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<OVERWRITE>false</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -265,10 +265,10 @@
<LENGTH>0</LENGTH>
<SRC>/file_create</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1421131348436</TIMESTAMP>
<TIMESTAMP>1421135800485</TIMESTAMP>
<OPTIONS>NONE</OPTIONS>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>25</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>28</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -279,19 +279,19 @@
<INODEID>16389</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131348443</MTIME>
<ATIME>1421131348443</ATIME>
<MTIME>1421135800495</MTIME>
<ATIME>1421135800495</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_526346936_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>27</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>30</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -396,8 +396,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131348998</MTIME>
<ATIME>1421131348443</ATIME>
<MTIME>1421135801050</MTIME>
<ATIME>1421135800495</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -418,7 +418,7 @@
<GENSTAMP>1003</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -432,19 +432,19 @@
<INODEID>16390</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131349001</MTIME>
<ATIME>1421131349001</ATIME>
<MTIME>1421135801053</MTIME>
<ATIME>1421135801053</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_526346936_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>38</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>41</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -549,8 +549,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131349032</MTIME>
<ATIME>1421131349001</ATIME>
<MTIME>1421135801091</MTIME>
<ATIME>1421135801053</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -571,7 +571,7 @@
<GENSTAMP>1006</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -585,19 +585,19 @@
<INODEID>16391</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131349036</MTIME>
<ATIME>1421131349036</ATIME>
<MTIME>1421135801095</MTIME>
<ATIME>1421135801095</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_526346936_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>47</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>50</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -702,8 +702,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131349060</MTIME>
<ATIME>1421131349036</ATIME>
<MTIME>1421135801126</MTIME>
<ATIME>1421135801095</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -724,7 +724,7 @@
<GENSTAMP>1009</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -736,13 +736,13 @@
<TXID>57</TXID>
<LENGTH>0</LENGTH>
<TRG>/file_concat_target</TRG>
<TIMESTAMP>1421131349064</TIMESTAMP>
<TIMESTAMP>1421135801130</TIMESTAMP>
<SOURCES>
<SOURCE1>/file_concat_0</SOURCE1>
<SOURCE2>/file_concat_1</SOURCE2>
</SOURCES>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>55</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>58</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -753,19 +753,19 @@
<INODEID>16392</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131349068</MTIME>
<ATIME>1421131349068</ATIME>
<MTIME>1421135810102</MTIME>
<ATIME>1421135810102</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_526346936_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>57</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>63</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -837,8 +837,8 @@
<INODEID>0</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131349085</MTIME>
<ATIME>1421131349068</ATIME>
<MTIME>1421135810122</MTIME>
<ATIME>1421135810102</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -854,7 +854,7 @@
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -865,10 +865,10 @@
<DATA>
<TXID>66</TXID>
<SRC>/file_create</SRC>
<CLIENTNAME>DFSClient_NONMAPREDUCE_526346936_1</CLIENTNAME>
<CLIENTNAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENTNAME>
<CLIENTMACHINE>127.0.0.1</CLIENTMACHINE>
<NEWLENGTH>512</NEWLENGTH>
<TIMESTAMP>1421131349088</TIMESTAMP>
<TIMESTAMP>1421135810125</TIMESTAMP>
</DATA>
</RECORD>
<RECORD>
@ -879,15 +879,15 @@
<INODEID>16393</INODEID>
<PATH>/file_symlink</PATH>
<VALUE>/file_concat_target</VALUE>
<MTIME>1421131349095</MTIME>
<ATIME>1421131349095</ATIME>
<MTIME>1421135810132</MTIME>
<ATIME>1421135810132</ATIME>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>511</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>64</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>70</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -898,19 +898,19 @@
<INODEID>16394</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131349098</MTIME>
<ATIME>1421131349098</ATIME>
<MTIME>1421135810135</MTIME>
<ATIME>1421135810135</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_526346936_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>65</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>71</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -966,7 +966,7 @@
<OPCODE>OP_REASSIGN_LEASE</OPCODE>
<DATA>
<TXID>74</TXID>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_526346936_1</LEASEHOLDER>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_240777107_1</LEASEHOLDER>
<PATH>/hard-lease-recovery-test</PATH>
<NEWHOLDER>HDFS_NameNode</NEWHOLDER>
</DATA>
@ -979,8 +979,8 @@
<INODEID>0</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1421131351230</MTIME>
<ATIME>1421131349098</ATIME>
<MTIME>1421135812235</MTIME>
<ATIME>1421135810135</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -991,7 +991,7 @@
<GENSTAMP>1013</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>plamenjeliazkov</USERNAME>
<USERNAME>shv</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -1002,13 +1002,13 @@
<DATA>
<TXID>76</TXID>
<POOLNAME>pool1</POOLNAME>
<OWNERNAME>plamenjeliazkov</OWNERNAME>
<GROUPNAME>staff</GROUPNAME>
<OWNERNAME>shv</OWNERNAME>
<GROUPNAME>shv</GROUPNAME>
<MODE>493</MODE>
<LIMIT>9223372036854775807</LIMIT>
<MAXRELATIVEEXPIRY>2305843009213693951</MAXRELATIVEEXPIRY>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>72</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>78</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -1017,8 +1017,8 @@
<TXID>77</TXID>
<POOLNAME>pool1</POOLNAME>
<LIMIT>99</LIMIT>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>73</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>79</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -1029,9 +1029,9 @@
<PATH>/path</PATH>
<REPLICATION>1</REPLICATION>
<POOL>pool1</POOL>
<EXPIRATION>2305844430345046085</EXPIRATION>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>74</RPC_CALLID>
<EXPIRATION>2305844430349507141</EXPIRATION>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>80</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -1040,8 +1040,8 @@
<TXID>79</TXID>
<ID>1</ID>
<REPLICATION>2</REPLICATION>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>75</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>81</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -1049,8 +1049,8 @@
<DATA>
<TXID>80</TXID>
<ID>1</ID>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>76</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>82</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -1058,8 +1058,8 @@
<DATA>
<TXID>81</TXID>
<POOLNAME>pool1</POOLNAME>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>77</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>83</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -1105,8 +1105,8 @@
<NAME>a1</NAME>
<VALUE>0x313233</VALUE>
</XATTR>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>79</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>85</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -1119,8 +1119,8 @@
<NAME>a2</NAME>
<VALUE>0x373839</VALUE>
</XATTR>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>80</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>86</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -1132,22 +1132,22 @@
<NAMESPACE>USER</NAMESPACE>
<NAME>a2</NAME>
</XATTR>
<RPC_CLIENTID>99bcddc1-3460-4630-9904-6c7ca5811945</RPC_CLIENTID>
<RPC_CALLID>81</RPC_CALLID>
<RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
<RPC_CALLID>87</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_START</OPCODE>
<DATA>
<TXID>86</TXID>
<STARTTIME>1421131352186</STARTTIME>
<STARTTIME>1421135813268</STARTTIME>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_FINALIZE</OPCODE>
<DATA>
<TXID>87</TXID>
<FINALIZETIME>1421131352186</FINALIZETIME>
<FINALIZETIME>1421135813268</FINALIZETIME>
</DATA>
</RECORD>
<RECORD>