HDFS-5907. BlockPoolSliceStorage trash to handle block deletions during rolling upgrade. (Arpit Agarwal)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1568346 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 2014-02-14 16:37:42 +00:00
parent a4f152db96
commit 5df82fa01d
12 changed files with 328 additions and 33 deletions

CHANGES_HDFS-5535.txt

@ -32,3 +32,7 @@ HDFS-5535 subtasks:
    HDFS-5889. When starting rolling upgrade, create a fs image for rollback
    so that the standby namenode can create checkpoints during upgrade.
    (szetszwo & jing9)
+   HDFS-5907. Add BlockPoolSliceStorage 'trash' to handle block deletions
+   during rolling upgrades. (Arpit Agarwal)

HdfsServerConstants.java

@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.common;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext;
@ -82,6 +84,9 @@ public final class HdfsServerConstants {
    NONINTERACTIVE("-nonInteractive"),
    RENAMERESERVED("-renameReserved");
+   private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
+       "(\\w+)\\((\\w+)\\)");
    private final String name;
    // Used only with format and upgrade options
@ -167,6 +172,17 @@ public final class HdfsServerConstants {
      }
      return super.toString();
    }
+   static public StartupOption getEnum(String value) {
+     Matcher matcher = ENUM_WITH_ROLLING_UPGRADE_OPTION.matcher(value);
+     if (matcher.matches()) {
+       StartupOption option = StartupOption.valueOf(matcher.group(1));
+       option.setRollingUpgradeStartupOption(matcher.group(2));
+       return option;
+     } else {
+       return StartupOption.valueOf(value);
+     }
+   }
  }

  // Timeouts for communicating with DataNode for streaming writes/reads
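Illustrative only, not part of the patch: a minimal usage sketch of the new getEnum() helper. It assumes the Hadoop classes above are on the classpath and that a StartupOption carrying a rolling upgrade sub-option serializes to the NAME(SUBOPTION) form that the regex expects; the class name and printed output are made up.

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;

    public class StartupOptionGetEnumExample {
      public static void main(String[] args) {
        // A plain enum name falls through to StartupOption.valueOf().
        StartupOption regular = StartupOption.getEnum("REGULAR");

        // A value like "ROLLINGUPGRADE(ROLLBACK)" matches
        // ENUM_WITH_ROLLING_UPGRADE_OPTION: group(1) selects the enum constant
        // and group(2) is stored as its rolling upgrade sub-option.
        StartupOption ru = StartupOption.getEnum("ROLLINGUPGRADE(ROLLBACK)");

        System.out.println(regular);
        System.out.println(ru + " / " + ru.getRollingUpgradeStartupOption());
      }
    }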

BPOfferService.java

@ -413,6 +413,18 @@ class BPOfferService {
    return Lists.newArrayList(bpServices);
  }
+ /**
+  * Signal the current rolling upgrade status as indicated by the NN.
+  * @param inProgress true if a rolling upgrade is in progress
+  */
+ void signalRollingUpgrade(boolean inProgress) {
+   if (inProgress) {
+     dn.getFSDataset().enableDeleteToTrash(getBlockPoolId());
+   } else {
+     dn.getFSDataset().disableAndPurgeTrashStorage(getBlockPoolId());
+   }
+ }
  /**
   * Update the BPOS's view of which NN is active, based on a heartbeat
   * response from one of the actors.
@ -612,6 +624,7 @@ class BPOfferService {
      throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
    case DatanodeProtocol.DNA_FINALIZE:
      String bp = ((FinalizeCommand) cmd).getBlockPoolId();
+     LOG.info("Got finalize command for block pool " + bp);
      assert getBlockPoolId().equals(bp) :
        "BP " + getBlockPoolId() + " received DNA_FINALIZE " +
        "for other block pool " + bp;

BPServiceActor.java

@ -33,8 +33,8 @@ import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
@ -611,6 +611,20 @@ class BPServiceActor implements Runnable {
    bpos.shutdownActor(this);
  }
+ private void handleRollingUpgradeStatus(HeartbeatResponse resp) {
+   RollingUpgradeStatus rollingUpgradeStatus = resp.getRollingUpdateStatus();
+   if (rollingUpgradeStatus != null &&
+       rollingUpgradeStatus.getBlockPoolId().compareTo(bpos.getBlockPoolId()) != 0) {
+     // Can this ever occur?
+     LOG.error("Invalid BlockPoolId " +
+         rollingUpgradeStatus.getBlockPoolId() +
+         " in HeartbeatResponse. Expected " +
+         bpos.getBlockPoolId());
+   } else {
+     bpos.signalRollingUpgrade(rollingUpgradeStatus != null);
+   }
+ }
  /**
   * Main loop for each BP thread. Run until shutdown,
   * forever calling remote NameNode functions.
@ -657,6 +671,10 @@ class BPServiceActor implements Runnable {
          this, resp.getNameNodeHaState());
      state = resp.getNameNodeHaState().getState();
+     if (state == HAServiceState.ACTIVE) {
+       handleRollingUpgradeStatus(resp);
+     }
      long startProcessCommands = now();
      if (!processCommand(resp.getCommands()))
        continue;

BlockPoolSliceStorage.java

@ -27,11 +27,13 @@ import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
@ -57,9 +59,19 @@ import org.apache.hadoop.util.Daemon;
 */
@InterfaceAudience.Private
public class BlockPoolSliceStorage extends Storage {
- private static final Pattern BLOCK_POOL_PATH_PATTERN = Pattern
-     .compile("^(.*)"
-         + "(\\/BP-[0-9]+\\-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\-[0-9]+\\/.*)$");
+ static final String TRASH_ROOT_DIR = "trash";
+ private static final String BLOCK_POOL_ID_PATTERN_BASE =
+     "/BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+/";
+ private static final Pattern BLOCK_POOL_PATH_PATTERN = Pattern.compile(
+     "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(.*)$");
+ private static final Pattern BLOCK_POOL_CURRENT_PATH_PATTERN = Pattern.compile(
+     "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" + STORAGE_DIR_CURRENT + ")(.*)$");
+ private static final Pattern BLOCK_POOL_TRASH_PATH_PATTERN = Pattern.compile(
+     "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" + TRASH_ROOT_DIR + ")(.*)$");
  private String blockpoolID = ""; // id of the blockpool
@ -92,6 +104,7 @@ public class BlockPoolSliceStorage extends Storage {
   */
  void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
      Collection<File> dataDirs, StartupOption startOpt) throws IOException {
+   LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
    // 1. For each BP data directory analyze the state and
    // check whether all is consistent before transitioning.
    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
@ -231,8 +244,15 @@ public class BlockPoolSliceStorage extends Storage {
   */
  private void doTransition(StorageDirectory sd,
      NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
-   if (startOpt == StartupOption.ROLLBACK)
+   if (startOpt == StartupOption.ROLLBACK) {
      doRollback(sd, nsInfo); // rollback if applicable
+   } else if (startOpt == StartupOption.ROLLINGUPGRADE &&
+       startOpt.getRollingUpgradeStartupOption() == RollingUpgradeStartupOption.ROLLBACK) {
+     File trashRoot = getTrashRootDir(sd);
+     int filesRestored =
+         trashRoot.exists() ? restoreBlockFilesFromTrash(trashRoot) : 0;
+     LOG.info("Restored " + filesRestored + " block files from trash.");
+   }
    readProperties(sd);
    checkVersionUpgradable(this.layoutVersion);
@ -358,6 +378,34 @@ public class BlockPoolSliceStorage extends Storage {
    }
  }
+ /**
+  * Restore all files from the trash directory to their corresponding
+  * locations under current/
+  *
+  * @param trashRoot
+  */
+ private int restoreBlockFilesFromTrash(File trashRoot) {
+   int filesRestored = 0;
+   File restoreDirectory = null;
+   for (File child : trashRoot.listFiles()) {
+     if (child.isDirectory()) {
+       // Recurse to process subdirectories.
+       filesRestored += restoreBlockFilesFromTrash(child);
+     }
+     if (restoreDirectory == null) {
+       restoreDirectory = new File(getRestoreDirectory(child));
+       restoreDirectory.mkdirs();
+     }
+     child.renameTo(new File(restoreDirectory, child.getName()));
+     ++filesRestored;
+   }
+   return filesRestored;
+ }
  /*
   * Roll back to old snapshot at the block pool level
   * If previous directory exists:
@ -505,4 +553,51 @@ public class BlockPoolSliceStorage extends Storage {
  public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
    return false;
  }
+ private File getTrashRootDir(StorageDirectory sd) {
+   return new File(sd.getRoot(), TRASH_ROOT_DIR);
+ }
+ /**
+  * Get a target subdirectory under trash/ for a given block file that is being
+  * deleted.
+  *
+  * The subdirectory structure under trash/ mirrors that under current/ to keep
+  * implicit memory of where the files are to be restored (if necessary).
+  *
+  * @param blockFile
+  * @return the trash directory for a given block file that is being deleted.
+  */
+ public String getTrashDirectory(File blockFile) {
+   Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
+   String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
+   return trashDirectory;
+ }
+ /**
+  * Get a target subdirectory under current/ for a given block file that is being
+  * restored from trash.
+  *
+  * The subdirectory structure under trash/ mirrors that under current/ to keep
+  * implicit memory of where the files are to be restored.
+  *
+  * @param blockFile
+  * @return the target directory to restore a previously deleted block file.
+  */
+ @VisibleForTesting
+ String getRestoreDirectory(File blockFile) {
+   Matcher matcher = BLOCK_POOL_TRASH_PATH_PATTERN.matcher(blockFile.getParent());
+   String restoreDirectory = matcher.replaceFirst("$1$2" + STORAGE_DIR_CURRENT + "$4");
+   LOG.info("Restoring " + blockFile + " to " + restoreDirectory);
+   return restoreDirectory;
+ }
+ /**
+  * Delete all files and directories in the trash directories.
+  */
+ public void emptyTrash() {
+   for (StorageDirectory sd : storageDirs) {
+     FileUtil.fullyDelete(getTrashRootDir(sd));
+   }
+ }
}
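Illustrative only, not part of the patch: a self-contained sketch that copies the two patterns above to show how a block directory under current/ is rewritten to its trash/ counterpart on delete and back again on rollback. The sample path is made up.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TrashPathMappingExample {
      // Mirrors BLOCK_POOL_ID_PATTERN_BASE and the current/trash patterns above.
      static final String BP =
          "/BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+/";
      static final Pattern CURRENT = Pattern.compile("^(.*)(" + BP + ")(current)(.*)$");
      static final Pattern TRASH = Pattern.compile("^(.*)(" + BP + ")(trash)(.*)$");

      public static void main(String[] args) {
        // Hypothetical block directory under a block pool slice's current/ tree.
        String blockDir =
            "/data/dfs/dn/current/BP-1234-127.0.0.1-1389812062332/current/finalized/subdir0";

        // Deletion during a rolling upgrade: current/ is rewritten to trash/.
        String trashDir = CURRENT.matcher(blockDir).replaceFirst("$1$2trash$4");
        System.out.println(trashDir);

        // Rollback: trash/ is rewritten back to current/.
        String restored = TRASH.matcher(trashDir).replaceFirst("$1$2current$4");
        System.out.println(restored.equals(blockDir)); // true
      }
    }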

DataNode.java

@ -101,6 +101,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
@ -210,7 +211,14 @@ public class DataNode extends Configured
  static final Log ClientTraceLog =
      LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
- private static final String USAGE = "Usage: java DataNode [-rollback | -regular]";
+ private static final String USAGE =
+     "Usage: java DataNode [-regular | -rollback | -rollingupgrade rollback]\n" +
+     " -regular : Normal DataNode startup (default).\n" +
+     " -rollback : Rollback a standard upgrade.\n" +
+     " -rollingupgrade rollback : Rollback a rolling upgrade operation.\n" +
+     " Refer to HDFS documentation for the difference between standard\n" +
+     " and rolling upgrades.";
  static final int CURRENT_BLOCK_FORMAT_VERSION = 1;
  /**
@ -1754,6 +1762,7 @@ public class DataNode extends Configured
    }
    if (!parseArguments(args, conf)) {
+     LOG.error("Bad command line arguments");
      printUsage(System.err);
      return null;
    }
@ -1788,6 +1797,7 @@ public class DataNode extends Configured
  /** Instantiate & Start a single datanode daemon and wait for it to finish.
   * If this thread is specifically interrupted, it will stop waiting.
   */
+ @VisibleForTesting
  public static DataNode createDataNode(String args[],
      Configuration conf) throws IOException {
    return createDataNode(args, conf, null);
@ -1796,6 +1806,7 @@ public class DataNode extends Configured
  /** Instantiate & Start a single datanode daemon and wait for it to finish.
   * If this thread is specifically interrupted, it will stop waiting.
   */
+ @VisibleForTesting
  @InterfaceAudience.Private
  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
@ -1906,25 +1917,40 @@ public class DataNode extends Configured
   *
   * @return false if passed argements are incorrect
   */
- private static boolean parseArguments(String args[],
-     Configuration conf) {
-   int argsLen = (args == null) ? 0 : args.length;
+ @VisibleForTesting
+ static boolean parseArguments(String args[], Configuration conf) {
    StartupOption startOpt = StartupOption.REGULAR;
-   for(int i=0; i < argsLen; i++) {
-     String cmd = args[i];
+   int i = 0;
+   if (args != null && args.length != 0) {
+     String cmd = args[i++];
      if ("-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd)) {
        LOG.error("-r, --rack arguments are not supported anymore. RackID " +
            "resolution is handled by the NameNode.");
-       terminate(1);
-     } else if ("-rollback".equalsIgnoreCase(cmd)) {
+       return false;
+     } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.ROLLBACK;
-     } else if ("-regular".equalsIgnoreCase(cmd)) {
+     } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.REGULAR;
-     } else
+     } else if (StartupOption.ROLLINGUPGRADE.getName().equalsIgnoreCase(cmd)) {
+       startOpt = StartupOption.ROLLINGUPGRADE;
+       if ((i < args.length ) &&
+           (args[i].equalsIgnoreCase(RollingUpgradeStartupOption.ROLLBACK.toString()))) {
+         startOpt.setRollingUpgradeStartupOption(args[i++]);
+       } else {
+         LOG.error("Missing or unrecognized option to " + StartupOption.ROLLINGUPGRADE);
          return false;
        }
+       LOG.info("Rolling upgrade rollback requested via startup option");
+     } else {
+       return false;
+     }
+   }
    setStartupOption(conf, startOpt);
-   return true;
+   return (args == null || i == args.length); // Fail if more than one cmd specified!
  }

  private static void setStartupOption(Configuration conf, StartupOption opt) {
@ -1932,8 +1958,9 @@ public class DataNode extends Configured
  }
  static StartupOption getStartupOption(Configuration conf) {
-   return StartupOption.valueOf(conf.get(DFS_DATANODE_STARTUP_KEY,
-       StartupOption.REGULAR.toString()));
+   String value = conf.get(DFS_DATANODE_STARTUP_KEY,
+       StartupOption.REGULAR.toString());
+   return StartupOption.getEnum(value);
  }
  /**
@ -1963,11 +1990,15 @@ public class DataNode extends Configured
  public static void secureMain(String args[], SecureResources resources) {
+   int errorCode = 0;
    try {
      StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
      DataNode datanode = createDataNode(args, null, resources);
-     if (datanode != null)
+     if (datanode != null) {
        datanode.join();
+     } else {
+       errorCode = 1;
+     }
    } catch (Throwable e) {
      LOG.fatal("Exception in secureMain", e);
      terminate(1, e);
@ -1977,7 +2008,7 @@ public class DataNode extends Configured
      // condition was not met. Also, In secure mode, control will go to Jsvc
      // and Datanode process hangs if it does not exit.
      LOG.warn("Exiting Datanode");
-     terminate(0);
+     terminate(errorCode);
    }
  }
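Illustrative only, not part of the patch: a hypothetical test-style snippet showing how the new "-rollingupgrade rollback" argument is parsed and read back through getStartupOption(). It assumes it compiles in the org.apache.hadoop.hdfs.server.datanode package (parseArguments() and getStartupOption() are package-private) and that setStartupOption() stores opt.toString(), so the sub-option can round-trip via StartupOption.getEnum().

    package org.apache.hadoop.hdfs.server.datanode;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;

    public class ParseRollingUpgradeArgsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // "-rollingupgrade rollback" is accepted and recorded in the configuration.
        boolean ok = DataNode.parseArguments(
            new String[] { "-rollingupgrade", "rollback" }, conf);

        // getStartupOption() now parses through StartupOption.getEnum().
        StartupOption opt = DataNode.getStartupOption(conf);
        System.out.println(ok + " " + opt + " "
            + (opt.getRollingUpgradeStartupOption() == RollingUpgradeStartupOption.ROLLBACK));

        // A missing sub-option makes parseArguments return false.
        System.out.println(DataNode.parseArguments(
            new String[] { "-rollingupgrade" }, new Configuration()));
      }
    }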

DataStorage.java

@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -65,6 +66,13 @@ public class DataStorage extends Storage {
  public final static String STORAGE_DIR_FINALIZED = "finalized";
  public final static String STORAGE_DIR_TMP = "tmp";
+ // Set of bpids for which 'trash' is currently enabled.
+ // When trash is enabled block files are moved under a separate
+ // 'trash' folder instead of being deleted right away. This can
+ // be useful during rolling upgrades, for example.
+ // The set is backed by a concurrent HashMap.
+ private Set<String> trashEnabledBpids;
  /**
   * Datanode UUID that this storage is currently attached to. This
   * is the same as the legacy StorageID for datanodes that were
@ -83,6 +91,8 @@ public class DataStorage extends Storage {
  DataStorage() {
    super(NodeType.DATA_NODE);
+   trashEnabledBpids = Collections.newSetFromMap(
+       new ConcurrentHashMap<String, Boolean>());
  }

  public StorageInfo getBPStorage(String bpid) {
@ -108,6 +118,49 @@ public class DataStorage extends Storage {
    }
  }
+ /**
+  * Enable trash for the specified block pool storage.
+  *
+  * @param bpid
+  * @param inProgress
+  */
+ public void enableTrash(String bpid) {
+   if (trashEnabledBpids.add(bpid)) {
+     LOG.info("Enabled trash for bpid " + bpid);
+   }
+ }
+ /**
+  * Disable trash for the specified block pool storage.
+  * Existing files in trash are purged i.e. permanently deleted.
+  *
+  * @param bpid
+  * @param inProgress
+  */
+ public void disableAndPurgeTrash(String bpid) {
+   if (trashEnabledBpids.remove(bpid)) {
+     LOG.info("Disabled trash for bpid " + bpid);
+   }
+   ((BlockPoolSliceStorage) getBPStorage(bpid)).emptyTrash();
+ }
+ /**
+  * If rolling upgrades are in progress then do not delete block files
+  * immediately. Instead we move the block files to an intermediate
+  * 'trash' directory. If there is a subsequent rollback, then the block
+  * files will be restored from trash.
+  *
+  * @param blockFile
+  * @return trash directory if rolling upgrade is in progress, null
+  *         otherwise.
+  */
+ public String getTrashDirectoryForBlockFile(String bpid, File blockFile) {
+   if (trashEnabledBpids.contains(bpid)) {
+     return ((BlockPoolSliceStorage) getBPStorage(bpid)).getTrashDirectory(blockFile);
+   }
+   return null;
+ }
  /**
   * Analyze storage directories.
   * Recover from previous transitions if required.
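Illustrative only, not HDFS code: a standalone sketch of the bookkeeping pattern above, where a concurrent set of block pool ids is consulted on every deletion and a null result means "delete immediately". The trash path computation is deliberately simplified (the real class delegates to BlockPoolSliceStorage's regex mapping).

    import java.io.File;
    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class TrashFlagSketch {
      private final Set<String> trashEnabledBpids =
          Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

      void enableTrash(String bpid) {
        if (trashEnabledBpids.add(bpid)) {      // add() is atomic; log only on transition
          System.out.println("Enabled trash for " + bpid);
        }
      }

      void disableTrash(String bpid) {
        if (trashEnabledBpids.remove(bpid)) {
          System.out.println("Disabled trash for " + bpid);
        }
      }

      // Simplified stand-in for getTrashDirectoryForBlockFile().
      String trashDirFor(String bpid, File blockFile) {
        return trashEnabledBpids.contains(bpid)
            ? new File(blockFile.getParentFile(), "trash").getPath()
            : null;
      }

      public static void main(String[] args) {
        TrashFlagSketch s = new TrashFlagSketch();
        File block = new File("/tmp/bp/current/finalized/blk_1001");
        System.out.println(s.trashDirFor("BP-1", block)); // null
        s.enableTrash("BP-1");
        System.out.println(s.trashDirFor("BP-1", block)); // .../trash
      }
    }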

FsDatasetSpi.java

@ -411,5 +411,17 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   */
  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
      throws IOException;
+ /**
+  * Enable 'trash' for the given dataset. When trash is enabled, files are
+  * moved to a separate trash directory instead of being deleted immediately.
+  * This can be useful for example during rolling upgrades.
+  */
+ public void enableDeleteToTrash(String bpid);
+ /**
+  * Disable 'trash' for the given dataset and purge existing files in 'trash'.
+  */
+ public void disableAndPurgeTrashStorage(String bpid);
}

FsDatasetAsyncDiskService.java

@ -153,29 +153,34 @@ class FsDatasetAsyncDiskService {
   * dfsUsed statistics accordingly.
   */
  void deleteAsync(FsVolumeImpl volume, File blockFile, File metaFile,
-     ExtendedBlock block) {
+     ExtendedBlock block, String trashDirectory) {
    LOG.info("Scheduling " + block.getLocalBlock()
        + " file " + blockFile + " for deletion");
    ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
-       volume, blockFile, metaFile, block);
+       volume, blockFile, metaFile, block, trashDirectory);
    execute(volume.getCurrentDir(), deletionTask);
  }

  /** A task for deleting a block file and its associated meta file, as well
   * as decrement the dfs usage of the volume.
+  * Optionally accepts a trash directory. If one is specified then the files
+  * are moved to trash instead of being deleted. If none is specified then the
+  * files are deleted immediately.
   */
  class ReplicaFileDeleteTask implements Runnable {
    final FsVolumeImpl volume;
    final File blockFile;
    final File metaFile;
    final ExtendedBlock block;
+   final String trashDirectory;

    ReplicaFileDeleteTask(FsVolumeImpl volume, File blockFile,
-       File metaFile, ExtendedBlock block) {
+       File metaFile, ExtendedBlock block, String trashDirectory) {
      this.volume = volume;
      this.blockFile = blockFile;
      this.metaFile = metaFile;
      this.block = block;
+     this.trashDirectory = trashDirectory;
    }

    @Override
@ -186,12 +191,34 @@ class FsDatasetAsyncDiskService {
+ " and meta file " + metaFile + " from volume " + volume; + " and meta file " + metaFile + " from volume " + volume;
} }
private boolean deleteFiles() {
return blockFile.delete() && (metaFile.delete() || !metaFile.exists());
}
private boolean moveFiles() {
File newBlockFile = new File(trashDirectory, blockFile.getName());
File newMetaFile = new File(trashDirectory, metaFile.getName());
(new File(trashDirectory)).mkdirs();
if (LOG.isDebugEnabled()) {
LOG.debug("Moving files " + blockFile.getName() + " and " +
metaFile.getName() + " to trash.");
}
return (blockFile.renameTo(newBlockFile) &&
metaFile.renameTo(newMetaFile));
}
@Override @Override
public void run() { public void run() {
long dfsBytes = blockFile.length() + metaFile.length(); long dfsBytes = blockFile.length() + metaFile.length();
if (!blockFile.delete() || (!metaFile.delete() && metaFile.exists())) { boolean result;
LOG.warn("Unexpected error trying to delete block "
+ block.getBlockPoolId() + " " + block.getLocalBlock() result = (trashDirectory == null) ? deleteFiles() : moveFiles();
if (!result) {
LOG.warn("Unexpected error trying to "
+ (trashDirectory == null ? "delete" : "move")
+ " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
+ " at file " + blockFile + ". Ignored."); + " at file " + blockFile + ". Ignored.");
} else { } else {
if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){ if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
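Illustrative only, not the HDFS class: a standalone sketch of the same delete-or-move decision made by ReplicaFileDeleteTask, exercised on temp files. A null trash directory means "delete now"; a non-null one means "rename into trash so a rollback can restore the pair".

    import java.io.File;
    import java.io.IOException;

    public class DeleteOrMoveSketch {

      static boolean deleteOrMove(File blockFile, File metaFile, String trashDirectory) {
        if (trashDirectory == null) {
          // Immediate deletion, tolerating an already-missing meta file.
          return blockFile.delete() && (metaFile.delete() || !metaFile.exists());
        }
        File trash = new File(trashDirectory);
        trash.mkdirs();
        return blockFile.renameTo(new File(trash, blockFile.getName()))
            && metaFile.renameTo(new File(trash, metaFile.getName()));
      }

      public static void main(String[] args) throws IOException {
        File dir = new File(System.getProperty("java.io.tmpdir"), "blockdir");
        dir.mkdirs();
        File blk = File.createTempFile("blk_", "", dir);
        File meta = new File(dir, blk.getName() + ".meta");
        meta.createNewFile();

        // Move the pair into a sibling trash directory instead of deleting it.
        boolean moved = deleteOrMove(blk, meta, new File(dir, "trash").getPath());
        System.out.println("moved=" + moved);
      }
    }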

FsDatasetImpl.java

@ -38,7 +38,6 @@ import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
-import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -193,6 +192,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
  }
  final DataNode datanode;
+ final DataStorage dataStorage;
  final FsVolumeList volumes;
  final FsDatasetAsyncDiskService asyncDiskService;
  final FsDatasetCache cacheManager;
@ -209,6 +209,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
  FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
      ) throws IOException {
    this.datanode = datanode;
+   this.dataStorage = storage;
    // The number of volumes required for operation is the total number
    // of volumes minus the number of failed volumes we can tolerate.
    final int volFailuresTolerated =
@ -1234,7 +1235,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      // finishes.
      asyncDiskService.deleteAsync(v, f,
          FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
-         new ExtendedBlock(bpid, invalidBlks[i]));
+         new ExtendedBlock(bpid, invalidBlks[i]),
+         dataStorage.getTrashDirectoryForBlockFile(bpid, f));
    }
    if (error) {
      throw new IOException("Error in deleting blocks.");
@ -1891,6 +1893,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
        blocksVolumeIds, blocksVolumeIndexes);
  }
+ @Override
+ public void enableDeleteToTrash(String bpid) {
+   dataStorage.enableTrash(bpid);
+ }
+ @Override
+ public void disableAndPurgeTrashStorage(String bpid) {
+   dataStorage.disableAndPurgeTrash(bpid);
+ }
  @Override
  public RollingLogs createRollingLogs(String bpid, String prefix
      ) throws IOException {

MiniDFSCluster.java

@ -391,6 +391,10 @@ public class MiniDFSCluster {
      this.dnArgs = args;
      this.secureResources = secureResources;
    }
+   public void setDnArgs(String ... args) {
+     dnArgs = args;
+   }
  }
private Configuration conf; private Configuration conf;

SimulatedFSDataset.java

@ -1040,6 +1040,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
    throw new UnsupportedOperationException();
  }
+ @Override
+ public void enableDeleteToTrash(String bpid) {
+   throw new UnsupportedOperationException();
+ }
+ @Override
+ public void disableAndPurgeTrashStorage(String bpid) {
+   throw new UnsupportedOperationException();
+ }
  @Override
  public void checkAndUpdate(String bpid, long blockId, File diskFile,
      File diskMetaFile, FsVolumeSpi vol) {