diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6da066784ec..c1c01d517b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -20,6 +20,8 @@ Trunk (Unreleased) HDFS-3107. Introduce truncate. (Plamen Jeliazkov via shv) + HDFS-7056. Snapshot support for truncate. (Plamen Jeliazkov and shv) + IMPROVEMENTS HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 749f3876df3..cfd1c67782e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -537,7 +537,7 @@ public interface ClientProtocol { * @param src existing file * @param newLength the target size * - * @return true if and client does not need to wait for block recovery, + * @return true if client does not need to wait for block recovery, * false if client needs to wait for block recovery. * * @throws AccessControlException If access is denied diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java index 087c697c587..ba0a8fc24ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java @@ -76,12 +76,12 @@ public class InterDatanodeProtocolServerSideTranslatorPB implements final String storageID; try { storageID = impl.updateReplicaUnderRecovery( - PBHelper.convert(request.getBlock()), - request.getRecoveryId(), request.getNewLength()); + PBHelper.convert(request.getBlock()), request.getRecoveryId(), + request.getNewBlockId(), request.getNewLength()); } catch (IOException e) { throw new ServiceException(e); } return UpdateReplicaUnderRecoveryResponseProto.newBuilder() .setStorageUuid(storageID).build(); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java index 5174d861882..fee62a4e994 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java @@ -102,11 +102,12 @@ public class InterDatanodeProtocolTranslatorPB implements @Override public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, - long recoveryId, long newLength) throws IOException { + long recoveryId, long newBlockId, long newLength) throws IOException { UpdateReplicaUnderRecoveryRequestProto req = UpdateReplicaUnderRecoveryRequestProto.newBuilder() .setBlock(PBHelper.convert(oldBlock)) - .setNewLength(newLength).setRecoveryId(recoveryId).build(); + .setNewLength(newLength).setNewBlockId(newBlockId) + 
.setRecoveryId(recoveryId).build(); try { return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req ).getStorageUuid(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 3f6a7f3694c..7187838f6d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -607,16 +607,19 @@ public class PBHelper { return null; } LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b); - return RecoveringBlockProto.newBuilder().setBlock(lb) - .setNewGenStamp(b.getNewGenerationStamp()) - .setTruncateFlag(b.getTruncateFlag()).build(); + RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder(); + builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp()); + if(b.getNewBlock() != null) + builder.setTruncateBlock(PBHelper.convert(b.getNewBlock())); + return builder.build(); } public static RecoveringBlock convert(RecoveringBlockProto b) { ExtendedBlock block = convert(b.getBlock().getB()); DatanodeInfo[] locs = convert(b.getBlock().getLocsList()); - return new RecoveringBlock(block, locs, b.getNewGenStamp(), - b.getTruncateFlag()); + return (b.hasTruncateBlock()) ? + new RecoveringBlock(block, locs, PBHelper.convert(b.getTruncateBlock())) : + new RecoveringBlock(block, locs, b.getNewGenStamp()); } public static DatanodeInfoProto.AdminState convert( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java index 28b179db1c0..8a811ba19cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java @@ -54,6 +54,11 @@ public class BlockInfoUnderConstruction extends BlockInfo { */ private long blockRecoveryId = 0; + /** + * The block source to use in the event of copy-on-write truncate. + */ + private Block truncateBlock; + /** * ReplicaUnderConstruction contains information about replicas while * they are under construction. @@ -229,6 +234,15 @@ public class BlockInfoUnderConstruction extends BlockInfo { return blockRecoveryId; } + /** Get recover block */ + public Block getTruncateBlock() { + return truncateBlock; + } + + public void setTruncateBlock(Block recoveryBlock) { + this.truncateBlock = recoveryBlock; + } + /** * Process the recorded replicas. When about to commit or finish the * pipeline recovery sort out bad replicas. @@ -273,11 +287,7 @@ public class BlockInfoUnderConstruction extends BlockInfo { * make it primary. 
*/ public void initializeBlockRecovery(long recoveryId) { - initializeBlockRecovery(BlockUCState.UNDER_RECOVERY, recoveryId); - } - - public void initializeBlockRecovery(BlockUCState s, long recoveryId) { - setBlockUCState(s); + setBlockUCState(BlockUCState.UNDER_RECOVERY); blockRecoveryId = recoveryId; if (replicas.size() == 0) { NameNode.blockStateChangeLog.warn("BLOCK*" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7e54c0f9c36..03861070e33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -700,13 +700,14 @@ public class BlockManager { * The client is supposed to allocate a new block with the next call. * * @param bc file + * @param bytesToRemove num of bytes to remove from block * @return the last block locations if the block is partial or null otherwise */ public LocatedBlock convertLastBlockToUnderConstruction( - BlockCollection bc) throws IOException { + BlockCollection bc, long bytesToRemove) throws IOException { BlockInfo oldBlock = bc.getLastBlock(); if(oldBlock == null || - bc.getPreferredBlockSize() == oldBlock.getNumBytes()) + bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove) return null; assert oldBlock == getStoredBlock(oldBlock) : "last block of the file is not in blocksMap"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 918b8d99a78..7ef65215195 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -1433,26 +1432,37 @@ public class DatanodeManager { recoveryLocations.add(storages[i]); } } + // If we are performing a truncate recovery than set recovery fields + // to old block. + boolean truncateRecovery = b.getTruncateBlock() != null; + boolean copyOnTruncateRecovery = truncateRecovery && + b.getTruncateBlock().getBlockId() != b.getBlockId(); + ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ? + new ExtendedBlock(blockPoolId, b.getTruncateBlock()) : + new ExtendedBlock(blockPoolId, b); // If we only get 1 replica after eliminating stale nodes, then choose all // replicas for recovery and let the primary data node handle failures. 
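The DatanodeManager changes in this hunk (which continue just below) drop the old BEING_TRUNCATED state in favor of a truncateBlock recorded on the under-construction block, and shape the recovery command differently for in-place truncate versus copy-on-truncate. As a rough, self-contained sketch of that dispatch — the class and field names below are illustrative only, not the patch's Hadoop classes:

```java
// Illustrative-only model of how a truncate recovery command is shaped.
// primaryBlockId: the block the primary datanode reports against;
// targetBlockId: the block whose replicas end up at newLength.
final class TruncateRecoverySpec {
  final long primaryBlockId;
  final long targetBlockId;
  final long newLength;

  TruncateRecoverySpec(long primaryBlockId, long targetBlockId, long newLength) {
    this.primaryBlockId = primaryBlockId;
    this.targetBlockId = targetBlockId;
    this.newLength = newLength;
  }

  /**
   * ucBlockId/ucLength describe the file's under-construction last block;
   * truncateBlockId/truncateLength describe the recorded truncate block,
   * with truncateBlockId < 0 meaning an ordinary (non-truncate) recovery.
   */
  static TruncateRecoverySpec of(long ucBlockId, long ucLength,
                                 long truncateBlockId, long truncateLength) {
    if (truncateBlockId < 0) {
      // Ordinary block recovery: just synchronize replicas of the UC block.
      return new TruncateRecoverySpec(ucBlockId, ucBlockId, ucLength);
    }
    if (truncateBlockId != ucBlockId) {
      // Copy-on-truncate: the old block stays intact (a snapshot still needs
      // it); its replicas are copied into the new UC block at the new length.
      return new TruncateRecoverySpec(truncateBlockId, ucBlockId, ucLength);
    }
    // In-place truncate: the same block id is recovered to the shorter length.
    return new TruncateRecoverySpec(ucBlockId, ucBlockId, truncateLength);
  }
}
```

In the patch itself the same distinction is carried to the datanodes through RecoveringBlock, which now carries the truncate block instead of the earlier boolean truncate flag (see the PBHelper changes above).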
+ DatanodeInfo[] recoveryInfos; if (recoveryLocations.size() > 1) { if (recoveryLocations.size() != storages.length) { LOG.info("Skipped stale nodes for recovery : " + (storages.length - recoveryLocations.size())); } - boolean isTruncate = b.getBlockUCState().equals( - HdfsServerConstants.BlockUCState.BEING_TRUNCATED); - brCommand.add(new RecoveringBlock( - new ExtendedBlock(blockPoolId, b), - DatanodeStorageInfo.toDatanodeInfos(recoveryLocations), - b.getBlockRecoveryId(), isTruncate)); + recoveryInfos = + DatanodeStorageInfo.toDatanodeInfos(recoveryLocations); } else { // If too many replicas are stale, then choose all replicas to participate // in block recovery. - brCommand.add(new RecoveringBlock( - new ExtendedBlock(blockPoolId, b), - DatanodeStorageInfo.toDatanodeInfos(storages), - b.getBlockRecoveryId())); + recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages); + } + if(truncateRecovery) { + Block recoveryBlock = (copyOnTruncateRecovery) ? b : + b.getTruncateBlock(); + brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos, + recoveryBlock)); + } else { + brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos, + b.getBlockRecoveryId())); } } return new DatanodeCommand[] { brCommand }; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index f2e7ff44e46..9bba2c950b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -299,13 +299,6 @@ public final class HdfsServerConstants { * which synchronizes the existing replicas contents. */ UNDER_RECOVERY, - /** - * The block is being truncated.
-     * When a file is truncated its last block may need to be truncated
-     * and needs to go through a recovery procedure,
-     * which synchronizes the existing replicas contents.
-     */
-    BEING_TRUNCATED,
     /**
      * The block is committed.
* The client reported that all bytes are written to data-nodes diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 7f95f331500..84528e7eff2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2530,14 +2530,16 @@ public class DataNode extends ReconfigurableBase */ @Override // InterDatanodeProtocol public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock, - final long recoveryId, final long newLength) throws IOException { + final long recoveryId, final long newBlockId, final long newLength) + throws IOException { final String storageID = data.updateReplicaUnderRecovery(oldBlock, - recoveryId, newLength); + recoveryId, newBlockId, newLength); // Notify the namenode of the updated block info. This is important // for HA, since otherwise the standby node may lose track of the // block locations until the next block report. ExtendedBlock newBlock = new ExtendedBlock(oldBlock); newBlock.setGenerationStamp(recoveryId); + newBlock.setBlockId(newBlockId); newBlock.setNumBytes(newLength); notifyNamenodeReceivedBlock(newBlock, "", storageID); return storageID; @@ -2559,10 +2561,12 @@ public class DataNode extends ReconfigurableBase this.rInfo = rInfo; } - void updateReplicaUnderRecovery(String bpid, long recoveryId, long newLength - ) throws IOException { + void updateReplicaUnderRecovery(String bpid, long recoveryId, + long newBlockId, long newLength) + throws IOException { final ExtendedBlock b = new ExtendedBlock(bpid, rInfo); - storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newLength); + storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId, + newLength); } @Override @@ -2644,8 +2648,12 @@ public class DataNode extends ReconfigurableBase final String bpid = block.getBlockPoolId(); DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(block.getBlockPoolId()); - + long recoveryId = rBlock.getNewGenerationStamp(); + boolean isTruncateRecovery = rBlock.getNewBlock() != null; + long blockId = (isTruncateRecovery) ? 
+ rBlock.getNewBlock().getBlockId() : block.getBlockId(); + if (LOG.isDebugEnabled()) { LOG.debug("block=" + block + ", (length=" + block.getNumBytes() + "), syncList=" + syncList); @@ -2679,7 +2687,7 @@ public class DataNode extends ReconfigurableBase // Calculate list of nodes that will participate in the recovery // and the new block size List participatingList = new ArrayList(); - final ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(), + final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId, -1, recoveryId); switch(bestState) { case FINALIZED: @@ -2691,10 +2699,7 @@ public class DataNode extends ReconfigurableBase r.rInfo.getNumBytes() == finalizedLength) participatingList.add(r); } - if(rBlock.getTruncateFlag()) - newBlock.setNumBytes(rBlock.getBlock().getNumBytes()); - else - newBlock.setNumBytes(finalizedLength); + newBlock.setNumBytes(finalizedLength); break; case RBW: case RWR: @@ -2706,21 +2711,21 @@ public class DataNode extends ReconfigurableBase participatingList.add(r); } } - if(rBlock.getTruncateFlag()) - newBlock.setNumBytes(rBlock.getBlock().getNumBytes()); - else - newBlock.setNumBytes(minLength); + newBlock.setNumBytes(minLength); break; case RUR: case TEMPORARY: assert false : "bad replica state: " + bestState; } + if(isTruncateRecovery) + newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes()); List failedList = new ArrayList(); final List successList = new ArrayList(); for(BlockRecord r : participatingList) { try { - r.updateReplicaUnderRecovery(bpid, recoveryId, newBlock.getNumBytes()); + r.updateReplicaUnderRecovery(bpid, recoveryId, blockId, + newBlock.getNumBytes()); successList.add(r); } catch (IOException e) { InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 462ad31c03c..f2dddfd1a3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -418,7 +418,7 @@ public interface FsDatasetSpi extends FSDatasetMBean { * @return the ID of storage that stores the block */ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, - long recoveryId, long newLength) throws IOException; + long recoveryId, long newBlockId, long newLength) throws IOException; /** * add new block pool ID diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 3eda38e703c..e62986fa310 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -670,6 +670,12 @@ class FsDatasetImpl implements FsDatasetSpi { final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId); final File dstFile = new File(destDir, srcFile.getName()); final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp); + return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum); + } + + static File[] copyBlockFiles(File 
srcMeta, File srcFile, File dstMeta, + File dstFile, boolean calculateChecksum) + throws IOException { if (calculateChecksum) { computeChecksum(srcMeta, dstMeta, srcFile); } else { @@ -2157,6 +2163,7 @@ class FsDatasetImpl implements FsDatasetSpi { public synchronized String updateReplicaUnderRecovery( final ExtendedBlock oldBlock, final long recoveryId, + final long newBlockId, final long newlength) throws IOException { //get replica final String bpid = oldBlock.getBlockPoolId(); @@ -2189,13 +2196,26 @@ class FsDatasetImpl implements FsDatasetSpi { //update replica final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock - .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength); - assert finalized.getBlockId() == oldBlock.getBlockId() - && finalized.getGenerationStamp() == recoveryId - && finalized.getNumBytes() == newlength - : "Replica information mismatched: oldBlock=" + oldBlock - + ", recoveryId=" + recoveryId + ", newlength=" + newlength - + ", finalized=" + finalized; + .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, + newBlockId, newlength); + + boolean copyTruncate = newBlockId != oldBlock.getBlockId(); + if(!copyTruncate) { + assert finalized.getBlockId() == oldBlock.getBlockId() + && finalized.getGenerationStamp() == recoveryId + && finalized.getNumBytes() == newlength + : "Replica information mismatched: oldBlock=" + oldBlock + + ", recoveryId=" + recoveryId + ", newlength=" + newlength + + ", newBlockId=" + newBlockId + ", finalized=" + finalized; + } else { + assert finalized.getBlockId() == oldBlock.getBlockId() + && finalized.getGenerationStamp() == oldBlock.getGenerationStamp() + && finalized.getNumBytes() == oldBlock.getNumBytes() + : "Finalized and old information mismatched: oldBlock=" + oldBlock + + ", genStamp=" + oldBlock.getGenerationStamp() + + ", len=" + oldBlock.getNumBytes() + + ", finalized=" + finalized; + } //check replica files after update checkReplicaFiles(finalized); @@ -2208,6 +2228,7 @@ class FsDatasetImpl implements FsDatasetSpi { String bpid, ReplicaUnderRecovery rur, long recoveryId, + long newBlockId, long newlength) throws IOException { //check recovery id if (rur.getRecoveryID() != recoveryId) { @@ -2215,26 +2236,62 @@ class FsDatasetImpl implements FsDatasetSpi { + ", rur=" + rur); } + boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId; + File blockFile; + File metaFile; // bump rur's GS to be recovery id - bumpReplicaGS(rur, recoveryId); + if(!copyOnTruncate) { + bumpReplicaGS(rur, recoveryId); + blockFile = rur.getBlockFile(); + metaFile = rur.getMetaFile(); + } else { + File[] copiedReplicaFiles = + copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId); + blockFile = copiedReplicaFiles[1]; + metaFile = copiedReplicaFiles[0]; + } //update length - final File replicafile = rur.getBlockFile(); if (rur.getNumBytes() < newlength) { throw new IOException("rur.getNumBytes() < newlength = " + newlength + ", rur=" + rur); } if (rur.getNumBytes() > newlength) { rur.unlinkBlock(1); - truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength); - // update RUR with the new length - rur.setNumBytes(newlength); + truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength); + if(!copyOnTruncate) { + // update RUR with the new length + rur.setNumBytes(newlength); + } else { + // Copying block to a new block with new blockId. + // Not truncating original block. 
+ ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten( + newBlockId, recoveryId, rur.getVolume(), blockFile.getParentFile(), + newlength); + newReplicaInfo.setNumBytes(newlength); + volumeMap.add(bpid, newReplicaInfo); + finalizeReplica(bpid, newReplicaInfo); + } } // finalize the block return finalizeReplica(bpid, rur); } + private File[] copyReplicaWithNewBlockIdAndGS( + ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS) + throws IOException { + String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId; + FsVolumeImpl v = volumes.getNextVolume( + replicaInfo.getVolume().getStorageType(), replicaInfo.getNumBytes()); + final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir(); + final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId); + final File dstBlockFile = new File(destDir, blockFileName); + final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS); + return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), + dstMetaFile, dstBlockFile, true); + } + @Override // FsDatasetSpi public synchronized long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java index dc0fe1f5f00..cb3da193e8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java @@ -424,7 +424,7 @@ class FSDirStatAndListingOp { fileNode.computeFileSizeNotIncludingLastUcBlock() : size; loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks( - fileNode.getBlocks(), fileSize, isUc, 0L, size, false, + fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false, inSnapshot, feInfo); if (loc == null) { loc = new LocatedBlocks(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 1948099fe9a..9c33e06bcb5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -1093,18 +1093,31 @@ public class FSDirectory implements Closeable { * Unlike FSNamesystem.truncate, this will not schedule block recovery. */ void unprotectedTruncate(String src, String clientName, String clientMachine, - long newLength, long mtime) + long newLength, long mtime, Block truncateBlock) throws UnresolvedLinkException, QuotaExceededException, SnapshotAccessControlException, IOException { INodesInPath iip = getINodesInPath(src, true); + INodeFile file = iip.getLastINode().asFile(); BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo(); boolean onBlockBoundary = unprotectedTruncate(iip, newLength, collectedBlocks, mtime); if(! 
onBlockBoundary) { - getFSNamesystem().prepareFileForWrite(src, - iip, clientName, clientMachine, false, false); + BlockInfo oldBlock = file.getLastBlock(); + Block tBlk = + getFSNamesystem().prepareFileForTruncate(iip, + clientName, clientMachine, file.computeFileSize() - newLength, + truncateBlock); + assert Block.matchingIdAndGenStamp(tBlk, truncateBlock) && + tBlk.getNumBytes() == truncateBlock.getNumBytes() : + "Should be the same block."; + if(oldBlock.getBlockId() != tBlk.getBlockId() && + !file.isBlockInLatestSnapshot(oldBlock)) { + getBlockManager().removeBlockFromMap(oldBlock); + } } + assert onBlockBoundary == (truncateBlock == null) : + "truncateBlock is null iff on block boundary: " + truncateBlock; getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks); } @@ -1123,7 +1136,8 @@ public class FSDirectory implements Closeable { /** * Truncate has the following properties: * 1.) Any block deletions occur now. - * 2.) INode length is truncated now – clients can only read up to new length. + * 2.) INode length is truncated now – new clients can only read up to + * the truncated length. * 3.) INode will be set to UC and lastBlock set to UNDER_RECOVERY. * 4.) NN will trigger DN truncation recovery and waits for DNs to report. * 5.) File is considered UNDER_RECOVERY until truncation recovery completes. @@ -1136,20 +1150,16 @@ public class FSDirectory implements Closeable { long mtime) throws IOException { assert hasWriteLock(); INodeFile file = iip.getLastINode().asFile(); + int latestSnapshot = iip.getLatestSnapshotId(); + file.recordModification(latestSnapshot, true); long oldDiskspace = file.diskspaceConsumed(); long remainingLength = file.collectBlocksBeyondMax(newLength, collectedBlocks); + file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks); file.setModificationTime(mtime); updateCount(iip, 0, file.diskspaceConsumed() - oldDiskspace, true); - // If on block boundary, then return - long lastBlockDelta = remainingLength - newLength; - if(lastBlockDelta == 0) - return true; - // Set new last block length - BlockInfo lastBlock = file.getLastBlock(); - assert lastBlock.getNumBytes() - lastBlockDelta > 0 : "wrong block size"; - lastBlock.setNumBytes(lastBlock.getNumBytes() - lastBlockDelta); - return false; + // return whether on a block boundary + return (remainingLength - newLength) == 0; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index d32aad933a5..144be37f8ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -902,13 +903,14 @@ public class FSEditLog implements LogsPurgeable { * Add truncate file record to edit log */ void logTruncate(String src, String clientName, String clientMachine, - long size, long timestamp) { + long size, long timestamp, Block truncateBlock) { TruncateOp op = 
TruncateOp.getInstance(cache.get()) .setPath(src) .setClientName(clientName) .setClientMachine(clientMachine) .setNewLength(size) - .setTimestamp(timestamp); + .setTimestamp(timestamp) + .setTruncateBlock(truncateBlock); logEdit(op); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 2ff3b772d2c..1f4d1a68949 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -857,7 +857,8 @@ public class FSEditLogLoader { case OP_TRUNCATE: { TruncateOp truncateOp = (TruncateOp) op; fsDir.unprotectedTruncate(truncateOp.src, truncateOp.clientName, - truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp); + truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp, + truncateOp.truncateBlock); break; } case OP_SET_STORAGE_POLICY: { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java index 396fb08de64..94241566fc1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java @@ -2611,6 +2611,7 @@ public abstract class FSEditLogOp { String clientMachine; long newLength; long timestamp; + Block truncateBlock; private TruncateOp() { super(OP_TRUNCATE); @@ -2654,6 +2655,11 @@ public abstract class FSEditLogOp { return this; } + TruncateOp setTruncateBlock(Block truncateBlock) { + this.truncateBlock = truncateBlock; + return this; + } + @Override void readFields(DataInputStream in, int logVersion) throws IOException { src = FSImageSerialization.readString(in); @@ -2661,6 +2667,10 @@ public abstract class FSEditLogOp { clientMachine = FSImageSerialization.readString(in); newLength = FSImageSerialization.readLong(in); timestamp = FSImageSerialization.readLong(in); + Block[] blocks = + FSImageSerialization.readCompactBlockArray(in, logVersion); + assert blocks.length <= 1 : "Truncate op should have 1 or 0 blocks"; + truncateBlock = (blocks.length == 0) ? null : blocks[0]; } @Override @@ -2670,6 +2680,12 @@ public abstract class FSEditLogOp { FSImageSerialization.writeString(clientMachine, out); FSImageSerialization.writeLong(newLength, out); FSImageSerialization.writeLong(timestamp, out); + int size = truncateBlock != null ? 
1 : 0; + Block[] blocks = new Block[size]; + if (truncateBlock != null) { + blocks[0] = truncateBlock; + } + FSImageSerialization.writeCompactBlockArray(blocks, out); } @Override @@ -2681,6 +2697,8 @@ public abstract class FSEditLogOp { Long.toString(newLength)); XMLUtils.addSaxString(contentHandler, "TIMESTAMP", Long.toString(timestamp)); + if(truncateBlock != null) + FSEditLogOp.blockToXml(contentHandler, truncateBlock); } @Override @@ -2690,6 +2708,8 @@ public abstract class FSEditLogOp { this.clientMachine = st.getValue("CLIENTMACHINE"); this.newLength = Long.parseLong(st.getValue("NEWLENGTH")); this.timestamp = Long.parseLong(st.getValue("TIMESTAMP")); + if (st.hasChildren("BLOCK")) + this.truncateBlock = FSEditLogOp.blockFromXml(st); } @Override @@ -2705,6 +2725,8 @@ public abstract class FSEditLogOp { builder.append(newLength); builder.append(", timestamp="); builder.append(timestamp); + builder.append(", truncateBlock="); + builder.append(truncateBlock); builder.append(", opCode="); builder.append(opCode); builder.append(", txid="); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index c250838e48a..bcc0f5c1743 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1837,8 +1837,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip); final LocatedBlocks blocks = blockManager.createLocatedBlocks( - inode.getBlocks(), fileSize, isUc, offset, length, needBlockToken, - iip.isSnapshot(), feInfo); + inode.getBlocks(iip.getPathSnapshotId()), fileSize, + isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo); // Set caching information for the located blocks. for (LocatedBlock lb : blocks.getLocatedBlocks()) { @@ -1912,7 +1912,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * Truncation at block boundary is atomic, otherwise it requires * block recovery to truncate the last block of the file. * - * @return true if and client does not need to wait for block recovery, + * @return true if client does not need to wait for block recovery, * false if client needs to wait for block recovery. */ boolean truncate(String src, long newLength, @@ -1974,44 +1974,119 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, dir.checkPathAccess(pc, iip, FsAction.WRITE); } INodeFile file = iip.getLastINode().asFile(); - // Data will be lost after truncate occurs so it cannot support snapshots. - if(file.isInLatestSnapshot(iip.getLatestSnapshotId())) - throw new HadoopIllegalArgumentException( - "Cannot truncate file with snapshot."); // Opening an existing file for write. May need lease recovery. recoverLeaseInternal(iip, src, clientName, clientMachine, false); - // Refresh INode as the file could have been closed - iip = dir.getINodesInPath4Write(src, true); file = INodeFile.valueOf(iip.getLastINode(), src); // Truncate length check. long oldLength = file.computeFileSize(); - if(oldLength == newLength) + if(oldLength == newLength) { return true; - if(oldLength < newLength) + } + if(oldLength < newLength) { throw new HadoopIllegalArgumentException( "Cannot truncate to a larger file size. 
Current size: " + oldLength + ", truncate size: " + newLength + "."); + } // Perform INodeFile truncation. BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo(); boolean onBlockBoundary = dir.truncate(iip, newLength, collectedBlocks, mtime); - + Block truncateBlock = null; if(! onBlockBoundary) { // Open file for write, but don't log into edits - prepareFileForWrite(src, iip, clientName, clientMachine, false, false); - file = INodeFile.valueOf(dir.getINode4Write(src), src); - initializeBlockRecovery(file); + long lastBlockDelta = file.computeFileSize() - newLength; + assert lastBlockDelta > 0 : "delta is 0 only if on block bounday"; + truncateBlock = prepareFileForTruncate(iip, clientName, clientMachine, + lastBlockDelta, null); } - getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime); + getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime, + truncateBlock); removeBlocks(collectedBlocks); return onBlockBoundary; } - void initializeBlockRecovery(INodeFile inodeFile) throws IOException { - BlockInfo lastBlock = inodeFile.getLastBlock(); - long recoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(lastBlock)); - ((BlockInfoUnderConstruction)lastBlock).initializeBlockRecovery( - BlockUCState.BEING_TRUNCATED, recoveryId); + /** + * Convert current INode to UnderConstruction. + * Recreate lease. + * Create new block for the truncated copy. + * Schedule truncation of the replicas. + * + * @return the returned block will be written to editLog and passed back into + * this method upon loading. + */ + Block prepareFileForTruncate(INodesInPath iip, + String leaseHolder, + String clientMachine, + long lastBlockDelta, + Block newBlock) + throws IOException { + INodeFile file = iip.getLastINode().asFile(); + String src = iip.getPath(); + file.recordModification(iip.getLatestSnapshotId()); + file.toUnderConstruction(leaseHolder, clientMachine); + assert file.isUnderConstruction() : "inode should be under construction."; + leaseManager.addLease( + file.getFileUnderConstructionFeature().getClientName(), src); + boolean shouldRecoverNow = (newBlock == null); + BlockInfo oldBlock = file.getLastBlock(); + boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file, oldBlock); + if(newBlock == null) { + newBlock = (shouldCopyOnTruncate) ? 
createNewBlock() : + new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(), + nextGenerationStamp(blockIdManager.isLegacyBlock(oldBlock))); + } + + BlockInfoUnderConstruction truncatedBlockUC; + if(shouldCopyOnTruncate) { + // Add new truncateBlock into blocksMap and + // use oldBlock as a source for copy-on-truncate recovery + truncatedBlockUC = new BlockInfoUnderConstruction(newBlock, + file.getBlockReplication()); + truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta); + truncatedBlockUC.setTruncateBlock(oldBlock); + file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock)); + getBlockManager().addBlockCollection(truncatedBlockUC, file); + + NameNode.stateChangeLog.info("BLOCK* prepareFileForTruncate: " + + "Scheduling copy-on-truncate to new size " + + truncatedBlockUC.getNumBytes() + " new block " + newBlock + + " old block " + truncatedBlockUC.getTruncateBlock()); + } else { + // Use new generation stamp for in-place truncate recovery + blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta); + oldBlock = file.getLastBlock(); + assert !oldBlock.isComplete() : "oldBlock should be under construction"; + truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock; + truncatedBlockUC.setTruncateBlock(new Block(oldBlock)); + truncatedBlockUC.getTruncateBlock().setNumBytes( + oldBlock.getNumBytes() - lastBlockDelta); + truncatedBlockUC.getTruncateBlock().setGenerationStamp( + newBlock.getGenerationStamp()); + + NameNode.stateChangeLog.debug("BLOCK* prepareFileForTruncate: " + + "Scheduling in-place block truncate to new size " + + truncatedBlockUC.getTruncateBlock().getNumBytes() + + " block=" + truncatedBlockUC); + } + if(shouldRecoverNow) + truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp()); + + // update the quota: use the preferred block size for UC block + final long diff = + file.getPreferredBlockSize() - truncatedBlockUC.getNumBytes(); + dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication()); + return newBlock; + } + + /** + * Defines if a replica needs to be copied on truncate or + * can be truncated in place. 
+ */ + boolean shouldCopyOnTruncate(INodeFile file, BlockInfo blk) { + if(!isUpgradeFinalized()) { + return true; + } + return file.isBlockInLatestSnapshot(blk); } /** @@ -2598,7 +2673,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, leaseManager.addLease( file.getFileUnderConstructionFeature().getClientName(), src); - LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(file); + LocatedBlock ret = + blockManager.convertLastBlockToUnderConstruction(file, 0); if (ret != null) { // update the quota: use the preferred block size for UC block final long diff = file.getPreferredBlockSize() - ret.getBlockSize(); @@ -2661,7 +2737,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return false; } - private void recoverLeaseInternal(INodesInPath iip, + void recoverLeaseInternal(INodesInPath iip, String src, String holder, String clientMachine, boolean force) throws IOException { assert hasWriteLock(); @@ -2723,8 +2799,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } else { final BlockInfo lastBlock = file.getLastBlock(); if (lastBlock != null - && (lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY || - lastBlock.getBlockUCState() == BlockUCState.BEING_TRUNCATED)) { + && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) { throw new RecoveryInProgressException("Recovery in progress, file [" + src + "], " + "lease owner [" + lease.getHolder() + "]"); } else { @@ -3942,8 +4017,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, throw new AlreadyBeingCreatedException(message); case UNDER_CONSTRUCTION: case UNDER_RECOVERY: - case BEING_TRUNCATED: final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)lastBlock; + // determine if last block was intended to be truncated + Block recoveryBlock = uc.getTruncateBlock(); + boolean truncateRecovery = recoveryBlock != null; + boolean copyOnTruncate = truncateRecovery && + recoveryBlock.getBlockId() != uc.getBlockId(); + assert !copyOnTruncate || + recoveryBlock.getBlockId() < uc.getBlockId() && + recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() && + recoveryBlock.getNumBytes() > uc.getNumBytes() : + "wrong recoveryBlock"; + // setup the last block locations from the blockManager if not known if (uc.getNumExpectedLocations() == 0) { uc.setExpectedLocations(blockManager.getStorages(lastBlock)); @@ -3964,9 +4049,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // start recovery of the last block for this file long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc)); lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile); - if (uc.getBlockUCState() != BlockUCState.BEING_TRUNCATED) { - uc.initializeBlockRecovery(blockRecoveryId); + if(copyOnTruncate) { + uc.setGenerationStamp(blockRecoveryId); + } else if(truncateRecovery) { + recoveryBlock.setGenerationStamp(blockRecoveryId); } + uc.initializeBlockRecovery(blockRecoveryId); leaseManager.renewLease(lease); // Cannot close file right now, since the last block requires recovery. 
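The shouldCopyOnTruncate() method added above is the policy that decides between the two modes: the last block is copied (rather than cut in place) whenever the upgrade could still roll back to the old replica files or the latest snapshot still references the block. A minimal standalone restatement of that check, with the inputs flattened into plain parameters (names are illustrative, not Hadoop APIs):

```java
import java.util.Set;

// Simplified restatement of the copy-on-truncate policy; the real method
// consults the INodeFile's snapshot feature and the NameNode upgrade state.
final class TruncatePolicy {
  static boolean shouldCopyOnTruncate(boolean upgradeFinalized,
                                      Set<Long> blockIdsInLatestSnapshot,
                                      long lastBlockId) {
    if (!upgradeFinalized) {
      // Keep the original replica files so the upgrade can still roll back.
      return true;
    }
    // A block referenced by the latest snapshot must not be shortened in place.
    return blockIdsInLatestSnapshot.contains(lastBlockId);
  }
}
```

When the policy returns true, prepareFileForTruncate() above allocates a fresh block, links it as the file's last block, and records the old block as the copy source; otherwise it bumps the generation stamp and schedules an in-place truncate of the existing replicas.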
// This may potentially cause infinite loop in lease recovery @@ -4076,11 +4164,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return true; } - void commitBlockSynchronization(ExtendedBlock lastblock, + void commitBlockSynchronization(ExtendedBlock oldBlock, long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages) throws IOException { - LOG.info("commitBlockSynchronization(lastblock=" + lastblock + LOG.info("commitBlockSynchronization(oldBlock=" + oldBlock + ", newgenerationstamp=" + newgenerationstamp + ", newlength=" + newlength + ", newtargets=" + Arrays.asList(newtargets) @@ -4099,17 +4187,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, checkNameNodeSafeMode( "Cannot commitBlockSynchronization while in safe mode"); final BlockInfo storedBlock = getStoredBlock( - ExtendedBlock.getLocalBlock(lastblock)); + ExtendedBlock.getLocalBlock(oldBlock)); if (storedBlock == null) { if (deleteblock) { // This may be a retry attempt so ignore the failure // to locate the block. if (LOG.isDebugEnabled()) { - LOG.debug("Block (=" + lastblock + ") not found"); + LOG.debug("Block (=" + oldBlock + ") not found"); } return; } else { - throw new IOException("Block (=" + lastblock + ") not found"); + throw new IOException("Block (=" + oldBlock + ") not found"); } } // @@ -4136,34 +4224,40 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, + iFile.getFullPathName() + ", likely due to delayed block" + " removal"); } - if (!iFile.isUnderConstruction() || storedBlock.isComplete()) { + if ((!iFile.isUnderConstruction() || storedBlock.isComplete()) && + iFile.getLastBlock().isComplete()) { if (LOG.isDebugEnabled()) { - LOG.debug("Unexpected block (=" + lastblock + LOG.debug("Unexpected block (=" + oldBlock + ") since the file (=" + iFile.getLocalName() + ") is not under construction"); } return; } - long recoveryId = - ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId(); + BlockInfoUnderConstruction truncatedBlock = + (BlockInfoUnderConstruction) iFile.getLastBlock(); + long recoveryId = truncatedBlock.getBlockRecoveryId(); + boolean copyTruncate = + truncatedBlock.getBlockId() != storedBlock.getBlockId(); if(recoveryId != newgenerationstamp) { throw new IOException("The recovery id " + newgenerationstamp + " does not match current recovery id " - + recoveryId + " for block " + lastblock); + + recoveryId + " for block " + oldBlock); } if (deleteblock) { - Block blockToDel = ExtendedBlock.getLocalBlock(lastblock); + Block blockToDel = ExtendedBlock.getLocalBlock(oldBlock); boolean remove = iFile.removeLastBlock(blockToDel); if (remove) { - blockManager.removeBlockFromMap(storedBlock); + blockManager.removeBlock(storedBlock); } } else { // update last block - storedBlock.setGenerationStamp(newgenerationstamp); - storedBlock.setNumBytes(newlength); + if(!copyTruncate) { + storedBlock.setGenerationStamp(newgenerationstamp); + storedBlock.setNumBytes(newlength); + } // find the DatanodeDescriptor objects // There should be no locations in the blockManager till now because the @@ -4193,7 +4287,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, DatanodeStorageInfo storageInfo = trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i)); if (storageInfo != null) { - storageInfo.addBlock(storedBlock); + if(copyTruncate) { + storageInfo.addBlock(truncatedBlock); + } else { + storageInfo.addBlock(storedBlock); + } } } } @@ -4203,11 +4301,22 
@@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, blockManager.getDatanodeManager().getDatanodeStorageInfos( trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]), trimmedStorages.toArray(new String[trimmedStorages.size()])); - iFile.setLastBlock(storedBlock, trimmedStorageInfos); + if(copyTruncate) { + iFile.setLastBlock(truncatedBlock, trimmedStorageInfos); + } else { + iFile.setLastBlock(storedBlock, trimmedStorageInfos); + } } if (closeFile) { - src = closeFileCommitBlocks(iFile, storedBlock); + if(copyTruncate) { + src = closeFileCommitBlocks(iFile, truncatedBlock); + if(!iFile.isBlockInLatestSnapshot(storedBlock)) { + blockManager.removeBlock(storedBlock); + } + } else { + src = closeFileCommitBlocks(iFile, storedBlock); + } } else { // If this commit does not want to close the file, persist blocks src = iFile.getFullPathName(); @@ -4218,13 +4327,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } getEditLog().logSync(); if (closeFile) { - LOG.info("commitBlockSynchronization(newblock=" + lastblock + LOG.info("commitBlockSynchronization(oldBlock=" + oldBlock + ", file=" + src + ", newgenerationstamp=" + newgenerationstamp + ", newlength=" + newlength + ", newtargets=" + Arrays.asList(newtargets) + ") successful"); } else { - LOG.info("commitBlockSynchronization(" + lastblock + ") successful"); + LOG.info("commitBlockSynchronization(" + oldBlock + ") successful"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index 41b23913c3d..58ef5361bf2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -228,7 +228,8 @@ public abstract class INode implements INodeAttributes, Diff.Element { /** Is this inode in the latest snapshot? */ public final boolean isInLatestSnapshot(final int latestSnapshotId) { - if (latestSnapshotId == Snapshot.CURRENT_STATE_ID) { + if (latestSnapshotId == Snapshot.CURRENT_STATE_ID || + latestSnapshotId == Snapshot.NO_SNAPSHOT_ID) { return false; } // if parent is a reference node, parent must be a renamed node. 
We can @@ -817,11 +818,15 @@ public abstract class INode implements INodeAttributes, Diff.Element { * @param toDelete the to-be-deleted block */ public void addDeleteBlock(Block toDelete) { - if (toDelete != null) { - toDeleteList.add(toDelete); - } + assert toDelete != null : "toDelete is null"; + toDeleteList.add(toDelete); } - + + public void removeDeleteBlock(Block block) { + assert block != null : "block is null"; + toDeleteList.remove(block); + } + /** * Clear {@link BlocksMapUpdateInfo#toDeleteList} */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index d1ff2f78979..64887e6f3f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -24,7 +24,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -304,6 +306,11 @@ public class INodeFile extends INodeWithAdditionalFields @Override public void recordModification(final int latestSnapshotId) throws QuotaExceededException { + recordModification(latestSnapshotId, false); + } + + public void recordModification(final int latestSnapshotId, boolean withBlocks) + throws QuotaExceededException { if (isInLatestSnapshot(latestSnapshotId) && !shouldRecordInSrcSnapshot(latestSnapshotId)) { // the file is in snapshot, create a snapshot feature if it does not have @@ -312,10 +319,10 @@ public class INodeFile extends INodeWithAdditionalFields sf = addSnapshotFeature(null); } // record self in the diff list if necessary - sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null); + sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null, withBlocks); } } - + public FileDiffList getDiffs() { FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature(); if (sf != null) { @@ -415,6 +422,20 @@ public class INodeFile extends INodeWithAdditionalFields return this.blocks; } + /** @return blocks of the file corresponding to the snapshot. */ + public BlockInfo[] getBlocks(int snapshot) { + if(snapshot == CURRENT_STATE_ID || getDiffs() == null) + return getBlocks(); + FileDiff diff = getDiffs().getDiffById(snapshot); + BlockInfo[] snapshotBlocks = diff == null ? getBlocks() : diff.getBlocks(); + if(snapshotBlocks != null) + return snapshotBlocks; + // Blocks are not in the current snapshot + // Find next snapshot with blocks present or return current file blocks + snapshotBlocks = getDiffs().findLaterSnapshotBlocks(diff.getSnapshotId()); + return (snapshotBlocks == null) ? getBlocks() : snapshotBlocks; + } + void updateBlockCollection() { if (blocks != null) { for(BlockInfo b : blocks) { @@ -509,13 +530,13 @@ public class INodeFile extends INodeWithAdditionalFields } clear(); removedINodes.add(this); - FileWithSnapshotFeature sf = getFileWithSnapshotFeature(); if (sf != null) { + sf.getDiffs().destroyAndCollectSnapshotBlocks(collectedBlocks); sf.clearDiffs(); } } - + @Override public String getName() { // Get the full path name of this inode. 
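The new INodeFile.getBlocks(int snapshot) added above resolves which block list a reader sees through a snapshot: the snapshot's own FileDiff if it recorded blocks, otherwise the first later diff that did, and finally the file's current blocks. A small self-contained model of that fallback — names are illustrative and long[] stands in for BlockInfo[]:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative model only: diffs are keyed by snapshot id; a diff records a
// block array only if a truncate happened after that snapshot was taken.
final class SnapshotBlocksView {
  private final long[] currentBlocks;
  private final NavigableMap<Integer, long[]> diffBlocks = new TreeMap<>();

  SnapshotBlocksView(long[] currentBlocks) {
    this.currentBlocks = currentBlocks;
  }

  void recordDiff(int snapshotId, long[] blocksAtSnapshotOrNull) {
    diffBlocks.put(snapshotId, blocksAtSnapshotOrNull);
  }

  /** Blocks visible when the file is read through the given snapshot. */
  long[] blocksInSnapshot(int snapshotId) {
    if (!diffBlocks.containsKey(snapshotId)) {
      return currentBlocks;              // no diff for this snapshot id
    }
    // Walk from this snapshot towards later snapshots until one that
    // recorded a block array is found; otherwise the current blocks apply.
    for (long[] blocks : diffBlocks.tailMap(snapshotId, true).values()) {
      if (blocks != null) {
        return blocks;
      }
    }
    return currentBlocks;
  }
}
```

In the patch, FSDirStatAndListingOp and FSNamesystem.getBlockLocations now go through this snapshot-aware lookup instead of always returning the file's current block list.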
@@ -554,39 +575,23 @@ public class INodeFile extends INodeWithAdditionalFields @Override public final ContentSummaryComputationContext computeContentSummary( final ContentSummaryComputationContext summary) { - computeContentSummary4Snapshot(summary.getCounts()); - computeContentSummary4Current(summary.getCounts()); - return summary; - } - - private void computeContentSummary4Snapshot(final Content.Counts counts) { - // file length and diskspace only counted for the latest state of the file - // i.e. either the current state or the last snapshot + final Content.Counts counts = summary.getCounts(); FileWithSnapshotFeature sf = getFileWithSnapshotFeature(); - if (sf != null) { + if (sf == null) { + counts.add(Content.LENGTH, computeFileSize()); + counts.add(Content.FILE, 1); + } else { final FileDiffList diffs = sf.getDiffs(); final int n = diffs.asList().size(); counts.add(Content.FILE, n); if (n > 0 && sf.isCurrentFileDeleted()) { counts.add(Content.LENGTH, diffs.getLast().getFileSize()); - } - - if (sf.isCurrentFileDeleted()) { - final long lastFileSize = diffs.getLast().getFileSize(); - counts.add(Content.DISKSPACE, lastFileSize * getBlockReplication()); + } else { + counts.add(Content.LENGTH, computeFileSize()); } } - } - - private void computeContentSummary4Current(final Content.Counts counts) { - FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature(); - if (sf != null && sf.isCurrentFileDeleted()) { - return; - } - - counts.add(Content.LENGTH, computeFileSize()); - counts.add(Content.FILE, 1); counts.add(Content.DISKSPACE, diskspaceConsumed()); + return summary; } /** The same as computeFileSize(null). */ @@ -651,9 +656,36 @@ public class INodeFile extends INodeWithAdditionalFields return size; } + /** + * Compute size consumed by all blocks of the current file, + * including blocks in its snapshots. + * Use preferred block size for the last block if it is under construction. + */ public final long diskspaceConsumed() { - // use preferred block size for the last block if it is under construction - return computeFileSize(true, true) * getBlockReplication(); + FileWithSnapshotFeature sf = getFileWithSnapshotFeature(); + if(sf == null) { + return computeFileSize(true, true) * getBlockReplication(); + } + + // Collect all distinct blocks + long size = 0; + Set allBlocks = new HashSet(Arrays.asList(getBlocks())); + List diffs = sf.getDiffs().asList(); + for(FileDiff diff : diffs) { + BlockInfo[] diffBlocks = diff.getBlocks(); + if (diffBlocks != null) { + allBlocks.addAll(Arrays.asList(diffBlocks)); + } + } + for(Block block : allBlocks) { + size += block.getNumBytes(); + } + // check if the last block is under construction + BlockInfo lastBlock = getLastBlock(); + if(lastBlock != null && lastBlock instanceof BlockInfoUnderConstruction) { + size += getPreferredBlockSize() - lastBlock.getNumBytes(); + } + return size * getBlockReplication(); } public final long diskspaceConsumed(int lastSnapshotId) { @@ -706,7 +738,7 @@ public class INodeFile extends INodeWithAdditionalFields final BlockInfo[] oldBlocks = getBlocks(); if (oldBlocks == null) return 0; - //find the minimum n such that the size of the first n blocks > max + // find the minimum n such that the size of the first n blocks > max int n = 0; long size = 0; for(; n < oldBlocks.length && max > size; n++) { @@ -716,16 +748,8 @@ public class INodeFile extends INodeWithAdditionalFields return size; // starting from block n, the data is beyond max. - // resize the array. 
- final BlockInfo[] newBlocks; - if (n == 0) { - newBlocks = BlockInfo.EMPTY_ARRAY; - } else { - newBlocks = new BlockInfo[n]; - System.arraycopy(oldBlocks, 0, newBlocks, 0, n); - } - // set new blocks - setBlocks(newBlocks); + // resize the array. + truncateBlocksTo(n); // collect the blocks beyond max if (collectedBlocks != null) { @@ -735,4 +759,67 @@ public class INodeFile extends INodeWithAdditionalFields } return size; } + + void truncateBlocksTo(int n) { + final BlockInfo[] newBlocks; + if (n == 0) { + newBlocks = BlockInfo.EMPTY_ARRAY; + } else { + newBlocks = new BlockInfo[n]; + System.arraycopy(getBlocks(), 0, newBlocks, 0, n); + } + // set new blocks + setBlocks(newBlocks); + } + + public void collectBlocksBeyondSnapshot(BlockInfo[] snapshotBlocks, + BlocksMapUpdateInfo collectedBlocks) { + BlockInfo[] oldBlocks = getBlocks(); + if(snapshotBlocks == null || oldBlocks == null) + return; + // Skip blocks in common between the file and the snapshot + int n = 0; + while(n < oldBlocks.length && n < snapshotBlocks.length && + oldBlocks[n] == snapshotBlocks[n]) { + n++; + } + truncateBlocksTo(n); + // Collect the remaining blocks of the file + while(n < oldBlocks.length) { + collectedBlocks.addDeleteBlock(oldBlocks[n++]); + } + } + + /** Exclude blocks collected for deletion that belong to a snapshot. */ + void excludeSnapshotBlocks(int snapshotId, + BlocksMapUpdateInfo collectedBlocks) { + if(collectedBlocks == null || collectedBlocks.getToDeleteList().isEmpty()) + return; + FileWithSnapshotFeature sf = getFileWithSnapshotFeature(); + if(sf == null) + return; + BlockInfo[] snapshotBlocks = + getDiffs().findEarlierSnapshotBlocks(snapshotId); + if(snapshotBlocks == null) + return; + List toDelete = collectedBlocks.getToDeleteList(); + for(Block blk : snapshotBlocks) { + if(toDelete.contains(blk)) + collectedBlocks.removeDeleteBlock(blk); + } + } + + /** + * @return true if the block is contained in a snapshot or false otherwise. 
+ */ + boolean isBlockInLatestSnapshot(BlockInfo block) { + FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature(); + if (sf == null || sf.getDiffs() == null) + return false; + BlockInfo[] snapshotBlocks = + getDiffs().findEarlierSnapshotBlocks(getDiffs().getLastSnapshotId()); + if(snapshotBlocks == null) + return false; + return Arrays.asList(snapshotBlocks).contains(block); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java index 512913b3acc..d742c6d7d60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java @@ -69,8 +69,9 @@ public class NameNodeLayoutVersion { CREATE_OVERWRITE(-58, "Use single editlog record for " + "creating file with overwrite"), XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"), - BLOCK_STORAGE_POLICY(-60, "Block Storage policy"); - + BLOCK_STORAGE_POLICY(-60, "Block Storage policy"), + TRUNCATE(-61, "Truncate"); + private final FeatureInfo info; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java index d918495765d..b3302151b9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java @@ -163,9 +163,12 @@ abstract class AbstractINodeDiffList bpl = pbf.getBlocksList(); + BlockInfo[] blocks = new BlockInfo[bpl.size()]; + for(int j = 0, e = bpl.size(); j < e; ++j) { + Block blk = PBHelper.convert(bpl.get(j)); + BlockInfo storedBlock = fsn.getBlockManager().getStoredBlock(blk); + if(storedBlock == null) { + storedBlock = fsn.getBlockManager().addBlockCollection( + new BlockInfo(blk, copy.getFileReplication()), file); + } + blocks[j] = storedBlock; + } + if(blocks.length > 0) { + diff.setBlocks(blocks); + } diffs.addFirst(diff); } file.addSnapshotFeature(diffs); @@ -472,6 +490,11 @@ public class FSImageFormatPBSnapshot { SnapshotDiffSection.FileDiff.Builder fb = SnapshotDiffSection.FileDiff .newBuilder().setSnapshotId(diff.getSnapshotId()) .setFileSize(diff.getFileSize()); + if(diff.getBlocks() != null) { + for(Block block : diff.getBlocks()) { + fb.addBlocks(PBHelper.convert(block)); + } + } INodeFileAttributes copy = diff.snapshotINode; if (copy != null) { fb.setName(ByteString.copyFrom(copy.getLocalNameBytes())) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java index 919ab564c66..7b52dc9c5b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.server.namenode.snapshot; import java.io.DataOutput; import java.io.IOException; +import java.util.Arrays; import 
java.util.List; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -37,10 +39,13 @@ public class FileDiff extends /** The file size at snapshot creation time. */ private final long fileSize; + /** A copy of the INodeFile block list. Used in truncate. */ + private BlockInfo[] blocks; FileDiff(int snapshotId, INodeFile file) { super(snapshotId, null, null); fileSize = file.computeFileSize(); + blocks = null; } /** Constructor used by FSImage loading */ @@ -48,20 +53,40 @@ public class FileDiff extends FileDiff posteriorDiff, long fileSize) { super(snapshotId, snapshotINode, posteriorDiff); this.fileSize = fileSize; + blocks = null; } /** @return the file size in the snapshot. */ public long getFileSize() { return fileSize; } - + + /** + * Copy block references into the snapshot + * up to the current {@link #fileSize}. + * Should be done only once. + */ + public void setBlocks(BlockInfo[] blocks) { + if(this.blocks != null) + return; + int numBlocks = 0; + for(long s = 0; numBlocks < blocks.length && s < fileSize; numBlocks++) + s += blocks[numBlocks].getNumBytes(); + this.blocks = Arrays.copyOf(blocks, numBlocks); + } + + public BlockInfo[] getBlocks() { + return blocks; + } + @Override Quota.Counts combinePosteriorAndCollectBlocks(INodeFile currentINode, FileDiff posterior, BlocksMapUpdateInfo collectedBlocks, final List removedINodes) { - return currentINode.getFileWithSnapshotFeature() - .updateQuotaAndCollectBlocks(currentINode, posterior, collectedBlocks, - removedINodes); + FileWithSnapshotFeature sf = currentINode.getFileWithSnapshotFeature(); + assert sf != null : "FileWithSnapshotFeature is null"; + return sf.updateQuotaAndCollectBlocks( + currentINode, posterior, collectedBlocks, removedINodes); } @Override @@ -91,4 +116,13 @@ public class FileDiff extends .updateQuotaAndCollectBlocks(currentINode, this, collectedBlocks, removedINodes); } + + public void destroyAndCollectSnapshotBlocks( + BlocksMapUpdateInfo collectedBlocks) { + if(blocks == null || collectedBlocks == null) + return; + for(BlockInfo blk : blocks) + collectedBlocks.addDeleteBlock(blk); + blocks = null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java index b0a973d28ca..07652f464c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java @@ -17,6 +17,13 @@ */ package org.apache.hadoop.hdfs.server.namenode.snapshot; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hdfs.protocol.QuotaExceededException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.namenode.INode; +import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes; @@ -33,4 +40,95 @@ public class FileDiffList extends INodeFileAttributes createSnapshotCopy(INodeFile currentINode) { return new INodeFileAttributes.SnapshotCopy(currentINode); } + + public void 
destroyAndCollectSnapshotBlocks( + BlocksMapUpdateInfo collectedBlocks) { + for(FileDiff d : asList()) + d.destroyAndCollectSnapshotBlocks(collectedBlocks); + } + + public void saveSelf2Snapshot(int latestSnapshotId, INodeFile iNodeFile, + INodeFileAttributes snapshotCopy, boolean withBlocks) + throws QuotaExceededException { + final FileDiff diff = + super.saveSelf2Snapshot(latestSnapshotId, iNodeFile, snapshotCopy); + if(withBlocks) // Store blocks if this is the first update + diff.setBlocks(iNodeFile.getBlocks()); + } + + public BlockInfo[] findEarlierSnapshotBlocks(int snapshotId) { + assert snapshotId != Snapshot.NO_SNAPSHOT_ID : "Wrong snapshot id"; + if(snapshotId == Snapshot.CURRENT_STATE_ID) { + return null; + } + List<FileDiff> diffs = this.asList(); + int i = Collections.binarySearch(diffs, snapshotId); + BlockInfo[] blocks = null; + for(i = i >= 0 ? i : -i; i < diffs.size(); i--) { + blocks = diffs.get(i).getBlocks(); + if(blocks != null) { + break; + } + } + return blocks; + } + + public BlockInfo[] findLaterSnapshotBlocks(int snapshotId) { + assert snapshotId != Snapshot.NO_SNAPSHOT_ID : "Wrong snapshot id"; + if(snapshotId == Snapshot.CURRENT_STATE_ID) { + return null; + } + List<FileDiff> diffs = this.asList(); + int i = Collections.binarySearch(diffs, snapshotId); + BlockInfo[] blocks = null; + for(i = i >= 0 ? i+1 : -i-1; i < diffs.size(); i++) { + blocks = diffs.get(i).getBlocks(); + if(blocks != null) { + break; + } + } + return blocks; + } + + /** + * Copy blocks from the removed snapshot into the previous snapshot + * up to the file length of the latter. + * Collect unused blocks of the removed snapshot. + */ + void combineAndCollectSnapshotBlocks(INodeFile file, + FileDiff removed, + BlocksMapUpdateInfo collectedBlocks, + List<INode> removedINodes) { + BlockInfo[] removedBlocks = removed.getBlocks(); + if(removedBlocks == null) { + FileWithSnapshotFeature sf = file.getFileWithSnapshotFeature(); + assert sf != null : "FileWithSnapshotFeature is null"; + if(sf.isCurrentFileDeleted()) + sf.collectBlocksAndClear(file, collectedBlocks, removedINodes); + return; + } + int p = getPrior(removed.getSnapshotId(), true); + FileDiff earlierDiff = p == Snapshot.NO_SNAPSHOT_ID ? null : getDiffById(p); + // Copy blocks to the previous snapshot if not set already + if(earlierDiff != null) + earlierDiff.setBlocks(removedBlocks); + BlockInfo[] earlierBlocks = + (earlierDiff == null ? new BlockInfo[]{} : earlierDiff.getBlocks()); + // Find later snapshot (or file itself) with blocks + BlockInfo[] laterBlocks = findLaterSnapshotBlocks(removed.getSnapshotId()); + laterBlocks = (laterBlocks==null) ?
file.getBlocks() : laterBlocks; + // Skip blocks, which belong to either the earlier or the later lists + int i = 0; + for(; i < removedBlocks.length; i++) { + if(i < earlierBlocks.length && removedBlocks[i] == earlierBlocks[i]) + continue; + if(i < laterBlocks.length && removedBlocks[i] == laterBlocks[i]) + continue; + break; + } + // Collect the remaining blocks of the file + while(i < removedBlocks.length) { + collectedBlocks.addDeleteBlock(removedBlocks[i++]); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java index 16f534f0a9b..e3482319ce5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.namenode.AclFeature; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -154,18 +155,19 @@ public class FileWithSnapshotFeature implements INode.Feature { AclStorage.removeAclFeature(aclFeature); } } - - collectBlocksAndClear(file, collectedBlocks, removedINodes); - + + getDiffs().combineAndCollectSnapshotBlocks( + file, removed, collectedBlocks, removedINodes); + long dsDelta = oldDiskspace - file.diskspaceConsumed(); return Quota.Counts.newInstance(0, dsDelta); } - + /** * If some blocks at the end of the block list no longer belongs to * any inode, collect them and update the block list. */ - private void collectBlocksAndClear(final INodeFile file, + public void collectBlocksAndClear(final INodeFile file, final BlocksMapUpdateInfo info, final List removedINodes) { // check if everything is deleted. if (isCurrentFileDeleted() && getDiffs().asList().isEmpty()) { @@ -174,13 +176,19 @@ public class FileWithSnapshotFeature implements INode.Feature { } // find max file size. final long max; + FileDiff diff = getDiffs().getLast(); if (isCurrentFileDeleted()) { - final FileDiff last = getDiffs().getLast(); - max = last == null? 0: last.getFileSize(); + max = diff == null? 0: diff.getFileSize(); } else { max = file.computeFileSize(); } - file.collectBlocksBeyondMax(max, info); + // Collect blocks that should be deleted + FileDiff last = diffs.getLast(); + BlockInfo[] snapshotBlocks = last == null ? 
null : last.getBlocks(); + if(snapshotBlocks == null) + file.collectBlocksBeyondMax(max, info); + else + file.collectBlocksBeyondSnapshot(snapshotBlocks, info); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java index c51203894c0..ced3296f486 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -53,8 +54,8 @@ public class BlockRecoveryCommand extends DatanodeCommand { @InterfaceAudience.Private @InterfaceStability.Evolving public static class RecoveringBlock extends LocatedBlock { - private boolean truncate; private final long newGenerationStamp; + private final Block recoveryBlock; /** * Create RecoveringBlock. @@ -62,15 +63,17 @@ public class BlockRecoveryCommand extends DatanodeCommand { public RecoveringBlock(ExtendedBlock b, DatanodeInfo[] locs, long newGS) { super(b, locs, -1, false); // startOffset is unknown this.newGenerationStamp = newGS; + this.recoveryBlock = null; } /** - * RecoveryingBlock with truncate option. + * Create RecoveringBlock with copy-on-truncate option. */ - public RecoveringBlock(ExtendedBlock b, DatanodeInfo[] locs, long newGS, - boolean truncate) { - this(b, locs, newGS); - this.truncate = truncate; + public RecoveringBlock(ExtendedBlock b, DatanodeInfo[] locs, + Block recoveryBlock) { + super(b, locs, -1, false); // startOffset is unknown + this.newGenerationStamp = recoveryBlock.getGenerationStamp(); + this.recoveryBlock = recoveryBlock; } /** @@ -82,10 +85,10 @@ public class BlockRecoveryCommand extends DatanodeCommand { } /** - * Return whether to truncate the block to the ExtendedBlock's length. + * Return the new block. */ - public boolean getTruncateFlag() { - return truncate; + public Block getNewBlock() { + return recoveryBlock; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java index 62915b4cbbd..72cb0c16123 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java @@ -67,5 +67,6 @@ public interface InterDatanodeProtocol { * Update replica with the new generation stamp and length. 
*/ String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, - long newLength) throws IOException; + long newBlockId, long newLength) + throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto index 47f79bed169..1a217779882 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterDatanodeProtocol.proto @@ -59,6 +59,8 @@ message UpdateReplicaUnderRecoveryRequestProto { required ExtendedBlockProto block = 1; // Block identifier required uint64 recoveryId = 2; // New genstamp of the replica required uint64 newLength = 3; // New length of the replica + // New blockId for copy (truncate), default is 0. + optional uint64 newBlockId = 4 [default = 0]; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto index 588f6c86122..643a0345737 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto @@ -270,6 +270,7 @@ message SnapshotDiffSection { optional uint64 fileSize = 2; optional bytes name = 3; optional INodeSection.INodeFile snapshotCopy = 4; + repeated BlockProto blocks = 5; } message DiffEntry { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index d989c0a8eb3..97906b16402 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -554,9 +554,9 @@ enum ReplicaStateProto { * Block that needs to be recovered with at a given location */ message RecoveringBlockProto { - required uint64 newGenStamp = 1; // New genstamp post recovery - required LocatedBlockProto block = 2; // Block to be recovered - optional bool truncateFlag = 3; // Block needs to be truncated + required uint64 newGenStamp = 1; // New genstamp post recovery + required LocatedBlockProto block = 2; // Block to be recovered + optional BlockProto truncateBlock = 3; // New block for recovery (truncate) } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index e4834d658cb..7a4960e46c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -1239,7 +1239,7 @@ public class TestReplicationPolicy { when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any())) .thenReturn(ucBlock); - bm.convertLastBlockToUnderConstruction(mbc); + bm.convertLastBlockToUnderConstruction(mbc, 0L); // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. 
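A standalone sketch, not part of the patch: the InterDatanodeProtocol.proto change above adds an optional newBlockId (default 0) to UpdateReplicaUnderRecoveryRequestProto, and the Java interface gains the matching parameter. On this reading of the patch, a newBlockId equal to the replica's current block id (or left at the 0 default) means the replica is truncated in place, while a different id means the truncated replica is materialized under the new block id so a snapshot can keep the old block. The class, method, and block-id values below are illustrative only and do not appear in the patch.

public class RecoveryModeSketch {
  enum Mode { IN_PLACE_TRUNCATE, COPY_ON_TRUNCATE }

  // Decide how a DataNode would treat updateReplicaUnderRecovery based on the
  // newBlockId argument (assumption: 0 or the old id selects in-place recovery).
  static Mode recoveryMode(long oldBlockId, long newBlockId) {
    return (newBlockId == 0 || newBlockId == oldBlockId)
        ? Mode.IN_PLACE_TRUNCATE : Mode.COPY_ON_TRUNCATE;
  }

  public static void main(String[] args) {
    System.out.println(recoveryMode(1073741825L, 0L));          // IN_PLACE_TRUNCATE
    System.out.println(recoveryMode(1073741825L, 1073741825L)); // IN_PLACE_TRUNCATE
    System.out.println(recoveryMode(1073741825L, 1073741830L)); // COPY_ON_TRUNCATE
  }
}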
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index e03b756c210..78eedf96c28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1106,6 +1106,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { @Override // FsDatasetSpi public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, + long newBlockId, long newlength) { // Caller does not care about the exact Storage UUID returned. return datanodeUuid; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 987b4803cfa..9bf5e5246cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -56,7 +56,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -219,10 +218,10 @@ public class TestBlockRecovery { syncList.add(record1); syncList.add(record2); - when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), - anyLong())).thenReturn("storage1"); - when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), - anyLong())).thenReturn("storage2"); + when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), + anyLong(), anyLong())).thenReturn("storage1"); + when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), + anyLong(), anyLong())).thenReturn("storage2"); dn.syncBlock(rBlock, syncList); } @@ -245,8 +244,10 @@ public class TestBlockRecovery { InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); - verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1); - verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1); + verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, + REPLICA_LEN1); + verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, + REPLICA_LEN1); // two finalized replicas have different length replica1 = new ReplicaRecoveryInfo(BLOCK_ID, @@ -284,8 +285,10 @@ public class TestBlockRecovery { InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); - verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1); - verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1); + verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, + REPLICA_LEN1); + verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, + REPLICA_LEN1); // rbw replica has a different length from the finalized one replica1 = new ReplicaRecoveryInfo(BLOCK_ID, @@ -297,9 
+300,10 @@ public class TestBlockRecovery { dn2 = mock(InterDatanodeProtocol.class); testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); - verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1); + verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, + REPLICA_LEN1); verify(dn2, never()).updateReplicaUnderRecovery( - block, RECOVERY_ID, REPLICA_LEN1); + block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); } /** @@ -323,9 +327,10 @@ public class TestBlockRecovery { InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); - verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1); + verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, + REPLICA_LEN1); verify(dn2, never()).updateReplicaUnderRecovery( - block, RECOVERY_ID, REPLICA_LEN1); + block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); // rbw replica has a different length from the finalized one replica1 = new ReplicaRecoveryInfo(BLOCK_ID, @@ -337,9 +342,10 @@ public class TestBlockRecovery { dn2 = mock(InterDatanodeProtocol.class); testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); - verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1); + verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, + REPLICA_LEN1); verify(dn2, never()).updateReplicaUnderRecovery( - block, RECOVERY_ID, REPLICA_LEN1); + block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); } /** @@ -362,8 +368,8 @@ public class TestBlockRecovery { long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2); testSyncReplicas(replica1, replica2, dn1, dn2, minLen); - verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen); - verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen); + verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); + verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); } /** @@ -385,9 +391,9 @@ public class TestBlockRecovery { InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); - verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1); + verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); verify(dn2, never()).updateReplicaUnderRecovery( - block, RECOVERY_ID, REPLICA_LEN1); + block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); } /** @@ -411,8 +417,8 @@ public class TestBlockRecovery { long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2); testSyncReplicas(replica1, replica2, dn1, dn2, minLen); - verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen); - verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen); + verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); + verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); } private Collection initRecoveringBlocks() throws IOException { @@ -513,7 +519,7 @@ public class TestBlockRecovery { } DataNode spyDN = spy(dn); doThrow(new IOException()).when(spyDN).updateReplicaUnderRecovery( - block, RECOVERY_ID, block.getNumBytes()); + block, RECOVERY_ID, BLOCK_ID, block.getNumBytes()); try { spyDN.syncBlock(rBlock, initBlockRecords(spyDN)); fail("Sync should fail"); @@ -634,7 +640,8 @@ public class TestBlockRecovery { recoveryInitResult.get()); dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock() - .getGenerationStamp() + 1, block.getBlockSize()); + .getGenerationStamp() + 1, block.getBlock().getBlockId(), 
+ block.getBlockSize()); } finally { if (null != cluster) { cluster.shutdown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java index 65a51761c9a..360968413e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java @@ -198,7 +198,8 @@ public class TestInterDatanodeProtocol { //verify updateBlock ExtendedBlock newblock = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b.getNumBytes()/2, b.getGenerationStamp()+1); - idp.updateReplicaUnderRecovery(b, recoveryId, newblock.getNumBytes()); + idp.updateReplicaUnderRecovery(b, recoveryId, b.getBlockId(), + newblock.getNumBytes()); checkMetaInfo(newblock, datanode); // Verify correct null response trying to init recovery for a missing block @@ -368,7 +369,8 @@ public class TestInterDatanodeProtocol { .getBlockId(), rri.getNumBytes() - 1, rri.getGenerationStamp()); try { //update should fail - fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, newlength); + fsdataset.updateReplicaUnderRecovery(tmp, recoveryid, + tmp.getBlockId(), newlength); Assert.fail(); } catch(IOException ioe) { System.out.println("GOOD: getting " + ioe); @@ -377,7 +379,8 @@ public class TestInterDatanodeProtocol { //update final String storageID = fsdataset.updateReplicaUnderRecovery( - new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength); + new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, + rri.getBlockId(), newlength); assertTrue(storageID != null); } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java index d0502b36603..eae65ccfb0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java @@ -71,6 +71,7 @@ public class TestCommitBlockSynchronization { doReturn(true).when(file).isUnderConstruction(); doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class)); + doReturn(blockInfo).when(file).getLastBlock(); doReturn("").when(namesystemSpy).closeFileCommitBlocks( any(INodeFile.class), any(BlockInfo.class)); doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog(); @@ -105,6 +106,7 @@ public class TestCommitBlockSynchronization { completedBlockInfo.setGenerationStamp(genStamp); doReturn(completedBlockInfo).when(namesystemSpy) .getStoredBlock(any(Block.class)); + doReturn(completedBlockInfo).when(file).getLastBlock(); // Repeat the call to make sure it does not throw namesystemSpy.commitBlockSynchronization( @@ -176,6 +178,7 @@ public class TestCommitBlockSynchronization { completedBlockInfo.setGenerationStamp(genStamp); doReturn(completedBlockInfo).when(namesystemSpy) .getStoredBlock(any(Block.class)); + doReturn(completedBlockInfo).when(file).getLastBlock(); namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, true, false, newTargets, null); diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java index ba9d04e914e..1f854d1c7ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java @@ -18,14 +18,22 @@ package org.apache.hadoop.hdfs.server.namenode; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; -import java.net.InetAddress; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FSDataOutputStream; @@ -39,14 +47,14 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; -import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -57,6 +65,7 @@ public class TestFileTruncate { GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL); GenericTestUtils.setLogLevel(FSEditLogLoader.LOG, Level.ALL); } + static final Log LOG = LogFactory.getLog(TestFileTruncate.class); static final int BLOCK_SIZE = 4; static final short REPLICATION = 3; static final int DATANODE_NUM = 3; @@ -129,6 +138,287 @@ public class TestFileTruncate { fs.delete(parent, true); } + @Test + public void testSnapshotWithAppendTruncate() throws IOException { + testSnapshotWithAppendTruncate(0, 1, 2); + testSnapshotWithAppendTruncate(0, 2, 1); + testSnapshotWithAppendTruncate(1, 0, 2); + testSnapshotWithAppendTruncate(1, 2, 0); + testSnapshotWithAppendTruncate(2, 0, 1); + testSnapshotWithAppendTruncate(2, 1, 0); + } + + /** + * Create three snapshots with appended and truncated file. + * Delete snapshots in the specified order and verify that + * remaining snapshots are still readable. + */ + void testSnapshotWithAppendTruncate(int ... 
deleteOrder) throws IOException { + FSDirectory fsDir = cluster.getNamesystem().getFSDirectory(); + Path parent = new Path("/test"); + fs.mkdirs(parent); + fs.setQuota(parent, 100, 1000); + fs.allowSnapshot(parent); + String truncateFile = "testSnapshotWithAppendTruncate"; + final Path src = new Path(parent, truncateFile); + int[] length = new int[4]; + length[0] = 2 * BLOCK_SIZE + BLOCK_SIZE / 2; + DFSTestUtil.createFile(fs, src, 64, length[0], BLOCK_SIZE, REPLICATION, 0L); + Block firstBlk = getLocatedBlocks(src).get(0).getBlock().getLocalBlock(); + Path[] snapshotFiles = new Path[4]; + + // Diskspace consumed should be 10 bytes * 3. [blk 1,2,3] + ContentSummary contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(30L)); + + // Add file to snapshot and append + String[] ss = new String[] {"ss0", "ss1", "ss2", "ss3"}; + Path snapshotDir = fs.createSnapshot(parent, ss[0]); + snapshotFiles[0] = new Path(snapshotDir, truncateFile); + length[1] = length[2] = length[0] + BLOCK_SIZE + 1; + DFSTestUtil.appendFile(fs, src, BLOCK_SIZE + 1); + Block lastBlk = getLocatedBlocks(src).getLastLocatedBlock() + .getBlock().getLocalBlock(); + + // Diskspace consumed should be 15 bytes * 3. [blk 1,2,3,4] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(45L)); + + // Create another snapshot without changes + snapshotDir = fs.createSnapshot(parent, ss[1]); + snapshotFiles[1] = new Path(snapshotDir, truncateFile); + + // Create another snapshot and append + snapshotDir = fs.createSnapshot(parent, ss[2]); + snapshotFiles[2] = new Path(snapshotDir, truncateFile); + DFSTestUtil.appendFile(fs, src, BLOCK_SIZE -1 + BLOCK_SIZE / 2); + Block appendedBlk = getLocatedBlocks(src).getLastLocatedBlock() + .getBlock().getLocalBlock(); + + // Diskspace consumed should be 20 bytes * 3. [blk 1,2,3,4,5] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(60L)); + + // Truncate to block boundary + int newLength = length[0] + BLOCK_SIZE / 2; + boolean isReady = fs.truncate(src, newLength); + assertTrue("Recovery is not expected.", isReady); + assertFileLength(snapshotFiles[2], length[2]); + assertFileLength(snapshotFiles[1], length[1]); + assertFileLength(snapshotFiles[0], length[0]); + assertBlockNotPresent(appendedBlk); + + // Diskspace consumed should be 16 bytes * 3. [blk 1,2,3 SS:4] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(48L)); + + // Truncate full block again + newLength = length[0] - BLOCK_SIZE / 2; + isReady = fs.truncate(src, newLength); + assertTrue("Recovery is not expected.", isReady); + assertFileLength(snapshotFiles[2], length[2]); + assertFileLength(snapshotFiles[1], length[1]); + assertFileLength(snapshotFiles[0], length[0]); + + // Diskspace consumed should be 16 bytes * 3. [blk 1,2 SS:3,4] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(48L)); + + // Truncate half of the last block + newLength -= BLOCK_SIZE / 2; + isReady = fs.truncate(src, newLength); + assertFalse("Recovery is expected.", isReady); + checkBlockRecovery(src); + assertFileLength(snapshotFiles[2], length[2]); + assertFileLength(snapshotFiles[1], length[1]); + assertFileLength(snapshotFiles[0], length[0]); + Block replacedBlk = getLocatedBlocks(src).getLastLocatedBlock() + .getBlock().getLocalBlock(); + + // Diskspace consumed should be 16 bytes * 3. 
[blk 1,6 SS:2,3,4] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(54L)); + + snapshotDir = fs.createSnapshot(parent, ss[3]); + snapshotFiles[3] = new Path(snapshotDir, truncateFile); + length[3] = newLength; + + // Delete file. Should still be able to read snapshots + int numINodes = fsDir.getInodeMapSize(); + isReady = fs.delete(src, false); + assertTrue("Delete failed.", isReady); + assertFileLength(snapshotFiles[3], length[3]); + assertFileLength(snapshotFiles[2], length[2]); + assertFileLength(snapshotFiles[1], length[1]); + assertFileLength(snapshotFiles[0], length[0]); + assertEquals("Number of INodes should not change", + numINodes, fsDir.getInodeMapSize()); + + fs.deleteSnapshot(parent, ss[3]); + + assertBlockExists(firstBlk); + assertBlockExists(lastBlk); + assertBlockNotPresent(replacedBlk); + + // Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(48L)); + + // delete snapshots in the specified order + fs.deleteSnapshot(parent, ss[deleteOrder[0]]); + assertFileLength(snapshotFiles[deleteOrder[1]], length[deleteOrder[1]]); + assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]); + assertBlockExists(firstBlk); + assertBlockExists(lastBlk); + assertEquals("Number of INodes should not change", + numINodes, fsDir.getInodeMapSize()); + + // Diskspace consumed should be 16 bytes * 3. [SS:1,2,3,4] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(48L)); + + fs.deleteSnapshot(parent, ss[deleteOrder[1]]); + assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]); + assertBlockExists(firstBlk); + contentSummary = fs.getContentSummary(parent); + if(fs.exists(snapshotFiles[0])) { + // Diskspace consumed should be 0 bytes * 3. [SS:1,2,3] + assertBlockNotPresent(lastBlk); + assertThat(contentSummary.getSpaceConsumed(), is(36L)); + } else { + // Diskspace consumed should be 48 bytes * 3. [SS:1,2,3,4] + assertThat(contentSummary.getSpaceConsumed(), is(48L)); + } + assertEquals("Number of INodes should not change", + numINodes, fsDir .getInodeMapSize()); + + fs.deleteSnapshot(parent, ss[deleteOrder[2]]); + assertBlockNotPresent(firstBlk); + assertBlockNotPresent(lastBlk); + + // Diskspace consumed should be 0 bytes * 3. [] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(0L)); + assertNotEquals("Number of INodes should change", + numINodes, fsDir.getInodeMapSize()); + } + + /** + * Create three snapshots with file truncated 3 times. + * Delete snapshots in the specified order and verify that + * remaining snapshots are still readable. + */ + @Test + public void testSnapshotWithTruncates() throws IOException { + testSnapshotWithTruncates(0, 1, 2); + testSnapshotWithTruncates(0, 2, 1); + testSnapshotWithTruncates(1, 0, 2); + testSnapshotWithTruncates(1, 2, 0); + testSnapshotWithTruncates(2, 0, 1); + testSnapshotWithTruncates(2, 1, 0); + } + + void testSnapshotWithTruncates(int ... 
deleteOrder) throws IOException { + Path parent = new Path("/test"); + fs.mkdirs(parent); + fs.setQuota(parent, 100, 1000); + fs.allowSnapshot(parent); + String truncateFile = "testSnapshotWithTruncates"; + final Path src = new Path(parent, truncateFile); + int[] length = new int[3]; + length[0] = 3 * BLOCK_SIZE; + DFSTestUtil.createFile(fs, src, 64, length[0], BLOCK_SIZE, REPLICATION, 0L); + Block firstBlk = getLocatedBlocks(src).get(0).getBlock().getLocalBlock(); + Block lastBlk = getLocatedBlocks(src).getLastLocatedBlock() + .getBlock().getLocalBlock(); + Path[] snapshotFiles = new Path[3]; + + // Diskspace consumed should be 12 bytes * 3. [blk 1,2,3] + ContentSummary contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(36L)); + + // Add file to snapshot and append + String[] ss = new String[] {"ss0", "ss1", "ss2"}; + Path snapshotDir = fs.createSnapshot(parent, ss[0]); + snapshotFiles[0] = new Path(snapshotDir, truncateFile); + length[1] = 2 * BLOCK_SIZE; + boolean isReady = fs.truncate(src, 2 * BLOCK_SIZE); + assertTrue("Recovery is not expected.", isReady); + + // Diskspace consumed should be 12 bytes * 3. [blk 1,2 SS:3] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(36L)); + snapshotDir = fs.createSnapshot(parent, ss[1]); + snapshotFiles[1] = new Path(snapshotDir, truncateFile); + + // Create another snapshot with truncate + length[2] = BLOCK_SIZE + BLOCK_SIZE / 2; + isReady = fs.truncate(src, BLOCK_SIZE + BLOCK_SIZE / 2); + assertFalse("Recovery is expected.", isReady); + checkBlockRecovery(src); + snapshotDir = fs.createSnapshot(parent, ss[2]); + snapshotFiles[2] = new Path(snapshotDir, truncateFile); + assertFileLength(snapshotFiles[0], length[0]); + assertBlockExists(lastBlk); + + // Diskspace consumed should be 14 bytes * 3. [blk 1,4 SS:2,3] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(42L)); + + fs.deleteSnapshot(parent, ss[deleteOrder[0]]); + assertFileLength(snapshotFiles[deleteOrder[1]], length[deleteOrder[1]]); + assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]); + assertFileLength(src, length[2]); + assertBlockExists(firstBlk); + + contentSummary = fs.getContentSummary(parent); + if(fs.exists(snapshotFiles[0])) { + // Diskspace consumed should be 14 bytes * 3. [blk 1,4 SS:2,3] + assertThat(contentSummary.getSpaceConsumed(), is(42L)); + assertBlockExists(lastBlk); + } else { + // Diskspace consumed should be 10 bytes * 3. [blk 1,4 SS:2] + assertThat(contentSummary.getSpaceConsumed(), is(30L)); + assertBlockNotPresent(lastBlk); + } + + fs.deleteSnapshot(parent, ss[deleteOrder[1]]); + assertFileLength(snapshotFiles[deleteOrder[2]], length[deleteOrder[2]]); + assertFileLength(src, length[2]); + assertBlockExists(firstBlk); + + contentSummary = fs.getContentSummary(parent); + if(fs.exists(snapshotFiles[0])) { + // Diskspace consumed should be 14 bytes * 3. [blk 1,4 SS:2,3] + assertThat(contentSummary.getSpaceConsumed(), is(42L)); + assertBlockExists(lastBlk); + } else if(fs.exists(snapshotFiles[1])) { + // Diskspace consumed should be 10 bytes * 3. [blk 1,4 SS:2] + assertThat(contentSummary.getSpaceConsumed(), is(30L)); + assertBlockNotPresent(lastBlk); + } else { + // Diskspace consumed should be 6 bytes * 3. 
[blk 1,4 SS:] + assertThat(contentSummary.getSpaceConsumed(), is(18L)); + assertBlockNotPresent(lastBlk); + } + + fs.deleteSnapshot(parent, ss[deleteOrder[2]]); + assertFileLength(src, length[2]); + assertBlockExists(firstBlk); + + // Diskspace consumed should be 6 bytes * 3. [blk 1,4 SS:] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(18L)); + assertThat(contentSummary.getLength(), is(6L)); + + fs.delete(src, false); + assertBlockNotPresent(firstBlk); + + // Diskspace consumed should be 0 bytes * 3. [] + contentSummary = fs.getContentSummary(parent); + assertThat(contentSummary.getSpaceConsumed(), is(0L)); + } + /** * Failure / recovery test for truncate. * In this failure the DNs fail to recover the blocks and the NN triggers @@ -159,8 +449,6 @@ public class TestFileTruncate { boolean isReady = fs.truncate(p, newLength); assertThat("truncate should have triggered block recovery.", isReady, is(false)); - FileStatus fileStatus = fs.getFileStatus(p); - assertThat(fileStatus.getLen(), is((long) newLength)); boolean recoveryTriggered = false; for(int i = 0; i < RECOVERY_ATTEMPTS; i++) { @@ -168,8 +456,6 @@ public class TestFileTruncate { NameNodeAdapter.getLeaseHolderForPath(cluster.getNameNode(), p.toUri().getPath()); if(leaseHolder.equals(HdfsServerConstants.NAMENODE_LEASE_HOLDER)) { - cluster.startDataNodes(conf, DATANODE_NUM, true, - HdfsServerConstants.StartupOption.REGULAR, null); recoveryTriggered = true; break; } @@ -177,6 +463,9 @@ public class TestFileTruncate { } assertThat("lease recovery should have occurred in ~" + SLEEP * RECOVERY_ATTEMPTS + " ms.", recoveryTriggered, is(true)); + cluster.startDataNodes(conf, DATANODE_NUM, true, + StartupOption.REGULAR, null); + cluster.waitActive(); checkBlockRecovery(p); @@ -184,10 +473,10 @@ public class TestFileTruncate { .setLeasePeriod(HdfsConstants.LEASE_SOFTLIMIT_PERIOD, HdfsConstants.LEASE_HARDLIMIT_PERIOD); - fileStatus = fs.getFileStatus(p); + FileStatus fileStatus = fs.getFileStatus(p); assertThat(fileStatus.getLen(), is((long) newLength)); - AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString()); + checkFullFile(p, newLength, contents); fs.delete(p, false); } @@ -198,10 +487,9 @@ public class TestFileTruncate { public void testTruncateEditLogLoad() throws IOException { int startingFileSize = 2 * BLOCK_SIZE + BLOCK_SIZE / 2; int toTruncate = 1; - + final String s = "/testTruncateEditLogLoad"; + final Path p = new Path(s); byte[] contents = AppendTestUtil.initBuffer(startingFileSize); - - final Path p = new Path("/testTruncateEditLogLoad"); writeContents(contents, startingFileSize, p); int newLength = startingFileSize - toTruncate; @@ -209,54 +497,183 @@ public class TestFileTruncate { assertThat("truncate should have triggered block recovery.", isReady, is(false)); - checkBlockRecovery(p); - cluster.restartNameNode(); + String holder = UserGroupInformation.getCurrentUser().getUserName(); + cluster.getNamesystem().recoverLease(s, holder, ""); + + checkBlockRecovery(p); + FileStatus fileStatus = fs.getFileStatus(p); assertThat(fileStatus.getLen(), is((long) newLength)); - AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString()); + checkFullFile(p, newLength, contents); fs.delete(p, false); } + /** + * Upgrade, RollBack, and restart test for Truncate. 
+ */ + @Test + public void testUpgradeAndRestart() throws IOException { + Path parent = new Path("/test"); + fs.mkdirs(parent); + fs.setQuota(parent, 100, 1000); + fs.allowSnapshot(parent); + String truncateFile = "testUpgrade"; + final Path p = new Path(parent, truncateFile); + int startingFileSize = 2 * BLOCK_SIZE; + int toTruncate = 1; + byte[] contents = AppendTestUtil.initBuffer(startingFileSize); + writeContents(contents, startingFileSize, p); + + Path snapshotDir = fs.createSnapshot(parent, "ss0"); + Path snapshotFile = new Path(snapshotDir, truncateFile); + + int newLengthBeforeUpgrade = startingFileSize - toTruncate; + boolean isReady = fs.truncate(p, newLengthBeforeUpgrade); + assertThat("truncate should have triggered block recovery.", + isReady, is(false)); + + checkBlockRecovery(p); + + checkFullFile(p, newLengthBeforeUpgrade, contents); + assertFileLength(snapshotFile, startingFileSize); + long totalBlockBefore = cluster.getNamesystem().getBlocksTotal(); + + restartCluster(StartupOption.UPGRADE); + + assertThat("SafeMode should be OFF", + cluster.getNamesystem().isInSafeMode(), is(false)); + assertThat("NameNode should be performing upgrade.", + cluster.getNamesystem().isUpgradeFinalized(), is(false)); + FileStatus fileStatus = fs.getFileStatus(p); + assertThat(fileStatus.getLen(), is((long) newLengthBeforeUpgrade)); + + int newLengthAfterUpgrade = newLengthBeforeUpgrade - toTruncate; + Block oldBlk = getLocatedBlocks(p).getLastLocatedBlock() + .getBlock().getLocalBlock(); + isReady = fs.truncate(p, newLengthAfterUpgrade); + assertThat("truncate should have triggered block recovery.", + isReady, is(false)); + fileStatus = fs.getFileStatus(p); + assertThat(fileStatus.getLen(), is((long) newLengthAfterUpgrade)); + assertThat("Should copy on truncate during upgrade", + getLocatedBlocks(p).getLastLocatedBlock().getBlock() + .getLocalBlock().getBlockId(), is(not(equalTo(oldBlk.getBlockId())))); + + checkBlockRecovery(p); + + checkFullFile(p, newLengthAfterUpgrade, contents); + assertThat("Total block count should be unchanged from copy-on-truncate", + cluster.getNamesystem().getBlocksTotal(), is(totalBlockBefore)); + + restartCluster(StartupOption.ROLLBACK); + + assertThat("File does not exist " + p, fs.exists(p), is(true)); + fileStatus = fs.getFileStatus(p); + assertThat(fileStatus.getLen(), is((long) newLengthBeforeUpgrade)); + checkFullFile(p, newLengthBeforeUpgrade, contents); + assertThat("Total block count should be unchanged from rolling back", + cluster.getNamesystem().getBlocksTotal(), is(totalBlockBefore)); + + restartCluster(StartupOption.REGULAR); + assertThat("Total block count should be unchanged from start-up", + cluster.getNamesystem().getBlocksTotal(), is(totalBlockBefore)); + checkFullFile(p, newLengthBeforeUpgrade, contents); + assertFileLength(snapshotFile, startingFileSize); + + // empty edits and restart + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + cluster.restartNameNode(true); + assertThat("Total block count should be unchanged from start-up", + cluster.getNamesystem().getBlocksTotal(), is(totalBlockBefore)); + checkFullFile(p, newLengthBeforeUpgrade, contents); + assertFileLength(snapshotFile, startingFileSize); + + fs.deleteSnapshot(parent, "ss0"); + fs.delete(parent, true); + assertThat("File " + p + " shouldn't exist", fs.exists(p), is(false)); + } + /** * Check truncate recovery. 
*/ @Test - public void testTruncateLastBlock() throws IOException { + public void testTruncateRecovery() throws IOException { FSNamesystem fsn = cluster.getNamesystem(); - - String src = "/file"; + String client = "client"; + String clientMachine = "clientMachine"; + Path parent = new Path("/test"); + String src = "/test/testTruncateRecovery"; Path srcPath = new Path(src); byte[] contents = AppendTestUtil.initBuffer(BLOCK_SIZE); writeContents(contents, BLOCK_SIZE, srcPath); - INodeFile inode = fsn.getFSDirectory().getINode(src).asFile(); - long oldGenstamp = GenerationStamp.LAST_RESERVED_STAMP; - DatanodeDescriptor dn = DFSTestUtil.getLocalDatanodeDescriptor(); - DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo( - dn.getDatanodeUuid(), InetAddress.getLocalHost().getHostAddress()); - dn.isAlive = true; - - BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction( - new Block(0, 1, oldGenstamp), (short) 1, - HdfsServerConstants.BlockUCState.BEING_TRUNCATED, - new DatanodeStorageInfo[] {storage}); - - inode.setBlocks(new BlockInfo[] {blockInfo}); + INodesInPath iip = fsn.getFSDirectory().getINodesInPath4Write(src, true); + INodeFile file = iip.getLastINode().asFile(); + long initialGenStamp = file.getLastBlock().getGenerationStamp(); + // Test that prepareFileForTruncate sets up in-place truncate. fsn.writeLock(); try { - fsn.initializeBlockRecovery(inode); - assertThat(inode.getLastBlock().getBlockUCState(), - is(HdfsServerConstants.BlockUCState.BEING_TRUNCATED)); - long blockRecoveryId = ((BlockInfoUnderConstruction) inode.getLastBlock()) + Block oldBlock = file.getLastBlock(); + Block truncateBlock = + fsn.prepareFileForTruncate(iip, client, clientMachine, 1, null); + // In-place truncate uses old block id with new genStamp. 
+ assertThat(truncateBlock.getBlockId(), + is(equalTo(oldBlock.getBlockId()))); + assertThat(truncateBlock.getNumBytes(), + is(oldBlock.getNumBytes())); + assertThat(truncateBlock.getGenerationStamp(), + is(fsn.getBlockIdManager().getGenerationStampV2())); + assertThat(file.getLastBlock().getBlockUCState(), + is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY)); + long blockRecoveryId = ((BlockInfoUnderConstruction) file.getLastBlock()) .getBlockRecoveryId(); - assertThat(blockRecoveryId, is(oldGenstamp + 2)); + assertThat(blockRecoveryId, is(initialGenStamp + 1)); + fsn.getEditLog().logTruncate( + src, client, clientMachine, BLOCK_SIZE-1, Time.now(), truncateBlock); } finally { fsn.writeUnlock(); } + + // Re-create file and ensure we are ready to copy on truncate + writeContents(contents, BLOCK_SIZE, srcPath); + fs.allowSnapshot(parent); + fs.createSnapshot(parent, "ss0"); + iip = fsn.getFSDirectory().getINodesInPath(src, true); + file = iip.getLastINode().asFile(); + file.recordModification(iip.getLatestSnapshotId(), true); + assertThat(file.isBlockInLatestSnapshot(file.getLastBlock()), is(true)); + initialGenStamp = file.getLastBlock().getGenerationStamp(); + // Test that prepareFileForTruncate sets up copy-on-write truncate + fsn.writeLock(); + try { + Block oldBlock = file.getLastBlock(); + Block truncateBlock = + fsn.prepareFileForTruncate(iip, client, clientMachine, 1, null); + // Copy-on-write truncate makes new block with new id and genStamp + assertThat(truncateBlock.getBlockId(), + is(not(equalTo(oldBlock.getBlockId())))); + assertThat(truncateBlock.getNumBytes() < oldBlock.getNumBytes(), + is(true)); + assertThat(truncateBlock.getGenerationStamp(), + is(fsn.getBlockIdManager().getGenerationStampV2())); + assertThat(file.getLastBlock().getBlockUCState(), + is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY)); + long blockRecoveryId = ((BlockInfoUnderConstruction) file.getLastBlock()) + .getBlockRecoveryId(); + assertThat(blockRecoveryId, is(initialGenStamp + 1)); + fsn.getEditLog().logTruncate( + src, client, clientMachine, BLOCK_SIZE-1, Time.now(), truncateBlock); + } finally { + fsn.writeUnlock(); + } + checkBlockRecovery(srcPath); + fs.deleteSnapshot(parent, "ss0"); + fs.delete(parent, true); } static void writeContents(byte[] contents, int fileLength, Path p) @@ -286,4 +703,38 @@ public class TestFileTruncate { static LocatedBlocks getLocatedBlocks(Path src) throws IOException { return fs.getClient().getLocatedBlocks(src.toString(), 0, Long.MAX_VALUE); } + + static void assertBlockExists(Block blk) { + assertNotNull("BlocksMap does not contain block: " + blk, + cluster.getNamesystem().getStoredBlock(blk)); + } + + static void assertBlockNotPresent(Block blk) { + assertNull("BlocksMap should not contain block: " + blk, + cluster.getNamesystem().getStoredBlock(blk)); + } + + static void assertFileLength(Path file, long length) throws IOException { + byte[] data = DFSTestUtil.readFileBuffer(fs, file); + assertEquals("Wrong data size in snapshot.", length, data.length); + } + + static void checkFullFile(Path p, int newLength, byte[] contents) + throws IOException { + AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString()); + } + + static void restartCluster(StartupOption o) + throws IOException { + cluster.shutdown(); + if(StartupOption.ROLLBACK == o) + NameNode.doRollback(conf, false); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM) + .format(false) + .nameNodePort(NameNode.DEFAULT_PORT) + .startupOption(o==StartupOption.ROLLBACK ? 
StartupOption.REGULAR : o) + .dnStartupOption(o!=StartupOption.ROLLBACK ? StartupOption.REGULAR : o) + .build(); + fs = cluster.getFileSystem(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored index 002c506da4a..dce3f472c10 100644 Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored differ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml index f61c0757327..6e8078be81d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml @@ -1,6 +1,6 @@
[The XML element tags of the editsStored.xml hunks were lost in extraction; only the recoverable changes are noted here. The regenerated edit log bumps EDITS_VERSION from -60 to -61 and refreshes generated values throughout the existing records: transaction timestamps, delegation keys, RPC client ids (DFSClient_NONMAPREDUCE_526346936_1 becomes DFSClient_NONMAPREDUCE_240777107_1), RPC call ids, cache pool owner and group, and the file owner (plamenjeliazkov becomes shv). The diff ends with the OP_ROLLING_UPGRADE_START and OP_ROLLING_UPGRADE_FINALIZE records, whose timestamps change from 1421131352186 to 1421135813268.]
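A standalone sketch, not part of the patch: the FileDiffList.combineAndCollectSnapshotBlocks change above decides what to release when a snapshot diff is removed by walking the removed block list from the front, skipping every position still held (by reference) by the earlier snapshot or by the later snapshot / live file, and deleting only the remaining tail; INodeFile.collectBlocksBeyondSnapshot applies the same prefix-skipping idea against a single snapshot block list. The class below is a simplified model of that rule; Object stands in for BlockInfo, and all names are illustrative.

import java.util.ArrayList;
import java.util.List;

public class SnapshotBlockCollectionSketch {

  // Return the blocks that belonged only to the removed snapshot: the tail that
  // neither the earlier nor the later block list still references at the same index.
  static List<Object> blocksToDelete(Object[] removed, Object[] earlier, Object[] later) {
    int i = 0;
    for (; i < removed.length; i++) {
      boolean heldByEarlier = i < earlier.length && removed[i] == earlier[i];
      boolean heldByLater = i < later.length && removed[i] == later[i];
      if (!heldByEarlier && !heldByLater) {
        break;
      }
    }
    List<Object> toDelete = new ArrayList<>();
    while (i < removed.length) {
      toDelete.add(removed[i++]);
    }
    return toDelete;
  }

  public static void main(String[] args) {
    Object b1 = new Object(), b2 = new Object(), b3 = new Object(), b4 = new Object();
    Object[] earlier = { b1, b2 };       // blocks saved by the earlier snapshot
    Object[] removed = { b1, b2, b3 };   // blocks saved by the snapshot being deleted
    Object[] later   = { b1, b2, b4 };   // blocks of the later snapshot or live file
    System.out.println(blocksToDelete(removed, earlier, later).size()); // prints 1 (only b3 is collected)
  }
}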