diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cefdc166247..6da066784ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -18,6 +18,8 @@ Trunk (Unreleased)
 
     HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
 
+    HDFS-3107. Introduce truncate. (Plamen Jeliazkov via shv)
+
   IMPROVEMENTS
 
     HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 62db1fac8cc..f289da76886 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1916,6 +1916,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
           SnapshotAccessControlException.class);
     }
   }
+
+  /**
+   * Truncate a file to an indicated size.
+   * See {@link ClientProtocol#truncate(String, long, String)}.
+   */
+  public boolean truncate(String src, long newLength) throws IOException {
+    checkOpen();
+    try {
+      return namenode.truncate(src, newLength, clientName);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+          UnresolvedPathException.class);
+    }
+  }
+
   /**
    * Delete file or directory.
    * See {@link ClientProtocol#delete(String, boolean)}.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index d4653acf66e..6284f61d4fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -626,7 +626,20 @@ public class DistributedFileSystem extends FileSystem {
       }.resolve(this, absDst);
     }
   }
-
+
+  /**
+   * Truncate the file in the indicated path to the indicated size.
+   * @param f The path to the file to be truncated
+   * @param newLength The size the file is to be truncated to
+   *
+   * @return true if the client does not need to wait for block recovery,
+   * false if the client needs to wait for block recovery.
+   */
+  public boolean truncate(Path f, final long newLength) throws IOException {
+    statistics.incrementWriteOps(1);
+    return dfs.truncate(getPathName(f), newLength);
+  }
+
   @Override
   public boolean delete(Path f, final boolean recursive) throws IOException {
     statistics.incrementWriteOps(1);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 2301575f0e3..749f3876df3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -521,7 +521,37 @@ public interface ClientProtocol {
       FileAlreadyExistsException, FileNotFoundException,
       NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
       UnresolvedLinkException, SnapshotAccessControlException, IOException;
-
+
+  /**
+   * Truncate file src to new size.
+   *
+   * This implementation of truncate is purely a namespace operation if truncate
+   * occurs at a block boundary. Requires DataNode block recovery otherwise.
+   *
+   * @param src existing file
+   * @param newLength the target size
+   *
+   * @return true if client does not need to wait for block recovery,
+   * false if client needs to wait for block recovery.
+   *
+   * @throws AccessControlException If access is denied
+   * @throws FileNotFoundException If file src is not found
+   * @throws SafeModeException truncate not allowed in safemode
+   * @throws UnresolvedLinkException If src contains a symlink
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   * @throws IOException If an I/O error occurred
+   */
+  @Idempotent
+  public boolean truncate(String src, long newLength, String clientName)
+      throws AccessControlException, FileNotFoundException, SafeModeException,
+      UnresolvedLinkException, SnapshotAccessControlException, IOException;
+
   /**
    * Delete the given file or directory from the file system.
    *
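
The boolean returned by the new truncate call tells the caller whether the operation completed as a pure namespace change (true) or whether the last block must first go through recovery on the DataNodes (false). A minimal client-side sketch of the intended call pattern, assuming an already configured HDFS client; the path and polling interval are illustrative, while DistributedFileSystem#truncate, #isFileClosed and #getFileStatus are real APIs:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class TruncateUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        Path file = new Path("/tmp/truncate-demo");   // hypothetical file
        long newLength = 1024L;

        boolean isReady = dfs.truncate(file, newLength);
        if (!isReady) {
          // Truncation fell inside the last block: the NameNode reopens the
          // file and schedules block recovery. Wait until the file is closed
          // again before reusing it for writes.
          while (!dfs.isFileClosed(file)) {
            Thread.sleep(100);
          }
        }
        // The namespace already reflects the new length at this point.
        System.out.println(dfs.getFileStatus(file).getLen());  // == newLength
      }
    }
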
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 5b6609bfbbe..8bcc1eb77b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -181,6 +181,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSto
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
@@ -584,6 +586,18 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
return VOID_RENAME2_RESPONSE;
}
+ @Override
+ public TruncateResponseProto truncate(RpcController controller,
+ TruncateRequestProto req) throws ServiceException {
+ try {
+ boolean result = server.truncate(req.getSrc(), req.getNewLength(),
+ req.getClientName());
+ return TruncateResponseProto.newBuilder().setResult(result).build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
@Override
public DeleteResponseProto delete(RpcController controller,
DeleteRequestProto req) throws ServiceException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 58049204fc4..f3826af880d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -155,6 +155,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuo
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
@@ -301,6 +302,21 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
+ @Override
+ public boolean truncate(String src, long newLength, String clientName)
+ throws IOException, UnresolvedLinkException {
+ TruncateRequestProto req = TruncateRequestProto.newBuilder()
+ .setSrc(src)
+ .setNewLength(newLength)
+ .setClientName(clientName)
+ .build();
+ try {
+ return rpcProxy.truncate(null, req).getResult();
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
@Override
public LastBlockWithStatus append(String src, String clientName)
throws AccessControlException, DSQuotaExceededException,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index ee6d58cbfae..3f6a7f3694c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -608,13 +608,15 @@ public class PBHelper {
}
LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
return RecoveringBlockProto.newBuilder().setBlock(lb)
- .setNewGenStamp(b.getNewGenerationStamp()).build();
+ .setNewGenStamp(b.getNewGenerationStamp())
+ .setTruncateFlag(b.getTruncateFlag()).build();
}
public static RecoveringBlock convert(RecoveringBlockProto b) {
ExtendedBlock block = convert(b.getBlock().getB());
DatanodeInfo[] locs = convert(b.getBlock().getLocsList());
- return new RecoveringBlock(block, locs, b.getNewGenStamp());
+ return new RecoveringBlock(block, locs, b.getNewGenStamp(),
+ b.getTruncateFlag());
}
public static DatanodeInfoProto.AdminState convert(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
index f19ad1c511b..28b179db1c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
@@ -273,7 +273,11 @@ public class BlockInfoUnderConstruction extends BlockInfo {
* make it primary.
*/
public void initializeBlockRecovery(long recoveryId) {
- setBlockUCState(BlockUCState.UNDER_RECOVERY);
+ initializeBlockRecovery(BlockUCState.UNDER_RECOVERY, recoveryId);
+ }
+
+ public void initializeBlockRecovery(BlockUCState s, long recoveryId) {
+ setBlockUCState(s);
blockRecoveryId = recoveryId;
if (replicas.size() == 0) {
NameNode.blockStateChangeLog.warn("BLOCK*"
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 41d03633fea..918b8d99a78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -1439,10 +1440,12 @@ public class DatanodeManager {
LOG.info("Skipped stale nodes for recovery : " +
(storages.length - recoveryLocations.size()));
}
+ boolean isTruncate = b.getBlockUCState().equals(
+ HdfsServerConstants.BlockUCState.BEING_TRUNCATED);
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
- b.getBlockRecoveryId()));
+ b.getBlockRecoveryId(), isTruncate));
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index a3198e20ef3..3ab10b4488c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -240,7 +240,7 @@ public class DatanodeStorageInfo {
return result;
}
- boolean removeBlock(BlockInfo b) {
+ public boolean removeBlock(BlockInfo b) {
blockList = b.listRemove(blockList, this);
if (b.removeStorage(this)) {
numBlocks--;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index 9bba2c950b8..f2e7ff44e46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -299,6 +299,13 @@ public final class HdfsServerConstants {
* which synchronizes the existing replicas contents.
*/
UNDER_RECOVERY,
+ /**
+ * The block is being truncated.
+ * When a file is truncated its last block may need to be truncated
+ * and needs to go through a recovery procedure,
+ * which synchronizes the existing replicas contents.
+ */
+ BEING_TRUNCATED,
/**
* The block is committed.
* The client reported that all bytes are written to data-nodes
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index d5d84290571..7f95f331500 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2691,7 +2691,10 @@ public class DataNode extends ReconfigurableBase
r.rInfo.getNumBytes() == finalizedLength)
participatingList.add(r);
}
- newBlock.setNumBytes(finalizedLength);
+ if(rBlock.getTruncateFlag())
+ newBlock.setNumBytes(rBlock.getBlock().getNumBytes());
+ else
+ newBlock.setNumBytes(finalizedLength);
break;
case RBW:
case RWR:
@@ -2703,7 +2706,10 @@ public class DataNode extends ReconfigurableBase
participatingList.add(r);
}
}
- newBlock.setNumBytes(minLength);
+ if(rBlock.getTruncateFlag())
+ newBlock.setNumBytes(rBlock.getBlock().getNumBytes());
+ else
+ newBlock.setNumBytes(minLength);
break;
case RUR:
case TEMPORARY:
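
These two hunks change how the recovery target length is chosen when the DataNode coordinates block recovery. A condensed sketch of the rule, using a helper name of my own; finalizedLength and minLength stand for the values computed from the replicas in the surrounding switch:

    // Illustrative helper, not part of the patch: with the truncate flag set,
    // the target length is the one the NameNode already recorded in the
    // recovering block (the truncated length), not a length derived from the
    // replicas, which still have the old, longer size.
    static long recoveryTargetLength(boolean truncateFlag,
                                     long nameNodeBlockLength,
                                     long replicaDerivedLength) {
      return truncateFlag ? nameNodeBlockLength : replicaDerivedLength;
    }
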
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index b39519f55ef..1948099fe9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -1087,7 +1087,71 @@ public class FSDirectory implements Closeable {
public INodeMap getINodeMap() {
return inodeMap;
}
-
+
+ /**
+ * Truncate implementation used by FSEditLogLoader when replaying OP_TRUNCATE.
+ * Unlike FSNamesystem.truncate, this does not schedule block recovery.
+ */
+ void unprotectedTruncate(String src, String clientName, String clientMachine,
+ long newLength, long mtime)
+ throws UnresolvedLinkException, QuotaExceededException,
+ SnapshotAccessControlException, IOException {
+ INodesInPath iip = getINodesInPath(src, true);
+ BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+ boolean onBlockBoundary =
+ unprotectedTruncate(iip, newLength, collectedBlocks, mtime);
+
+ if(! onBlockBoundary) {
+ getFSNamesystem().prepareFileForWrite(src,
+ iip, clientName, clientMachine, false, false);
+ }
+ getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+ }
+
+ boolean truncate(INodesInPath iip, long newLength,
+ BlocksMapUpdateInfo collectedBlocks,
+ long mtime)
+ throws IOException {
+ writeLock();
+ try {
+ return unprotectedTruncate(iip, newLength, collectedBlocks, mtime);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ /**
+ * Truncate has the following properties:
+ * 1.) Any block deletions occur now.
+ * 2.) INode length is truncated now – clients can only read up to new length.
+ * 3.) INode will be set to UC and lastBlock set to UNDER_RECOVERY.
+ * 4.) NN will trigger DN truncation recovery and waits for DNs to report.
+ * 5.) File is considered UNDER_RECOVERY until truncation recovery completes.
+ * 6.) Soft and hard Lease expiration require truncation recovery to complete.
+ *
+ * @return true if on the block boundary or false if recovery is needed
+ */
+ boolean unprotectedTruncate(INodesInPath iip, long newLength,
+ BlocksMapUpdateInfo collectedBlocks,
+ long mtime) throws IOException {
+ assert hasWriteLock();
+ INodeFile file = iip.getLastINode().asFile();
+ long oldDiskspace = file.diskspaceConsumed();
+ long remainingLength =
+ file.collectBlocksBeyondMax(newLength, collectedBlocks);
+ file.setModificationTime(mtime);
+ updateCount(iip, 0, file.diskspaceConsumed() - oldDiskspace, true);
+ // If on block boundary, then return
+ long lastBlockDelta = remainingLength - newLength;
+ if(lastBlockDelta == 0)
+ return true;
+ // Set new last block length
+ BlockInfo lastBlock = file.getLastBlock();
+ assert lastBlock.getNumBytes() - lastBlockDelta > 0 : "wrong block size";
+ lastBlock.setNumBytes(lastBlock.getNumBytes() - lastBlockDelta);
+ return false;
+ }
+
/**
* This method is always called with writeLock of FSDirectory held.
*/
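
The boundary check above is easiest to see with concrete numbers. A small self-contained sketch of the same arithmetic, assuming three 100-byte blocks; the class and method names are illustrative, but the loop mirrors INodeFile#collectBlocksBeyondMax and the delta test mirrors unprotectedTruncate:

    public class TruncateBoundarySketch {
      /** True when truncation lands on a block boundary (namespace-only). */
      static boolean truncateIsBoundary(long[] blockSizes, long newLength) {
        // Keep the minimal prefix of blocks whose total size reaches newLength;
        // everything after that prefix is collected for deletion.
        long remainingLength = 0;
        for (long size : blockSizes) {
          if (remainingLength >= newLength) {
            break;                      // this block and the rest are deleted
          }
          remainingLength += size;      // block is kept
        }
        long lastBlockDelta = remainingLength - newLength;
        // delta == 0: pure namespace truncate; delta > 0: the last kept block
        // must shrink by delta bytes, which requires block recovery.
        return lastBlockDelta == 0;
      }

      public static void main(String[] args) {
        long[] blocks = {100, 100, 100};
        System.out.println(truncateIsBoundary(blocks, 200)); // true  (boundary)
        System.out.println(truncateIsBoundary(blocks, 150)); // false (recovery)
      }
    }
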
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 4a29b59957d..d32aad933a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
@@ -896,6 +897,20 @@ public class FSEditLog implements LogsPurgeable {
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
+
+ /**
+ * Add truncate file record to edit log
+ */
+ void logTruncate(String src, String clientName, String clientMachine,
+ long size, long timestamp) {
+ TruncateOp op = TruncateOp.getInstance(cache.get())
+ .setPath(src)
+ .setClientName(clientName)
+ .setClientMachine(clientMachine)
+ .setNewLength(size)
+ .setTimestamp(timestamp);
+ logEdit(op);
+ }
/**
* Add legacy block generation stamp record to edit log
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 011892646de..2ff3b772d2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
import static org.apache.hadoop.util.Time.now;
@@ -853,6 +854,12 @@ public class FSEditLogLoader {
}
break;
}
+ case OP_TRUNCATE: {
+ TruncateOp truncateOp = (TruncateOp) op;
+ fsDir.unprotectedTruncate(truncateOp.src, truncateOp.clientName,
+ truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp);
+ break;
+ }
case OP_SET_STORAGE_POLICY: {
SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
final String path = renameReservedPathsOnUpgrade(setStoragePolicyOp.path,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 11026fc80ad..396fb08de64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -59,6 +59,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_XAT
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TRUNCATE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_STORAGE_POLICY;
@@ -180,6 +181,7 @@ public abstract class FSEditLogOp {
inst.put(OP_START_LOG_SEGMENT, new LogSegmentOp(OP_START_LOG_SEGMENT));
inst.put(OP_END_LOG_SEGMENT, new LogSegmentOp(OP_END_LOG_SEGMENT));
inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
+ inst.put(OP_TRUNCATE, new TruncateOp());
inst.put(OP_ALLOW_SNAPSHOT, new AllowSnapshotOp());
inst.put(OP_DISALLOW_SNAPSHOT, new DisallowSnapshotOp());
@@ -2602,6 +2604,115 @@ public abstract class FSEditLogOp {
readRpcIdsFromXml(st);
}
}
+
+ static class TruncateOp extends FSEditLogOp {
+ String src;
+ String clientName;
+ String clientMachine;
+ long newLength;
+ long timestamp;
+
+ private TruncateOp() {
+ super(OP_TRUNCATE);
+ }
+
+ static TruncateOp getInstance(OpInstanceCache cache) {
+ return (TruncateOp)cache.get(OP_TRUNCATE);
+ }
+
+ @Override
+ void resetSubFields() {
+ src = null;
+ clientName = null;
+ clientMachine = null;
+ newLength = 0L;
+ timestamp = 0L;
+ }
+
+ TruncateOp setPath(String src) {
+ this.src = src;
+ return this;
+ }
+
+ TruncateOp setClientName(String clientName) {
+ this.clientName = clientName;
+ return this;
+ }
+
+ TruncateOp setClientMachine(String clientMachine) {
+ this.clientMachine = clientMachine;
+ return this;
+ }
+
+ TruncateOp setNewLength(long newLength) {
+ this.newLength = newLength;
+ return this;
+ }
+
+ TruncateOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion) throws IOException {
+ src = FSImageSerialization.readString(in);
+ clientName = FSImageSerialization.readString(in);
+ clientMachine = FSImageSerialization.readString(in);
+ newLength = FSImageSerialization.readLong(in);
+ timestamp = FSImageSerialization.readLong(in);
+ }
+
+ @Override
+ public void writeFields(DataOutputStream out) throws IOException {
+ FSImageSerialization.writeString(src, out);
+ FSImageSerialization.writeString(clientName, out);
+ FSImageSerialization.writeString(clientMachine, out);
+ FSImageSerialization.writeLong(newLength, out);
+ FSImageSerialization.writeLong(timestamp, out);
+ }
+
+ @Override
+ protected void toXml(ContentHandler contentHandler) throws SAXException {
+ XMLUtils.addSaxString(contentHandler, "SRC", src);
+ XMLUtils.addSaxString(contentHandler, "CLIENTNAME", clientName);
+ XMLUtils.addSaxString(contentHandler, "CLIENTMACHINE", clientMachine);
+ XMLUtils.addSaxString(contentHandler, "NEWLENGTH",
+ Long.toString(newLength));
+ XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
+ Long.toString(timestamp));
+ }
+
+ @Override
+ void fromXml(Stanza st) throws InvalidXmlException {
+ this.src = st.getValue("SRC");
+ this.clientName = st.getValue("CLIENTNAME");
+ this.clientMachine = st.getValue("CLIENTMACHINE");
+ this.newLength = Long.parseLong(st.getValue("NEWLENGTH"));
+ this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("TruncateOp [src=");
+ builder.append(src);
+ builder.append(", clientName=");
+ builder.append(clientName);
+ builder.append(", clientMachine=");
+ builder.append(clientMachine);
+ builder.append(", newLength=");
+ builder.append(newLength);
+ builder.append(", timestamp=");
+ builder.append(timestamp);
+ builder.append(", opCode=");
+ builder.append(opCode);
+ builder.append(", txid=");
+ builder.append(txid);
+ builder.append("]");
+ return builder.toString();
+ }
+ }
/**
* {@literal @Idempotent} for {@link ClientProtocol#recoverLease}. In the
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
index 86be54adb7f..468e0482f31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
@@ -73,6 +73,7 @@ public enum FSEditLogOpCodes {
OP_SET_XATTR ((byte) 43),
OP_REMOVE_XATTR ((byte) 44),
OP_SET_STORAGE_POLICY ((byte) 45),
+ OP_TRUNCATE ((byte) 46),
// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 36a43347566..c250838e48a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1906,6 +1906,114 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
logAuditEvent(true, "setTimes", src, null, auditStat);
}
+ /**
+ * Truncate file to a lower length.
+ * Truncate cannot be reverted / recovered from as it causes data loss.
+ * Truncation at block boundary is atomic, otherwise it requires
+ * block recovery to truncate the last block of the file.
+ *
+ * @return true if client does not need to wait for block recovery,
+ * false if client needs to wait for block recovery.
+ */
+ boolean truncate(String src, long newLength,
+ String clientName, String clientMachine,
+ long mtime)
+ throws IOException, UnresolvedLinkException {
+ boolean ret;
+ try {
+ ret = truncateInt(src, newLength, clientName, clientMachine, mtime);
+ } catch (AccessControlException e) {
+ logAuditEvent(false, "truncate", src);
+ throw e;
+ }
+ return ret;
+ }
+
+ boolean truncateInt(String srcArg, long newLength,
+ String clientName, String clientMachine,
+ long mtime)
+ throws IOException, UnresolvedLinkException {
+ String src = srcArg;
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.truncate: src="
+ + src + " newLength=" + newLength);
+ }
+ HdfsFileStatus stat = null;
+ FSPermissionChecker pc = getPermissionChecker();
+ checkOperation(OperationCategory.WRITE);
+ boolean res;
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ writeLock();
+ try {
+ checkOperation(OperationCategory.WRITE);
+ checkNameNodeSafeMode("Cannot truncate for " + src);
+ src = dir.resolvePath(pc, src, pathComponents);
+ res = truncateInternal(src, newLength, clientName,
+ clientMachine, mtime, pc);
+ stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
+ FSDirectory.isReservedRawName(src), true);
+ } finally {
+ writeUnlock();
+ }
+ getEditLog().logSync();
+ logAuditEvent(true, "truncate", src, null, stat);
+ return res;
+ }
+
+ /**
+ * Truncate a file to a given size
+ * Update the count at each ancestor directory with quota
+ */
+ boolean truncateInternal(String src, long newLength,
+ String clientName, String clientMachine,
+ long mtime, FSPermissionChecker pc)
+ throws IOException, UnresolvedLinkException {
+ assert hasWriteLock();
+ INodesInPath iip = dir.getINodesInPath4Write(src, true);
+ if (isPermissionEnabled) {
+ dir.checkPathAccess(pc, iip, FsAction.WRITE);
+ }
+ INodeFile file = iip.getLastINode().asFile();
+ // Data will be lost after truncate occurs so it cannot support snapshots.
+ if(file.isInLatestSnapshot(iip.getLatestSnapshotId()))
+ throw new HadoopIllegalArgumentException(
+ "Cannot truncate file with snapshot.");
+ // Opening an existing file for write. May need lease recovery.
+ recoverLeaseInternal(iip, src, clientName, clientMachine, false);
+ // Refresh INode as the file could have been closed
+ iip = dir.getINodesInPath4Write(src, true);
+ file = INodeFile.valueOf(iip.getLastINode(), src);
+ // Truncate length check.
+ long oldLength = file.computeFileSize();
+ if(oldLength == newLength)
+ return true;
+ if(oldLength < newLength)
+ throw new HadoopIllegalArgumentException(
+ "Cannot truncate to a larger file size. Current size: " + oldLength +
+ ", truncate size: " + newLength + ".");
+ // Perform INodeFile truncation.
+ BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+ boolean onBlockBoundary = dir.truncate(iip, newLength,
+ collectedBlocks, mtime);
+
+ if(! onBlockBoundary) {
+ // Open file for write, but don't log into edits
+ prepareFileForWrite(src, iip, clientName, clientMachine, false, false);
+ file = INodeFile.valueOf(dir.getINode4Write(src), src);
+ initializeBlockRecovery(file);
+ }
+ getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime);
+ removeBlocks(collectedBlocks);
+ return onBlockBoundary;
+ }
+
+ void initializeBlockRecovery(INodeFile inodeFile) throws IOException {
+ BlockInfo lastBlock = inodeFile.getLastBlock();
+ long recoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(lastBlock));
+ ((BlockInfoUnderConstruction)lastBlock).initializeBlockRecovery(
+ BlockUCState.BEING_TRUNCATED, recoveryId);
+ }
+
/**
* Create a symbolic link.
*/
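
For reviewers, the new NameNode path condenses to the following outline (step order taken from truncateInt and truncateInternal above; comment form only, not part of the patch):

    /*
     * NameNode-side truncate flow:
     *  1. Under the namesystem write lock: check safe mode, resolve the path,
     *     check WRITE permission, and reject files present in a snapshot.
     *  2. recoverLeaseInternal(): a file still open for write must finish
     *     lease recovery before it can be truncated.
     *  3. newLength == current length: no-op, return true;
     *     newLength > current length: HadoopIllegalArgumentException.
     *  4. FSDirectory.truncate(): delete whole blocks beyond newLength and,
     *     if newLength falls inside the last block, shrink its numBytes.
     *  5. Off a block boundary: reopen the file for write (without logging an
     *     OP_ADD) and initialize block recovery in state BEING_TRUNCATED.
     *  6. Log OP_TRUNCATE, remove the collected blocks, sync the edit log, and
     *     return true only in the pure namespace (block boundary) case.
     */
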
@@ -2615,7 +2723,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} else {
final BlockInfo lastBlock = file.getLastBlock();
if (lastBlock != null
- && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
+ && (lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY ||
+ lastBlock.getBlockUCState() == BlockUCState.BEING_TRUNCATED)) {
throw new RecoveryInProgressException("Recovery in progress, file ["
+ src + "], " + "lease owner [" + lease.getHolder() + "]");
} else {
@@ -3833,6 +3942,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new AlreadyBeingCreatedException(message);
case UNDER_CONSTRUCTION:
case UNDER_RECOVERY:
+ case BEING_TRUNCATED:
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)lastBlock;
// setup the last block locations from the blockManager if not known
if (uc.getNumExpectedLocations() == 0) {
@@ -3854,7 +3964,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// start recovery of the last block for this file
long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
- uc.initializeBlockRecovery(blockRecoveryId);
+ if (uc.getBlockUCState() != BlockUCState.BEING_TRUNCATED) {
+ uc.initializeBlockRecovery(blockRecoveryId);
+ }
leaseManager.renewLease(lease);
// Cannot close file right now, since the last block requires recovery.
// This may potentially cause infinite loop in lease recovery
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index b811f122d1f..d1ff2f78979 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -696,4 +696,43 @@ public class INodeFile extends INodeWithAdditionalFields
out.print(blocks == null || blocks.length == 0? null: blocks[0]);
out.println();
}
+
+ /**
+ * Remove full blocks at the end of the file beyond newLength.
+ * @return sum of sizes of the remaining blocks
+ */
+ public long collectBlocksBeyondMax(final long max,
+ final BlocksMapUpdateInfo collectedBlocks) {
+ final BlockInfo[] oldBlocks = getBlocks();
+ if (oldBlocks == null)
+ return 0;
+ //find the minimum n such that the size of the first n blocks > max
+ int n = 0;
+ long size = 0;
+ for(; n < oldBlocks.length && max > size; n++) {
+ size += oldBlocks[n].getNumBytes();
+ }
+ if (n >= oldBlocks.length)
+ return size;
+
+ // starting from block n, the data is beyond max.
+ // resize the array.
+ final BlockInfo[] newBlocks;
+ if (n == 0) {
+ newBlocks = BlockInfo.EMPTY_ARRAY;
+ } else {
+ newBlocks = new BlockInfo[n];
+ System.arraycopy(oldBlocks, 0, newBlocks, 0, n);
+ }
+ // set new blocks
+ setBlocks(newBlocks);
+
+ // collect the blocks beyond max
+ if (collectedBlocks != null) {
+ for(; n < oldBlocks.length; n++) {
+ collectedBlocks.addDeleteBlock(oldBlocks[n]);
+ }
+ }
+ return size;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 824094675da..6ef8fd6b175 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.MAX_PATH_LENGTH;
+import static org.apache.hadoop.util.Time.now;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -882,6 +883,22 @@ class NameNodeRpcServer implements NamenodeProtocols {
metrics.incrFilesRenamed();
}
+ @Override // ClientProtocol
+ public boolean truncate(String src, long newLength, String clientName)
+ throws IOException {
+ if(stateChangeLog.isDebugEnabled()) {
+ stateChangeLog.debug("*DIR* NameNode.truncate: " + src + " to " +
+ newLength);
+ }
+ String clientMachine = getClientMachine();
+ try {
+ return namesystem.truncate(
+ src, newLength, clientName, clientMachine, now());
+ } finally {
+ metrics.incrFilesTruncated();
+ }
+ }
+
@Override // ClientProtocol
public boolean delete(String src, boolean recursive) throws IOException {
if (stateChangeLog.isDebugEnabled()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index 42942dc51bc..94e845ba3c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -47,6 +47,7 @@ public class NameNodeMetrics {
@Metric MutableCounterLong filesAppended;
@Metric MutableCounterLong getBlockLocations;
@Metric MutableCounterLong filesRenamed;
+ @Metric MutableCounterLong filesTruncated;
@Metric MutableCounterLong getListingOps;
@Metric MutableCounterLong deleteFileOps;
@Metric("Number of files/dirs deleted by delete or rename operations")
@@ -173,6 +174,10 @@ public class NameNodeMetrics {
filesRenamed.incr();
}
+ public void incrFilesTruncated() {
+ filesTruncated.incr();
+ }
+
public void incrFilesDeleted(long delta) {
filesDeleted.incr(delta);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
index e3bf349609c..16f534f0a9b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
@@ -21,7 +21,6 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.AclFeature;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -182,40 +181,6 @@ public class FileWithSnapshotFeature implements INode.Feature {
max = file.computeFileSize();
}
- collectBlocksBeyondMax(file, max, info);
- }
-
- private void collectBlocksBeyondMax(final INodeFile file, final long max,
- final BlocksMapUpdateInfo collectedBlocks) {
- final BlockInfo[] oldBlocks = file.getBlocks();
- if (oldBlocks != null) {
- //find the minimum n such that the size of the first n blocks > max
- int n = 0;
- for(long size = 0; n < oldBlocks.length && max > size; n++) {
- size += oldBlocks[n].getNumBytes();
- }
-
- // starting from block n, the data is beyond max.
- if (n < oldBlocks.length) {
- // resize the array.
- final BlockInfo[] newBlocks;
- if (n == 0) {
- newBlocks = BlockInfo.EMPTY_ARRAY;
- } else {
- newBlocks = new BlockInfo[n];
- System.arraycopy(oldBlocks, 0, newBlocks, 0, n);
- }
-
- // set new blocks
- file.setBlocks(newBlocks);
-
- // collect the blocks beyond max.
- if (collectedBlocks != null) {
- for(; n < oldBlocks.length; n++) {
- collectedBlocks.addDeleteBlock(oldBlocks[n]);
- }
- }
- }
- }
+ file.collectBlocksBeyondMax(max, info);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
index b7199ba3803..c51203894c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
@@ -53,6 +53,7 @@ public class BlockRecoveryCommand extends DatanodeCommand {
@InterfaceAudience.Private
@InterfaceStability.Evolving
public static class RecoveringBlock extends LocatedBlock {
+ private boolean truncate;
private final long newGenerationStamp;
/**
@@ -63,6 +64,15 @@ public class BlockRecoveryCommand extends DatanodeCommand {
this.newGenerationStamp = newGS;
}
+ /**
+ * RecoveringBlock with truncate option.
+ */
+ public RecoveringBlock(ExtendedBlock b, DatanodeInfo[] locs, long newGS,
+ boolean truncate) {
+ this(b, locs, newGS);
+ this.truncate = truncate;
+ }
+
/**
* Return the new generation stamp of the block,
* which also plays role of the recovery id.
@@ -70,6 +80,13 @@ public class BlockRecoveryCommand extends DatanodeCommand {
public long getNewGenerationStamp() {
return newGenerationStamp;
}
+
+ /**
+ * Return whether to truncate the block to the ExtendedBlock's length.
+ */
+ public boolean getTruncateFlag() {
+ return truncate;
+ }
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 2c1d3cb9f3e..5c9f7528a79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -198,6 +198,16 @@ message ConcatRequestProto {
message ConcatResponseProto { // void response
}
+message TruncateRequestProto {
+ required string src = 1;
+ required uint64 newLength = 2;
+ required string clientName = 3;
+}
+
+message TruncateResponseProto {
+ required bool result = 1;
+}
+
message RenameRequestProto {
required string src = 1;
required string dst = 2;
@@ -722,6 +732,7 @@ service ClientNamenodeProtocol {
rpc reportBadBlocks(ReportBadBlocksRequestProto)
returns(ReportBadBlocksResponseProto);
rpc concat(ConcatRequestProto) returns(ConcatResponseProto);
+ rpc truncate(TruncateRequestProto) returns(TruncateResponseProto);
rpc rename(RenameRequestProto) returns(RenameResponseProto);
rpc rename2(Rename2RequestProto) returns(Rename2ResponseProto);
rpc delete(DeleteRequestProto) returns(DeleteResponseProto);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 04a8f3f0fb0..d989c0a8eb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -556,6 +556,7 @@ enum ReplicaStateProto {
message RecoveringBlockProto {
required uint64 newGenStamp = 1; // New genstamp post recovery
required LocatedBlockProto block = 2; // Block to be recovered
+ optional bool truncateFlag = 3; // Block needs to be truncated
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 01f5d2e1932..15f5f2eafcf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1194,7 +1194,13 @@ public class DFSTestUtil {
DFSTestUtil.createFile(filesystem, pathConcatFiles[1], length, replication,
seed);
filesystem.concat(pathConcatTarget, pathConcatFiles);
-
+
+ // OP_TRUNCATE 46
+ length = blockSize * 2;
+ DFSTestUtil.createFile(filesystem, pathFileCreate, length, replication,
+ seed);
+ filesystem.truncate(pathFileCreate, blockSize);
+
// OP_SYMLINK 17
Path pathSymlink = new Path("/file_symlink");
fc.createSymlink(pathConcatTarget, pathSymlink, false);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
index 77a17eddeaa..75a4ad4311e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -71,7 +71,7 @@ public class TestDFSInotifyEventInputStream {
*/
@Test
public void testOpcodeCount() {
- Assert.assertEquals(47, FSEditLogOpCodes.values().length);
+ Assert.assertEquals(48, FSEditLogOpCodes.values().length);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java
new file mode 100644
index 00000000000..ba9d04e914e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileTruncate {
+ static {
+ GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
+ GenericTestUtils.setLogLevel(FSEditLogLoader.LOG, Level.ALL);
+ }
+ static final int BLOCK_SIZE = 4;
+ static final short REPLICATION = 3;
+ static final int DATANODE_NUM = 3;
+ static final int SUCCESS_ATTEMPTS = 300;
+ static final int RECOVERY_ATTEMPTS = 600;
+ static final long SLEEP = 100L;
+
+ static final long LOW_SOFTLIMIT = 100L;
+ static final long LOW_HARDLIMIT = 200L;
+ static final int SHORT_HEARTBEAT = 1;
+
+ static Configuration conf;
+ static MiniDFSCluster cluster;
+ static DistributedFileSystem fs;
+
+ @BeforeClass
+ public static void startUp() throws IOException {
+ conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, SHORT_HEARTBEAT);
+ cluster = new MiniDFSCluster.Builder(conf)
+ .format(true)
+ .numDataNodes(DATANODE_NUM)
+ .nameNodePort(NameNode.DEFAULT_PORT)
+ .waitSafeMode(true)
+ .build();
+ fs = cluster.getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if(fs != null) fs.close();
+ if(cluster != null) cluster.shutdown();
+ }
+
+ /**
+ * Truncate files of different sizes byte by byte.
+ */
+ @Test
+ public void testBasicTruncate() throws IOException {
+ int startingFileSize = 3 * BLOCK_SIZE;
+
+ Path parent = new Path("/test");
+ fs.mkdirs(parent);
+ fs.setQuota(parent, 100, 1000);
+ byte[] contents = AppendTestUtil.initBuffer(startingFileSize);
+ for (int fileLength = startingFileSize; fileLength > 0;
+ fileLength -= BLOCK_SIZE - 1) {
+ for (int toTruncate = 0; toTruncate <= fileLength; toTruncate++) {
+ final Path p = new Path(parent, "testBasicTruncate" + fileLength);
+ writeContents(contents, fileLength, p);
+
+ int newLength = fileLength - toTruncate;
+ boolean isReady = fs.truncate(p, newLength);
+
+ if(!isReady)
+ checkBlockRecovery(p);
+
+ FileStatus fileStatus = fs.getFileStatus(p);
+ assertThat(fileStatus.getLen(), is((long) newLength));
+
+ ContentSummary cs = fs.getContentSummary(parent);
+ assertEquals("Bad disk space usage",
+ cs.getSpaceConsumed(), newLength * REPLICATION);
+ // validate the file content
+ AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString());
+ }
+ }
+ fs.delete(parent, true);
+ }
+
+ /**
+ * Failure / recovery test for truncate.
+ * In this failure the DNs fail to recover the blocks and the NN triggers
+ * lease recovery.
+ * File stays in RecoveryInProgress until DataNodes report recovery.
+ */
+ @Test
+ public void testTruncateFailure() throws IOException {
+ int startingFileSize = 2 * BLOCK_SIZE + BLOCK_SIZE / 2;
+ int toTruncate = 1;
+
+ byte[] contents = AppendTestUtil.initBuffer(startingFileSize);
+ final Path p = new Path("/testTruncateFailure");
+ FSDataOutputStream out = fs.create(p, false, BLOCK_SIZE, REPLICATION,
+ BLOCK_SIZE);
+ out.write(contents, 0, startingFileSize);
+ try {
+ fs.truncate(p, 0);
+ fail("Truncate must fail on open file.");
+ } catch(IOException expected) {}
+ out.close();
+
+ cluster.shutdownDataNodes();
+ NameNodeAdapter.getLeaseManager(cluster.getNamesystem())
+ .setLeasePeriod(LOW_SOFTLIMIT, LOW_HARDLIMIT);
+
+ int newLength = startingFileSize - toTruncate;
+ boolean isReady = fs.truncate(p, newLength);
+ assertThat("truncate should have triggered block recovery.",
+ isReady, is(false));
+ FileStatus fileStatus = fs.getFileStatus(p);
+ assertThat(fileStatus.getLen(), is((long) newLength));
+
+ boolean recoveryTriggered = false;
+ for(int i = 0; i < RECOVERY_ATTEMPTS; i++) {
+ String leaseHolder =
+ NameNodeAdapter.getLeaseHolderForPath(cluster.getNameNode(),
+ p.toUri().getPath());
+ if(leaseHolder.equals(HdfsServerConstants.NAMENODE_LEASE_HOLDER)) {
+ cluster.startDataNodes(conf, DATANODE_NUM, true,
+ HdfsServerConstants.StartupOption.REGULAR, null);
+ recoveryTriggered = true;
+ break;
+ }
+ try { Thread.sleep(SLEEP); } catch (InterruptedException ignored) {}
+ }
+ assertThat("lease recovery should have occurred in ~" +
+ SLEEP * RECOVERY_ATTEMPTS + " ms.", recoveryTriggered, is(true));
+
+ checkBlockRecovery(p);
+
+ NameNodeAdapter.getLeaseManager(cluster.getNamesystem())
+ .setLeasePeriod(HdfsConstants.LEASE_SOFTLIMIT_PERIOD,
+ HdfsConstants.LEASE_HARDLIMIT_PERIOD);
+
+ fileStatus = fs.getFileStatus(p);
+ assertThat(fileStatus.getLen(), is((long) newLength));
+
+ AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString());
+ fs.delete(p, false);
+ }
+
+ /**
+ * EditLogOp load test for Truncate.
+ */
+ @Test
+ public void testTruncateEditLogLoad() throws IOException {
+ int startingFileSize = 2 * BLOCK_SIZE + BLOCK_SIZE / 2;
+ int toTruncate = 1;
+
+ byte[] contents = AppendTestUtil.initBuffer(startingFileSize);
+
+ final Path p = new Path("/testTruncateEditLogLoad");
+ writeContents(contents, startingFileSize, p);
+
+ int newLength = startingFileSize - toTruncate;
+ boolean isReady = fs.truncate(p, newLength);
+ assertThat("truncate should have triggered block recovery.",
+ isReady, is(false));
+
+ checkBlockRecovery(p);
+
+ cluster.restartNameNode();
+
+ FileStatus fileStatus = fs.getFileStatus(p);
+ assertThat(fileStatus.getLen(), is((long) newLength));
+
+ AppendTestUtil.checkFullFile(fs, p, newLength, contents, p.toString());
+ fs.delete(p, false);
+ }
+
+ /**
+ * Check truncate recovery.
+ */
+ @Test
+ public void testTruncateLastBlock() throws IOException {
+ FSNamesystem fsn = cluster.getNamesystem();
+
+ String src = "/file";
+ Path srcPath = new Path(src);
+
+ byte[] contents = AppendTestUtil.initBuffer(BLOCK_SIZE);
+ writeContents(contents, BLOCK_SIZE, srcPath);
+
+ INodeFile inode = fsn.getFSDirectory().getINode(src).asFile();
+ long oldGenstamp = GenerationStamp.LAST_RESERVED_STAMP;
+ DatanodeDescriptor dn = DFSTestUtil.getLocalDatanodeDescriptor();
+ DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo(
+ dn.getDatanodeUuid(), InetAddress.getLocalHost().getHostAddress());
+ dn.isAlive = true;
+
+ BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
+ new Block(0, 1, oldGenstamp), (short) 1,
+ HdfsServerConstants.BlockUCState.BEING_TRUNCATED,
+ new DatanodeStorageInfo[] {storage});
+
+ inode.setBlocks(new BlockInfo[] {blockInfo});
+ fsn.writeLock();
+ try {
+ fsn.initializeBlockRecovery(inode);
+ assertThat(inode.getLastBlock().getBlockUCState(),
+ is(HdfsServerConstants.BlockUCState.BEING_TRUNCATED));
+ long blockRecoveryId = ((BlockInfoUnderConstruction) inode.getLastBlock())
+ .getBlockRecoveryId();
+ assertThat(blockRecoveryId, is(oldGenstamp + 2));
+ } finally {
+ fsn.writeUnlock();
+ }
+ }
+
+ static void writeContents(byte[] contents, int fileLength, Path p)
+ throws IOException {
+ FSDataOutputStream out = fs.create(p, true, BLOCK_SIZE, REPLICATION,
+ BLOCK_SIZE);
+ out.write(contents, 0, fileLength);
+ out.close();
+ }
+
+ static void checkBlockRecovery(Path p) throws IOException {
+ boolean success = false;
+ for(int i = 0; i < SUCCESS_ATTEMPTS; i++) {
+ LocatedBlocks blocks = getLocatedBlocks(p);
+ boolean noLastBlock = blocks.getLastLocatedBlock() == null;
+ if(!blocks.isUnderConstruction() &&
+ (noLastBlock || blocks.isLastBlockComplete())) {
+ success = true;
+ break;
+ }
+ try { Thread.sleep(SLEEP); } catch (InterruptedException ignored) {}
+ }
+ assertThat("inode should complete in ~" + SLEEP * SUCCESS_ATTEMPTS + " ms.",
+ success, is(true));
+ }
+
+ static LocatedBlocks getLocatedBlocks(Path src) throws IOException {
+ return fs.getClient().getLocatedBlocks(src.toString(), 0, Long.MAX_VALUE);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
index b9e62e3f6e9..3084f2684fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
@@ -409,7 +409,7 @@ public class TestNamenodeRetryCache {
LightWeightCache