diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt index 4d9c2eacfcf..998e18d5c2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt @@ -32,3 +32,7 @@ HDFS-5535 subtasks: HDFS-5889. When starting rolling upgrade, create a fs image for rollback so that the standby namenode can create checkpoints during upgrade. (szetszwo & jing9) + + HDFS-5535. Add BlockPoolSliceStorage 'trash' to handle block deletions + during rolling upgrades. (Arpit Agarwal) + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index ed966bcdd7e..acd190d714e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -20,6 +20,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext; @@ -81,7 +83,10 @@ static public enum StartupOption{ FORCE("-force"), NONINTERACTIVE("-nonInteractive"), RENAMERESERVED("-renameReserved"); - + + private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile( + "(\\w+)\\((\\w+)\\)"); + private final String name; // Used only with format and upgrade options @@ -167,6 +172,17 @@ public String toString() { } return super.toString(); } + + static public StartupOption getEnum(String value) { + Matcher matcher = ENUM_WITH_ROLLING_UPGRADE_OPTION.matcher(value); + if (matcher.matches()) { + StartupOption option = StartupOption.valueOf(matcher.group(1)); + option.setRollingUpgradeStartupOption(matcher.group(2)); + return option; + } else { + return StartupOption.valueOf(value); + } + } } // Timeouts for communicating with DataNode for streaming writes/reads diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 8f8dd217273..65fdfc3e15b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -413,6 +413,18 @@ List getBPServiceActors() { return Lists.newArrayList(bpServices); } + /** + * Signal the current rolling upgrade status as indicated by the NN. + * @param inProgress true if a rolling upgrade is in progress + */ + void signalRollingUpgrade(boolean inProgress) { + if (inProgress) { + dn.getFSDataset().enableDeleteToTrash(getBlockPoolId()); + } else { + dn.getFSDataset().disableAndPurgeTrashStorage(getBlockPoolId()); + } + } + /** * Update the BPOS's view of which NN is active, based on a heartbeat * response from one of the actors. @@ -611,7 +623,8 @@ private boolean processCommandFromActive(DatanodeCommand cmd, // See HDFS-2987. 
throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN"); case DatanodeProtocol.DNA_FINALIZE: - String bp = ((FinalizeCommand) cmd).getBlockPoolId(); + String bp = ((FinalizeCommand) cmd).getBlockPoolId(); + LOG.info("Got finalize command for block pool " + bp); assert getBlockPoolId().equals(bp) : "BP " + getBlockPoolId() + " received DNA_FINALIZE " + "for other block pool " + bp; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 3c6e850d406..66d22ed6152 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -33,8 +33,8 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; @@ -611,6 +611,20 @@ private synchronized void cleanUp() { bpos.shutdownActor(this); } + private void handleRollingUpgradeStatus(HeartbeatResponse resp) { + RollingUpgradeStatus rollingUpgradeStatus = resp.getRollingUpdateStatus(); + if (rollingUpgradeStatus != null && + rollingUpgradeStatus.getBlockPoolId().compareTo(bpos.getBlockPoolId()) != 0) { + // Can this ever occur? + LOG.error("Invalid BlockPoolId " + + rollingUpgradeStatus.getBlockPoolId() + + " in HeartbeatResponse. Expected " + + bpos.getBlockPoolId()); + } else { + bpos.signalRollingUpgrade(rollingUpgradeStatus != null); + } + } + /** * Main loop for each BP thread. Run until shutdown, * forever calling remote NameNode functions. 
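The `StartupOption.getEnum` helper added above exists so that a startup option carrying a rolling-upgrade sub-option can be stored in and recovered from the configuration as a single string: the regex `(\w+)\((\w+)\)` accepts values such as `ROLLINGUPGRADE(ROLLBACK)` (assuming `StartupOption.toString()` renders the sub-option in parentheses, which is what the later `getStartupOption` change relies on) and falls back to a plain `valueOf` otherwise. A minimal standalone sketch of that parsing, using simplified stand-in names rather than the real `HdfsServerConstants` types:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only; the enum below is a stand-in, not the real
// HdfsServerConstants.StartupOption.
public class StartupOptionParseSketch {
  enum Option { REGULAR, ROLLBACK, ROLLINGUPGRADE }

  // Matches serialized values such as "ROLLINGUPGRADE(ROLLBACK)".
  private static final Pattern WITH_SUB_OPTION =
      Pattern.compile("(\\w+)\\((\\w+)\\)");

  static Option parse(String value) {
    Matcher m = WITH_SUB_OPTION.matcher(value);
    if (m.matches()) {
      Option option = Option.valueOf(m.group(1));
      System.out.println("sub-option = " + m.group(2)); // e.g. ROLLBACK
      return option;
    }
    // No parenthesized sub-option: parse the bare enum name.
    return Option.valueOf(value);
  }

  public static void main(String[] args) {
    System.out.println(parse("REGULAR"));                  // REGULAR
    System.out.println(parse("ROLLINGUPGRADE(ROLLBACK)")); // ROLLINGUPGRADE
  }
}
```

Because unmatched values fall through to `valueOf`, existing settings such as `REGULAR` or `ROLLBACK` keep parsing exactly as before.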
@@ -657,6 +671,10 @@ private void offerService() throws Exception { this, resp.getNameNodeHaState()); state = resp.getNameNodeHaState().getState(); + if (state == HAServiceState.ACTIVE) { + handleRollingUpgradeStatus(resp); + } + long startProcessCommands = now(); if (!processCommand(resp.getCommands())) continue; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java index 5a2a887b991..0498f1c65b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java @@ -27,11 +27,13 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LayoutVersion; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption; import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.StorageInfo; @@ -57,9 +59,19 @@ */ @InterfaceAudience.Private public class BlockPoolSliceStorage extends Storage { - private static final Pattern BLOCK_POOL_PATH_PATTERN = Pattern - .compile("^(.*)" - + "(\\/BP-[0-9]+\\-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\-[0-9]+\\/.*)$"); + static final String TRASH_ROOT_DIR = "trash"; + + private static final String BLOCK_POOL_ID_PATTERN_BASE = + "/BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+/"; + + private static final Pattern BLOCK_POOL_PATH_PATTERN = Pattern.compile( + "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(.*)$"); + + private static final Pattern BLOCK_POOL_CURRENT_PATH_PATTERN = Pattern.compile( + "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" + STORAGE_DIR_CURRENT + ")(.*)$"); + + private static final Pattern BLOCK_POOL_TRASH_PATH_PATTERN = Pattern.compile( + "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" + TRASH_ROOT_DIR + ")(.*)$"); private String blockpoolID = ""; // id of the blockpool @@ -92,6 +104,7 @@ private BlockPoolSliceStorage() { */ void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, StartupOption startOpt) throws IOException { + LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID()); // 1. For each BP data directory analyze the state and // check whether all is consistent before transitioning. this.storageDirs = new ArrayList(dataDirs.size()); @@ -231,8 +244,15 @@ protected void setFieldsFromProperties(Properties props, StorageDirectory sd) */ private void doTransition(StorageDirectory sd, NamespaceInfo nsInfo, StartupOption startOpt) throws IOException { - if (startOpt == StartupOption.ROLLBACK) + if (startOpt == StartupOption.ROLLBACK) { doRollback(sd, nsInfo); // rollback if applicable + } else if (startOpt == StartupOption.ROLLINGUPGRADE && + startOpt.getRollingUpgradeStartupOption() == RollingUpgradeStartupOption.ROLLBACK) { + File trashRoot = getTrashRootDir(sd); + int filesRestored = + trashRoot.exists() ? 
restoreBlockFilesFromTrash(trashRoot) : 0;
+      LOG.info("Restored " + filesRestored + " block files from trash.");
+    }
     readProperties(sd);
     checkVersionUpgradable(this.layoutVersion);
@@ -358,6 +378,35 @@ private void cleanupDetachDir(File detachDir) throws IOException {
     }
   }
 
+  /**
+   * Restore all files from the trash directory to their corresponding
+   * locations under current/.
+   *
+   * @param trashRoot the trash directory containing files to be restored
+   */
+  private int restoreBlockFilesFromTrash(File trashRoot) {
+    int filesRestored = 0;
+    File restoreDirectory = null;
+
+    for (File child : trashRoot.listFiles()) {
+      if (child.isDirectory()) {
+        // Recurse to process subdirectories.
+        filesRestored += restoreBlockFilesFromTrash(child);
+        continue;
+      }
+
+      if (restoreDirectory == null) {
+        restoreDirectory = new File(getRestoreDirectory(child));
+        restoreDirectory.mkdirs();
+      }
+
+      child.renameTo(new File(restoreDirectory, child.getName()));
+      ++filesRestored;
+    }
+
+    return filesRestored;
+  }
+
   /*
    * Roll back to old snapshot at the block pool level
    * If previous directory exists:
@@ -505,4 +554,51 @@ public static File getBpRoot(String bpID, File dnCurDir) {
   public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
     return false;
   }
+
+  private File getTrashRootDir(StorageDirectory sd) {
+    return new File(sd.getRoot(), TRASH_ROOT_DIR);
+  }
+
+  /**
+   * Get a target subdirectory under trash/ for a given block file that is being
+   * deleted.
+   *
+   * The subdirectory structure under trash/ mirrors that under current/ to keep
+   * implicit memory of where the files are to be restored (if necessary).
+   *
+   * @param blockFile the block file being deleted
+   * @return the trash directory for a given block file that is being deleted.
+   */
+  public String getTrashDirectory(File blockFile) {
+    Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
+    String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
+    return trashDirectory;
+  }
+
+  /**
+   * Get a target subdirectory under current/ for a given block file that is being
+   * restored from trash.
+   *
+   * The subdirectory structure under trash/ mirrors that under current/ to keep
+   * implicit memory of where the files are to be restored.
+   *
+   * @param blockFile the block file being restored from trash
+   * @return the target directory to restore a previously deleted block file.
+   */
+  @VisibleForTesting
+  String getRestoreDirectory(File blockFile) {
+    Matcher matcher = BLOCK_POOL_TRASH_PATH_PATTERN.matcher(blockFile.getParent());
+    String restoreDirectory = matcher.replaceFirst("$1$2" + STORAGE_DIR_CURRENT + "$4");
+    LOG.info("Restoring " + blockFile + " to " + restoreDirectory);
+    return restoreDirectory;
+  }
+
+  /**
+   * Delete all files and directories in the trash directories.
+ */ + public void emptyTrash() { + for (StorageDirectory sd : storageDirs) { + FileUtil.fullyDelete(getTrashRootDir(sd)); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index ad580a53d1d..251e616902d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -101,6 +101,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.StorageInfo; @@ -210,7 +211,14 @@ public class DataNode extends Configured static final Log ClientTraceLog = LogFactory.getLog(DataNode.class.getName() + ".clienttrace"); - private static final String USAGE = "Usage: java DataNode [-rollback | -regular]"; + private static final String USAGE = + "Usage: java DataNode [-regular | -rollback | -rollingupgrade rollback]\n" + + " -regular : Normal DataNode startup (default).\n" + + " -rollback : Rollback a standard upgrade.\n" + + " -rollingupgrade rollback : Rollback a rolling upgrade operation.\n" + + " Refer to HDFS documentation for the difference between standard\n" + + " and rolling upgrades."; + static final int CURRENT_BLOCK_FORMAT_VERSION = 1; /** @@ -1754,6 +1762,7 @@ public static DataNode instantiateDataNode(String args [], Configuration conf, } if (!parseArguments(args, conf)) { + LOG.error("Bad command line arguments"); printUsage(System.err); return null; } @@ -1788,6 +1797,7 @@ public static List getStorageLocations(Configuration conf) { /** Instantiate & Start a single datanode daemon and wait for it to finish. * If this thread is specifically interrupted, it will stop waiting. */ + @VisibleForTesting public static DataNode createDataNode(String args[], Configuration conf) throws IOException { return createDataNode(args, conf, null); @@ -1796,6 +1806,7 @@ public static DataNode createDataNode(String args[], /** Instantiate & Start a single datanode daemon and wait for it to finish. * If this thread is specifically interrupted, it will stop waiting. */ + @VisibleForTesting @InterfaceAudience.Private public static DataNode createDataNode(String args[], Configuration conf, SecureResources resources) throws IOException { @@ -1906,25 +1917,40 @@ private static void printUsage(PrintStream out) { * * @return false if passed argements are incorrect */ - private static boolean parseArguments(String args[], - Configuration conf) { - int argsLen = (args == null) ? 0 : args.length; + @VisibleForTesting + static boolean parseArguments(String args[], Configuration conf) { StartupOption startOpt = StartupOption.REGULAR; - for(int i=0; i < argsLen; i++) { - String cmd = args[i]; + int i = 0; + + if (args != null && args.length != 0) { + String cmd = args[i++]; if ("-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd)) { LOG.error("-r, --rack arguments are not supported anymore. 
RackID " + "resolution is handled by the NameNode."); - terminate(1); - } else if ("-rollback".equalsIgnoreCase(cmd)) { - startOpt = StartupOption.ROLLBACK; - } else if ("-regular".equalsIgnoreCase(cmd)) { - startOpt = StartupOption.REGULAR; - } else return false; + } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) { + startOpt = StartupOption.ROLLBACK; + } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) { + startOpt = StartupOption.REGULAR; + } else if (StartupOption.ROLLINGUPGRADE.getName().equalsIgnoreCase(cmd)) { + startOpt = StartupOption.ROLLINGUPGRADE; + + if ((i < args.length ) && + (args[i].equalsIgnoreCase(RollingUpgradeStartupOption.ROLLBACK.toString()))) { + startOpt.setRollingUpgradeStartupOption(args[i++]); + } else { + LOG.error("Missing or unrecognized option to " + StartupOption.ROLLINGUPGRADE); + return false; + } + + LOG.info("Rolling upgrade rollback requested via startup option"); + } else { + return false; + } } + setStartupOption(conf, startOpt); - return true; + return (args == null || i == args.length); // Fail if more than one cmd specified! } private static void setStartupOption(Configuration conf, StartupOption opt) { @@ -1932,8 +1958,9 @@ private static void setStartupOption(Configuration conf, StartupOption opt) { } static StartupOption getStartupOption(Configuration conf) { - return StartupOption.valueOf(conf.get(DFS_DATANODE_STARTUP_KEY, - StartupOption.REGULAR.toString())); + String value = conf.get(DFS_DATANODE_STARTUP_KEY, + StartupOption.REGULAR.toString()); + return StartupOption.getEnum(value); } /** @@ -1963,11 +1990,15 @@ public DataBlockScanner getBlockScanner() { public static void secureMain(String args[], SecureResources resources) { + int errorCode = 0; try { StringUtils.startupShutdownMessage(DataNode.class, args, LOG); DataNode datanode = createDataNode(args, null, resources); - if (datanode != null) + if (datanode != null) { datanode.join(); + } else { + errorCode = 1; + } } catch (Throwable e) { LOG.fatal("Exception in secureMain", e); terminate(1, e); @@ -1977,7 +2008,7 @@ public static void secureMain(String args[], SecureResources resources) { // condition was not met. Also, In secure mode, control will go to Jsvc // and Datanode process hangs if it does not exit. LOG.warn("Exiting Datanode"); - terminate(0); + terminate(errorCode); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 53bcdb8689d..51169b28fec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -25,6 +25,7 @@ import java.io.RandomAccessFile; import java.nio.channels.FileLock; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -65,6 +66,13 @@ public class DataStorage extends Storage { public final static String STORAGE_DIR_FINALIZED = "finalized"; public final static String STORAGE_DIR_TMP = "tmp"; + // Set of bpids for which 'trash' is currently enabled. + // When trash is enabled block files are moved under a separate + // 'trash' folder instead of being deleted right away. This can + // be useful during rolling upgrades, for example. 
+  // The set is backed by a concurrent HashMap.
+  private Set<String> trashEnabledBpids;
+
   /**
    * Datanode UUID that this storage is currently attached to. This
    * is the same as the legacy StorageID for datanodes that were
@@ -83,6 +91,8 @@ public class DataStorage extends Storage {
 
   DataStorage() {
     super(NodeType.DATA_NODE);
+    trashEnabledBpids = Collections.newSetFromMap(
+        new ConcurrentHashMap<String, Boolean>());
   }
 
   public StorageInfo getBPStorage(String bpid) {
@@ -107,6 +117,47 @@ public synchronized void createStorageID(StorageDirectory sd) {
       sd.setStorageUuid(DatanodeStorage.generateUuid());
     }
   }
+
+  /**
+   * Enable trash for the specified block pool storage.
+   *
+   * @param bpid the block pool id
+   */
+  public void enableTrash(String bpid) {
+    if (trashEnabledBpids.add(bpid)) {
+      LOG.info("Enabled trash for bpid " + bpid);
+    }
+  }
+
+  /**
+   * Disable trash for the specified block pool storage.
+   * Existing files in trash are purged, i.e. permanently deleted.
+   *
+   * @param bpid the block pool id
+   */
+  public void disableAndPurgeTrash(String bpid) {
+    if (trashEnabledBpids.remove(bpid)) {
+      LOG.info("Disabled trash for bpid " + bpid);
+    }
+    ((BlockPoolSliceStorage) getBPStorage(bpid)).emptyTrash();
+  }
+
+  /**
+   * If rolling upgrades are in progress then do not delete block files
+   * immediately. Instead we move the block files to an intermediate
+   * 'trash' directory. If there is a subsequent rollback, then the block
+   * files will be restored from trash.
+   *
+   * @param blockFile the block file being deleted
+   * @return trash directory if rolling upgrade is in progress, null
+   *         otherwise.
+   */
+  public String getTrashDirectoryForBlockFile(String bpid, File blockFile) {
+    if (trashEnabledBpids.contains(bpid)) {
+      return ((BlockPoolSliceStorage) getBPStorage(bpid)).getTrashDirectory(blockFile);
+    }
+    return null;
+  }
 
   /**
    * Analyze storage directories.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 415c6a985ab..c0fa3478f4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -411,5 +411,17 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b
    */
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
       throws IOException;
+
+  /**
+   * Enable 'trash' for the given dataset. When trash is enabled, files are
+   * moved to a separate trash directory instead of being deleted immediately.
+   * This can be useful for example during rolling upgrades.
+   */
+  public void enableDeleteToTrash(String bpid);
+
+  /**
+   * Disable 'trash' for the given dataset and purge existing files in 'trash'.
+ */ + public void disableAndPurgeTrashStorage(String bpid); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index 4c0e19e612e..117946c30bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -153,29 +153,34 @@ synchronized void shutdown() { * dfsUsed statistics accordingly. */ void deleteAsync(FsVolumeImpl volume, File blockFile, File metaFile, - ExtendedBlock block) { + ExtendedBlock block, String trashDirectory) { LOG.info("Scheduling " + block.getLocalBlock() + " file " + blockFile + " for deletion"); ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask( - volume, blockFile, metaFile, block); + volume, blockFile, metaFile, block, trashDirectory); execute(volume.getCurrentDir(), deletionTask); } /** A task for deleting a block file and its associated meta file, as well - * as decrement the dfs usage of the volume. + * as decrement the dfs usage of the volume. + * Optionally accepts a trash directory. If one is specified then the files + * are moved to trash instead of being deleted. If none is specified then the + * files are deleted immediately. */ class ReplicaFileDeleteTask implements Runnable { final FsVolumeImpl volume; final File blockFile; final File metaFile; final ExtendedBlock block; + final String trashDirectory; ReplicaFileDeleteTask(FsVolumeImpl volume, File blockFile, - File metaFile, ExtendedBlock block) { + File metaFile, ExtendedBlock block, String trashDirectory) { this.volume = volume; this.blockFile = blockFile; this.metaFile = metaFile; this.block = block; + this.trashDirectory = trashDirectory; } @Override @@ -186,12 +191,34 @@ public String toString() { + " and meta file " + metaFile + " from volume " + volume; } + private boolean deleteFiles() { + return blockFile.delete() && (metaFile.delete() || !metaFile.exists()); + } + + private boolean moveFiles() { + File newBlockFile = new File(trashDirectory, blockFile.getName()); + File newMetaFile = new File(trashDirectory, metaFile.getName()); + (new File(trashDirectory)).mkdirs(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Moving files " + blockFile.getName() + " and " + + metaFile.getName() + " to trash."); + } + return (blockFile.renameTo(newBlockFile) && + metaFile.renameTo(newMetaFile)); + } + @Override public void run() { long dfsBytes = blockFile.length() + metaFile.length(); - if (!blockFile.delete() || (!metaFile.delete() && metaFile.exists())) { - LOG.warn("Unexpected error trying to delete block " - + block.getBlockPoolId() + " " + block.getLocalBlock() + boolean result; + + result = (trashDirectory == null) ? deleteFiles() : moveFiles(); + + if (!result) { + LOG.warn("Unexpected error trying to " + + (trashDirectory == null ? "delete" : "move") + + " block " + block.getBlockPoolId() + " " + block.getLocalBlock() + " at file " + blockFile + ". 
Ignored."); } else { if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 53386bcbbf8..6913eec7ac9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -38,7 +38,6 @@ import javax.management.ObjectName; import javax.management.StandardMBean; -import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -193,6 +192,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) } final DataNode datanode; + final DataStorage dataStorage; final FsVolumeList volumes; final FsDatasetAsyncDiskService asyncDiskService; final FsDatasetCache cacheManager; @@ -209,6 +209,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf ) throws IOException { this.datanode = datanode; + this.dataStorage = storage; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. final int volFailuresTolerated = @@ -1234,7 +1235,8 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException { // finishes. asyncDiskService.deleteAsync(v, f, FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()), - new ExtendedBlock(bpid, invalidBlks[i])); + new ExtendedBlock(bpid, invalidBlks[i]), + dataStorage.getTrashDirectoryForBlockFile(bpid, f)); } if (error) { throw new IOException("Error in deleting blocks."); @@ -1891,6 +1893,16 @@ public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) blocksVolumeIds, blocksVolumeIndexes); } + @Override + public void enableDeleteToTrash(String bpid) { + dataStorage.enableTrash(bpid); + } + + @Override + public void disableAndPurgeTrashStorage(String bpid) { + dataStorage.disableAndPurgeTrash(bpid); + } + @Override public RollingLogs createRollingLogs(String bpid, String prefix ) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 7bdd7e8f4f8..3283de554a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -391,6 +391,10 @@ public class DataNodeProperties { this.dnArgs = args; this.secureResources = secureResources; } + + public void setDnArgs(String ... 
args) { + dnArgs = args; + } } private Configuration conf; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index a2e95a4d673..3bf2f052fc9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1040,6 +1040,16 @@ public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) throw new UnsupportedOperationException(); } + @Override + public void enableDeleteToTrash(String bpid) { + throw new UnsupportedOperationException(); + } + + @Override + public void disableAndPurgeTrashStorage(String bpid) { + throw new UnsupportedOperationException(); + } + @Override public void checkAndUpdate(String bpid, long blockId, File diskFile, File diskMetaFile, FsVolumeSpi vol) {
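For reference, the trash layout introduced by this patch is driven entirely by the two path patterns in `BlockPoolSliceStorage`: `getTrashDirectory` rewrites the `current` component of a block file's parent directory to `trash`, and `getRestoreDirectory` performs the inverse rewrite during a rolling-upgrade rollback. The following standalone sketch reproduces that mapping outside Hadoop; the sample storage path is made up for illustration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone illustration of the trash <-> current path mapping used by
// BlockPoolSliceStorage.getTrashDirectory()/getRestoreDirectory().
public class TrashPathMappingSketch {
  // Same block pool id pattern as BLOCK_POOL_ID_PATTERN_BASE in the patch.
  private static final String BP_ID =
      "/BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+/";

  private static final Pattern CURRENT_PATH =
      Pattern.compile("^(.*)(" + BP_ID + ")(current)(.*)$");
  private static final Pattern TRASH_PATH =
      Pattern.compile("^(.*)(" + BP_ID + ")(trash)(.*)$");

  // current/... -> trash/..., preserving the subdirectory layout.
  static String toTrash(String blockFileParent) {
    Matcher m = CURRENT_PATH.matcher(blockFileParent);
    return m.replaceFirst("$1$2trash$4");
  }

  // trash/... -> current/..., used when rolling back a rolling upgrade.
  static String toCurrent(String trashFileParent) {
    Matcher m = TRASH_PATH.matcher(trashFileParent);
    return m.replaceFirst("$1$2current$4");
  }

  public static void main(String[] args) {
    // Hypothetical DataNode storage directory for a block file's parent.
    String dir =
        "/data/dfs/dn/current/BP-1073741825-127.0.0.1-1394158243346/current/finalized";
    String trash = toTrash(dir);
    System.out.println(trash);            // .../BP-.../trash/finalized
    System.out.println(toCurrent(trash)); // back to the original directory
  }
}
```

Because the subdirectory structure is preserved verbatim on both sides, no extra bookkeeping is needed to remember where a deleted block file came from; the path itself is the record.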