HDFS-8831. Trash Support for deletion in HDFS encryption zone. Contributed by Xiaoyu Yao.
(cherry picked from commitcbc7b6bf97
) (cherry picked from commiteb7f9901b4
) Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
parent
134195df55
commit
f3ee0e7857
|
@ -105,6 +105,8 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
*/
|
*/
|
||||||
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
|
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
|
||||||
|
|
||||||
|
public static final String TRASH_PREFIX = ".Trash";
|
||||||
|
|
||||||
/** FileSystem cache */
|
/** FileSystem cache */
|
||||||
static final Cache CACHE = new Cache();
|
static final Cache CACHE = new Cache();
|
||||||
|
|
||||||
|
@ -2649,6 +2651,53 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
+ " doesn't support getAllStoragePolicies");
|
+ " doesn't support getAllStoragePolicies");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the root directory of Trash for current user when the path specified
|
||||||
|
* is deleted.
|
||||||
|
*
|
||||||
|
* @param path the trash root of the path to be determined.
|
||||||
|
* @return the default implementation returns "/user/$USER/.Trash".
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Path getTrashRoot(Path path) throws IOException {
|
||||||
|
return this.makeQualified(new Path(getHomeDirectory().toUri().getPath(),
|
||||||
|
TRASH_PREFIX));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all the trash roots for current user or all users.
|
||||||
|
*
|
||||||
|
* @param allUsers return trash roots for all users if true.
|
||||||
|
* @return all the trash root directories.
|
||||||
|
* Default FileSystem returns .Trash under users' home directories if
|
||||||
|
* /user/$USER/.Trash exists.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Collection<FileStatus> getTrashRoots(boolean allUsers)
|
||||||
|
throws IOException {
|
||||||
|
Path userHome = new Path(getHomeDirectory().toUri().getPath());
|
||||||
|
List<FileStatus> ret = new ArrayList<FileStatus>();
|
||||||
|
if (!allUsers) {
|
||||||
|
Path userTrash = new Path(userHome, TRASH_PREFIX);
|
||||||
|
if (exists(userTrash)) {
|
||||||
|
ret.add(getFileStatus(userTrash));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Path homeParent = userHome.getParent();
|
||||||
|
if (exists(homeParent)) {
|
||||||
|
FileStatus[] candidates = listStatus(homeParent);
|
||||||
|
for (FileStatus candidate : candidates) {
|
||||||
|
Path userTrash = new Path(candidate.getPath(), TRASH_PREFIX);
|
||||||
|
if (exists(userTrash)) {
|
||||||
|
candidate.setPath(userTrash);
|
||||||
|
ret.add(candidate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
// making it volatile to be able to do a double checked locking
|
// making it volatile to be able to do a double checked locking
|
||||||
private volatile static boolean FILE_SYSTEMS_LOADED = false;
|
private volatile static boolean FILE_SYSTEMS_LOADED = false;
|
||||||
|
|
||||||
|
@ -3169,7 +3218,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
* For each StatisticsData object, we will call accept on the visitor.
|
* For each StatisticsData object, we will call accept on the visitor.
|
||||||
* Finally, at the end, we will call aggregate to get the final total.
|
* Finally, at the end, we will call aggregate to get the final total.
|
||||||
*
|
*
|
||||||
* @param The visitor to use.
|
* @param visitor to use.
|
||||||
* @return The total.
|
* @return The total.
|
||||||
*/
|
*/
|
||||||
private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
|
private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
|
||||||
|
|
|
@ -643,4 +643,15 @@ public class FilterFileSystem extends FileSystem {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return fs.getAllStoragePolicies();
|
return fs.getAllStoragePolicies();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getTrashRoot(Path path) throws IOException {
|
||||||
|
return fs.getTrashRoot(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<FileStatus> getTrashRoots(boolean allUsers)
|
||||||
|
throws IOException {
|
||||||
|
return fs.getTrashRoots(allUsers);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,11 +121,21 @@ public class FsShell extends Configured implements Tool {
|
||||||
return getTrash().getCurrentTrashDir();
|
return getTrash().getCurrentTrashDir();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current trash location for the path specified
|
||||||
|
* @param path to be deleted
|
||||||
|
* @return path to the trash
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Path getCurrentTrashDir(Path path) throws IOException {
|
||||||
|
return getTrash().getCurrentTrashDir(path);
|
||||||
|
}
|
||||||
|
|
||||||
// NOTE: Usage/Help are inner classes to allow access to outer methods
|
// NOTE: Usage/Help are inner classes to allow access to outer methods
|
||||||
// that access commandFactory
|
// that access commandFactory
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Display help for commands with their short usage and long description
|
* Display help for commands with their short usage and long description.
|
||||||
*/
|
*/
|
||||||
protected class Usage extends FsCommand {
|
protected class Usage extends FsCommand {
|
||||||
public static final String NAME = "usage";
|
public static final String NAME = "usage";
|
||||||
|
|
|
@ -54,7 +54,7 @@ public class Trash extends Configured {
|
||||||
*/
|
*/
|
||||||
public Trash(FileSystem fs, Configuration conf) throws IOException {
|
public Trash(FileSystem fs, Configuration conf) throws IOException {
|
||||||
super(conf);
|
super(conf);
|
||||||
trashPolicy = TrashPolicy.getInstance(conf, fs, fs.getHomeDirectory());
|
trashPolicy = TrashPolicy.getInstance(conf, fs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -92,11 +92,7 @@ public class Trash extends Configured {
|
||||||
throw new IOException("Failed to get server trash configuration", e);
|
throw new IOException("Failed to get server trash configuration", e);
|
||||||
}
|
}
|
||||||
Trash trash = new Trash(fullyResolvedFs, conf);
|
Trash trash = new Trash(fullyResolvedFs, conf);
|
||||||
boolean success = trash.moveToTrash(fullyResolvedPath);
|
return trash.moveToTrash(fullyResolvedPath);
|
||||||
if (success) {
|
|
||||||
LOG.info("Moved: '" + p + "' to trash at: " + trash.getCurrentTrashDir());
|
|
||||||
}
|
|
||||||
return success;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -124,7 +120,7 @@ public class Trash extends Configured {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** get the current working directory */
|
/** get the current working directory */
|
||||||
Path getCurrentTrashDir() {
|
Path getCurrentTrashDir() throws IOException {
|
||||||
return trashPolicy.getCurrentTrashDir();
|
return trashPolicy.getCurrentTrashDir();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,4 +135,8 @@ public class Trash extends Configured {
|
||||||
public Runnable getEmptier() throws IOException {
|
public Runnable getEmptier() throws IOException {
|
||||||
return trashPolicy.getEmptier();
|
return trashPolicy.getEmptier();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Path getCurrentTrashDir(Path path) throws IOException {
|
||||||
|
return trashPolicy.getCurrentTrashDir(path);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@ public abstract class TrashPolicy extends Configured {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to setup the trash policy. Must be implemented by all TrashPolicy
|
* Used to setup the trash policy. Must be implemented by all TrashPolicy
|
||||||
* implementations
|
* implementations.
|
||||||
* @param conf the configuration to be used
|
* @param conf the configuration to be used
|
||||||
* @param fs the filesystem to be used
|
* @param fs the filesystem to be used
|
||||||
* @param home the home directory
|
* @param home the home directory
|
||||||
|
@ -46,7 +46,19 @@ public abstract class TrashPolicy extends Configured {
|
||||||
public abstract void initialize(Configuration conf, FileSystem fs, Path home);
|
public abstract void initialize(Configuration conf, FileSystem fs, Path home);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns whether the Trash Policy is enabled for this filesystem
|
* Used to setup the trash policy. Must be implemented by all TrashPolicy
|
||||||
|
* implementations. Different from initialize(conf, fs, home), this one does
|
||||||
|
* not assume trash always under /user/$USER due to HDFS encryption zone.
|
||||||
|
* @param conf the configuration to be used
|
||||||
|
* @param fs the filesystem to be used
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void initialize(Configuration conf, FileSystem fs) throws IOException{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether the Trash Policy is enabled for this filesystem.
|
||||||
*/
|
*/
|
||||||
public abstract boolean isEnabled();
|
public abstract boolean isEnabled();
|
||||||
|
|
||||||
|
@ -68,8 +80,27 @@ public abstract class TrashPolicy extends Configured {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the current working directory of the Trash Policy
|
* Get the current working directory of the Trash Policy
|
||||||
|
* This API does not work with files deleted from encryption zone when HDFS
|
||||||
|
* data encryption at rest feature is enabled as rename file between
|
||||||
|
* encryption zones or encryption zone and non-encryption zone is not allowed.
|
||||||
|
*
|
||||||
|
* The caller is recommend to use the new API
|
||||||
|
* TrashPolicy#getCurrentTrashDir(Path path).
|
||||||
|
* It returns the trash location correctly for the path specified no matter
|
||||||
|
* the path is in encryption zone or not.
|
||||||
*/
|
*/
|
||||||
public abstract Path getCurrentTrashDir();
|
public abstract Path getCurrentTrashDir() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current trash directory for path specified based on the Trash
|
||||||
|
* Policy
|
||||||
|
* @param path path to be deleted
|
||||||
|
* @return current trash directory for the path to be deleted
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Path getCurrentTrashDir(Path path) throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a {@link Runnable} that periodically empties the trash of all
|
* Return a {@link Runnable} that periodically empties the trash of all
|
||||||
|
@ -93,4 +124,21 @@ public abstract class TrashPolicy extends Configured {
|
||||||
trash.initialize(conf, fs, home); // initialize TrashPolicy
|
trash.initialize(conf, fs, home); // initialize TrashPolicy
|
||||||
return trash;
|
return trash;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an instance of the configured TrashPolicy based on the value
|
||||||
|
* of the configuration parameter fs.trash.classname.
|
||||||
|
*
|
||||||
|
* @param conf the configuration to be used
|
||||||
|
* @param fs the file system to be used
|
||||||
|
* @return an instance of TrashPolicy
|
||||||
|
*/
|
||||||
|
public static TrashPolicy getInstance(Configuration conf, FileSystem fs)
|
||||||
|
throws IOException {
|
||||||
|
Class<? extends TrashPolicy> trashClass = conf.getClass(
|
||||||
|
"fs.trash.classname", TrashPolicyDefault.class, TrashPolicy.class);
|
||||||
|
TrashPolicy trash = ReflectionUtils.newInstance(trashClass, conf);
|
||||||
|
trash.initialize(conf, fs); // initialize TrashPolicy
|
||||||
|
return trash;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.io.IOException;
|
||||||
import java.text.DateFormat;
|
import java.text.DateFormat;
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -66,23 +67,18 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
new SimpleDateFormat("yyMMddHHmm");
|
new SimpleDateFormat("yyMMddHHmm");
|
||||||
private static final int MSECS_PER_MINUTE = 60*1000;
|
private static final int MSECS_PER_MINUTE = 60*1000;
|
||||||
|
|
||||||
private Path current;
|
|
||||||
private Path homesParent;
|
|
||||||
private long emptierInterval;
|
private long emptierInterval;
|
||||||
|
|
||||||
public TrashPolicyDefault() { }
|
public TrashPolicyDefault() { }
|
||||||
|
|
||||||
private TrashPolicyDefault(FileSystem fs, Path home, Configuration conf)
|
private TrashPolicyDefault(FileSystem fs, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
initialize(conf, fs, home);
|
initialize(conf, fs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(Configuration conf, FileSystem fs, Path home) {
|
public void initialize(Configuration conf, FileSystem fs, Path home) {
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.trash = new Path(home, TRASH);
|
|
||||||
this.homesParent = home.getParent();
|
|
||||||
this.current = new Path(trash, CURRENT);
|
|
||||||
this.deletionInterval = (long)(conf.getFloat(
|
this.deletionInterval = (long)(conf.getFloat(
|
||||||
FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
|
FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
|
||||||
* MSECS_PER_MINUTE);
|
* MSECS_PER_MINUTE);
|
||||||
|
@ -91,6 +87,17 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
* MSECS_PER_MINUTE);
|
* MSECS_PER_MINUTE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(Configuration conf, FileSystem fs) {
|
||||||
|
this.fs = fs;
|
||||||
|
this.deletionInterval = (long)(conf.getFloat(
|
||||||
|
FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
|
||||||
|
* MSECS_PER_MINUTE);
|
||||||
|
this.emptierInterval = (long)(conf.getFloat(
|
||||||
|
FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
|
||||||
|
* MSECS_PER_MINUTE);
|
||||||
|
}
|
||||||
|
|
||||||
private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
|
private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
|
||||||
return Path.mergePaths(basePath, rmFilePath);
|
return Path.mergePaths(basePath, rmFilePath);
|
||||||
}
|
}
|
||||||
|
@ -113,17 +120,19 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
|
|
||||||
String qpath = fs.makeQualified(path).toString();
|
String qpath = fs.makeQualified(path).toString();
|
||||||
|
|
||||||
if (qpath.startsWith(trash.toString())) {
|
Path trashRoot = fs.getTrashRoot(path);
|
||||||
|
Path trashCurrent = new Path(trashRoot, CURRENT);
|
||||||
|
if (qpath.startsWith(trashRoot.toString())) {
|
||||||
return false; // already in trash
|
return false; // already in trash
|
||||||
}
|
}
|
||||||
|
|
||||||
if (trash.getParent().toString().startsWith(qpath)) {
|
if (trashRoot.getParent().toString().startsWith(qpath)) {
|
||||||
throw new IOException("Cannot move \"" + path +
|
throw new IOException("Cannot move \"" + path +
|
||||||
"\" to the trash, as it contains the trash");
|
"\" to the trash, as it contains the trash");
|
||||||
}
|
}
|
||||||
|
|
||||||
Path trashPath = makeTrashRelativePath(current, path);
|
Path trashPath = makeTrashRelativePath(trashCurrent, path);
|
||||||
Path baseTrashPath = makeTrashRelativePath(current, path.getParent());
|
Path baseTrashPath = makeTrashRelativePath(trashCurrent, path.getParent());
|
||||||
|
|
||||||
IOException cause = null;
|
IOException cause = null;
|
||||||
|
|
||||||
|
@ -148,14 +157,16 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
trashPath = new Path(orig + Time.now());
|
trashPath = new Path(orig + Time.now());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fs.rename(path, trashPath)) // move to current trash
|
if (fs.rename(path, trashPath)) { // move to current trash
|
||||||
|
LOG.info("Moved: '" + path + "' to trash at: " + trashPath);
|
||||||
return true;
|
return true;
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
cause = e;
|
cause = e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw (IOException)
|
throw (IOException)
|
||||||
new IOException("Failed to move to trash: "+path).initCause(cause);
|
new IOException("Failed to move to trash: " + path).initCause(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
|
@ -166,72 +177,32 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public void createCheckpoint(Date date) throws IOException {
|
public void createCheckpoint(Date date) throws IOException {
|
||||||
|
Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
|
||||||
if (!fs.exists(current)) // no trash, no checkpoint
|
for (FileStatus trashRoot: trashRoots) {
|
||||||
return;
|
LOG.info("TrashPolicyDefault#createCheckpoint for trashRoot: " +
|
||||||
|
trashRoot.getPath());
|
||||||
Path checkpointBase;
|
createCheckpoint(trashRoot.getPath(), date);
|
||||||
synchronized (CHECKPOINT) {
|
|
||||||
checkpointBase = new Path(trash, CHECKPOINT.format(date));
|
|
||||||
|
|
||||||
}
|
}
|
||||||
Path checkpoint = checkpointBase;
|
|
||||||
|
|
||||||
int attempt = 0;
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
fs.rename(current, checkpoint, Rename.NONE);
|
|
||||||
break;
|
|
||||||
} catch (FileAlreadyExistsException e) {
|
|
||||||
if (++attempt > 1000) {
|
|
||||||
throw new IOException("Failed to checkpoint trash: "+checkpoint);
|
|
||||||
}
|
|
||||||
checkpoint = checkpointBase.suffix("-" + attempt);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.info("Created trash checkpoint: "+checkpoint.toUri().getPath());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteCheckpoint() throws IOException {
|
public void deleteCheckpoint() throws IOException {
|
||||||
FileStatus[] dirs = null;
|
Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
|
||||||
|
for (FileStatus trashRoot : trashRoots) {
|
||||||
try {
|
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
|
||||||
dirs = fs.listStatus(trash); // scan trash sub-directories
|
trashRoot.getPath());
|
||||||
} catch (FileNotFoundException fnfe) {
|
deleteCheckpoint(trashRoot.getPath());
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
long now = Time.now();
|
|
||||||
for (int i = 0; i < dirs.length; i++) {
|
|
||||||
Path path = dirs[i].getPath();
|
|
||||||
String dir = path.toUri().getPath();
|
|
||||||
String name = path.getName();
|
|
||||||
if (name.equals(CURRENT.getName())) // skip current
|
|
||||||
continue;
|
|
||||||
|
|
||||||
long time;
|
|
||||||
try {
|
|
||||||
time = getTimeFromCheckpoint(name);
|
|
||||||
} catch (ParseException e) {
|
|
||||||
LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((now - deletionInterval) > time) {
|
|
||||||
if (fs.delete(path, true)) {
|
|
||||||
LOG.info("Deleted trash checkpoint: "+dir);
|
|
||||||
} else {
|
|
||||||
LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Path getCurrentTrashDir() {
|
public Path getCurrentTrashDir() throws IOException {
|
||||||
return current;
|
return new Path(fs.getTrashRoot(null), CURRENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getCurrentTrashDir(Path path) throws IOException {
|
||||||
|
return new Path(fs.getTrashRoot(path), CURRENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -278,25 +249,24 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
try {
|
try {
|
||||||
now = Time.now();
|
now = Time.now();
|
||||||
if (now >= end) {
|
if (now >= end) {
|
||||||
|
Collection<FileStatus> trashRoots;
|
||||||
FileStatus[] homes = null;
|
|
||||||
try {
|
try {
|
||||||
homes = fs.listStatus(homesParent); // list all home dirs
|
trashRoots = fs.getTrashRoots(true); // list all home dirs
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Trash can't list homes: "+e+" Sleeping.");
|
LOG.warn("Trash can't list all trash roots: "+e+" Sleeping.");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (FileStatus home : homes) { // dump each trash
|
for (FileStatus trashRoot : trashRoots) { // dump each trash
|
||||||
if (!home.isDirectory())
|
if (!trashRoot.isDirectory())
|
||||||
continue;
|
continue;
|
||||||
try {
|
try {
|
||||||
TrashPolicyDefault trash = new TrashPolicyDefault(
|
TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
|
||||||
fs, home.getPath(), conf);
|
trash.deleteCheckpoint(trashRoot.getPath());
|
||||||
trash.deleteCheckpoint();
|
trash.createCheckpoint(trashRoot.getPath(), new Date(now));
|
||||||
trash.createCheckpoint(new Date(now));
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
|
LOG.warn("Trash caught: "+e+". Skipping " +
|
||||||
|
trashRoot.getPath() + ".");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -319,6 +289,69 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createCheckpoint(Path trashRoot, Date date) throws IOException {
|
||||||
|
if (!fs.exists(new Path(trashRoot, CURRENT))) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Path checkpointBase;
|
||||||
|
synchronized (CHECKPOINT) {
|
||||||
|
checkpointBase = new Path(trashRoot, CHECKPOINT.format(date));
|
||||||
|
}
|
||||||
|
Path checkpoint = checkpointBase;
|
||||||
|
Path current = new Path(trashRoot, CURRENT);
|
||||||
|
|
||||||
|
int attempt = 0;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
fs.rename(current, checkpoint, Rename.NONE);
|
||||||
|
LOG.info("Created trash checkpoint: " + checkpoint.toUri().getPath());
|
||||||
|
break;
|
||||||
|
} catch (FileAlreadyExistsException e) {
|
||||||
|
if (++attempt > 1000) {
|
||||||
|
throw new IOException("Failed to checkpoint trash: " + checkpoint);
|
||||||
|
}
|
||||||
|
checkpoint = checkpointBase.suffix("-" + attempt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteCheckpoint(Path trashRoot) throws IOException {
|
||||||
|
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);
|
||||||
|
|
||||||
|
FileStatus[] dirs = null;
|
||||||
|
try {
|
||||||
|
dirs = fs.listStatus(trashRoot); // scan trash sub-directories
|
||||||
|
} catch (FileNotFoundException fnfe) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
long now = Time.now();
|
||||||
|
for (int i = 0; i < dirs.length; i++) {
|
||||||
|
Path path = dirs[i].getPath();
|
||||||
|
String dir = path.toUri().getPath();
|
||||||
|
String name = path.getName();
|
||||||
|
if (name.equals(CURRENT.getName())) { // skip current
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
long time;
|
||||||
|
try {
|
||||||
|
time = getTimeFromCheckpoint(name);
|
||||||
|
} catch (ParseException e) {
|
||||||
|
LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((now - deletionInterval) > time) {
|
||||||
|
if (fs.delete(path, true)) {
|
||||||
|
LOG.info("Deleted trash checkpoint: "+dir);
|
||||||
|
} else {
|
||||||
|
LOG.warn("Couldn't delete checkpoint: " + dir + " Ignoring.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private long getTimeFromCheckpoint(String name) throws ParseException {
|
private long getTimeFromCheckpoint(String name) throws ParseException {
|
||||||
long time;
|
long time;
|
||||||
|
|
||||||
|
|
|
@ -214,6 +214,10 @@ public class TestHarFileSystem {
|
||||||
|
|
||||||
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
|
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
public Path getTrashRoot(Path path) throws IOException;
|
||||||
|
|
||||||
|
public Collection<FileStatus> getTrashRoots(boolean allUsers) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -695,6 +695,10 @@ public class TestTrash extends TestCase {
|
||||||
public void initialize(Configuration conf, FileSystem fs, Path home) {
|
public void initialize(Configuration conf, FileSystem fs, Path home) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(Configuration conf, FileSystem fs) {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isEnabled() {
|
public boolean isEnabled() {
|
||||||
return false;
|
return false;
|
||||||
|
@ -718,6 +722,11 @@ public class TestTrash extends TestCase {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getCurrentTrashDir(Path path) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Runnable getEmptier() throws IOException {
|
public Runnable getEmptier() throws IOException {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -2301,4 +2301,65 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return dfs.getInotifyEventStream(lastReadTxid);
|
return dfs.getInotifyEventStream(lastReadTxid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the root directory of Trash for a path in HDFS.
|
||||||
|
* 1. File in encryption zone returns /ez1/.Trash/username
|
||||||
|
* 2. File not in encryption zone returns /users/username/.Trash
|
||||||
|
* Caller appends either Current or checkpoint timestamp for trash destination
|
||||||
|
* @param path the trash root of the path to be determined.
|
||||||
|
* @return trash root
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Path getTrashRoot(Path path) throws IOException {
|
||||||
|
if ((path == null) || !dfs.isHDFSEncryptionEnabled()) {
|
||||||
|
return super.getTrashRoot(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
String absSrc = path.toUri().getPath();
|
||||||
|
EncryptionZone ez = dfs.getEZForPath(absSrc);
|
||||||
|
if ((ez != null) && !ez.getPath().equals(absSrc)) {
|
||||||
|
return this.makeQualified(
|
||||||
|
new Path(ez.getPath() + "/" + FileSystem.TRASH_PREFIX +
|
||||||
|
dfs.ugi.getShortUserName()));
|
||||||
|
} else {
|
||||||
|
return super.getTrashRoot(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all the trash roots of HDFS for current user or for all the users.
|
||||||
|
* 1. File deleted from non-encryption zone /user/username/.Trash
|
||||||
|
* 2. File deleted from encryption zones
|
||||||
|
* e.g., ez1 rooted at /ez1 has its trash root at /ez1/.Trash/$USER
|
||||||
|
* @allUsers return trashRoots of all users if true, used by emptier
|
||||||
|
* @return trash roots of HDFS
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Collection<FileStatus> getTrashRoots(boolean allUsers) throws IOException {
|
||||||
|
List<FileStatus> ret = new ArrayList<FileStatus>();
|
||||||
|
// Get normal trash roots
|
||||||
|
ret.addAll(super.getTrashRoots(allUsers));
|
||||||
|
|
||||||
|
// Get EZ Trash roots
|
||||||
|
final RemoteIterator<EncryptionZone> it = dfs.listEncryptionZones();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
Path ezTrashRoot = new Path(it.next().getPath(), FileSystem.TRASH_PREFIX);
|
||||||
|
if (allUsers) {
|
||||||
|
for (FileStatus candidate : listStatus(ezTrashRoot)) {
|
||||||
|
if (exists(candidate.getPath())) {
|
||||||
|
ret.add(candidate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Path userTrash = new Path(ezTrashRoot, System.getProperty("user.name"));
|
||||||
|
if (exists(userTrash)) {
|
||||||
|
ret.add(getFileStatus(userTrash));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -835,7 +835,6 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves
|
HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves
|
||||||
without DN restart. (Xiaobing Zhou via Arpit Agarwal)
|
without DN restart. (Xiaobing Zhou via Arpit Agarwal)
|
||||||
|
|
||||||
|
|
||||||
HDFS-9527. The return type of FSNamesystem.getBlockCollection should be
|
HDFS-9527. The return type of FSNamesystem.getBlockCollection should be
|
||||||
changed to INodeFile. (szetszwo)
|
changed to INodeFile. (szetszwo)
|
||||||
|
|
||||||
|
@ -868,6 +867,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-9490. MiniDFSCluster should change block generation stamp via
|
HDFS-9490. MiniDFSCluster should change block generation stamp via
|
||||||
FsDatasetTestUtils. (Tony Wu via lei)
|
FsDatasetTestUtils. (Tony Wu via lei)
|
||||||
|
|
||||||
|
HDFS-8831. Trash Support for deletion in HDFS encryption zone. (xyao)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileSystemTestHelper;
|
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||||
import org.apache.hadoop.fs.FileSystemTestWrapper;
|
import org.apache.hadoop.fs.FileSystemTestWrapper;
|
||||||
|
import org.apache.hadoop.fs.FsShell;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
@ -96,6 +97,7 @@ import static org.mockito.Matchers.anyShort;
|
||||||
import static org.mockito.Mockito.withSettings;
|
import static org.mockito.Mockito.withSettings;
|
||||||
import static org.mockito.Mockito.any;
|
import static org.mockito.Mockito.any;
|
||||||
import static org.mockito.Mockito.anyString;
|
import static org.mockito.Mockito.anyString;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSTestUtil.verifyFilesEqual;
|
import static org.apache.hadoop.hdfs.DFSTestUtil.verifyFilesEqual;
|
||||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
||||||
|
@ -1408,4 +1410,49 @@ public class TestEncryptionZones {
|
||||||
assertExceptionContains("Path not found: " + zoneFile, e);
|
assertExceptionContains("Path not found: " + zoneFile, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testEncryptionZoneWithTrash() throws Exception {
|
||||||
|
// Create the encryption zone1
|
||||||
|
final HdfsAdmin dfsAdmin =
|
||||||
|
new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
|
||||||
|
final Path zone1 = new Path("/zone1");
|
||||||
|
fs.mkdirs(zone1);
|
||||||
|
dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
|
||||||
|
|
||||||
|
// Create the encrypted file in zone1
|
||||||
|
final Path encFile1 = new Path(zone1, "encFile1");
|
||||||
|
final int len = 8192;
|
||||||
|
DFSTestUtil.createFile(fs, encFile1, len, (short) 1, 0xFEED);
|
||||||
|
|
||||||
|
Configuration clientConf = new Configuration(conf);
|
||||||
|
clientConf.setLong(FS_TRASH_INTERVAL_KEY, 1);
|
||||||
|
FsShell shell = new FsShell(clientConf);
|
||||||
|
|
||||||
|
// Delete encrypted file from the shell with trash enabled
|
||||||
|
// Verify the file is moved to appropriate trash within the zone
|
||||||
|
verifyShellDeleteWithTrash(shell, encFile1);
|
||||||
|
|
||||||
|
// Delete encryption zone from the shell with trash enabled
|
||||||
|
// Verify the zone is moved to appropriate trash location in user's home dir
|
||||||
|
verifyShellDeleteWithTrash(shell, zone1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyShellDeleteWithTrash(FsShell shell, Path path)
|
||||||
|
throws Exception{
|
||||||
|
try {
|
||||||
|
final Path trashFile =
|
||||||
|
new Path(shell.getCurrentTrashDir(path) + "/" + path);
|
||||||
|
String[] argv = new String[]{"-rm", "-r", path.toString()};
|
||||||
|
int res = ToolRunner.run(shell, argv);
|
||||||
|
assertEquals("rm failed", 0, res);
|
||||||
|
assertTrue("File not in trash : " + trashFile, fs.exists(trashFile));
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
fail(ioe.getMessage());
|
||||||
|
} finally {
|
||||||
|
if (fs.exists(path)) {
|
||||||
|
fs.delete(path, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue