HDFS-8831. Trash Support for deletion in HDFS encryption zone. Contributed by Xiaoyu Yao.

(cherry picked from commit cbc7b6bf97)
This commit is contained in:
Xiaoyu Yao 2015-12-04 10:39:45 -08:00
parent 496c8969d4
commit eb7f9901b4
11 changed files with 368 additions and 94 deletions

View File

@ -105,6 +105,8 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
public static final String TRASH_PREFIX = ".Trash";
/** FileSystem cache */
static final Cache CACHE = new Cache();
@ -2649,6 +2651,53 @@ public abstract class FileSystem extends Configured implements Closeable {
+ " doesn't support getAllStoragePolicies");
}
/**
* Get the root directory of Trash for current user when the path specified
* is deleted.
*
* @param path the trash root of the path to be determined.
* @return the default implementation returns "/user/$USER/.Trash".
* @throws IOException
*/
public Path getTrashRoot(Path path) throws IOException {
return this.makeQualified(new Path(getHomeDirectory().toUri().getPath(),
TRASH_PREFIX));
}
/**
* Get all the trash roots for current user or all users.
*
* @param allUsers return trash roots for all users if true.
* @return all the trash root directories.
* Default FileSystem returns .Trash under users' home directories if
* /user/$USER/.Trash exists.
* @throws IOException
*/
public Collection<FileStatus> getTrashRoots(boolean allUsers)
throws IOException {
Path userHome = new Path(getHomeDirectory().toUri().getPath());
List<FileStatus> ret = new ArrayList<FileStatus>();
if (!allUsers) {
Path userTrash = new Path(userHome, TRASH_PREFIX);
if (exists(userTrash)) {
ret.add(getFileStatus(userTrash));
}
} else {
Path homeParent = userHome.getParent();
if (exists(homeParent)) {
FileStatus[] candidates = listStatus(homeParent);
for (FileStatus candidate : candidates) {
Path userTrash = new Path(candidate.getPath(), TRASH_PREFIX);
if (exists(userTrash)) {
candidate.setPath(userTrash);
ret.add(candidate);
}
}
}
}
return ret;
}
// making it volatile to be able to do a double checked locking
private volatile static boolean FILE_SYSTEMS_LOADED = false;
@ -3169,7 +3218,7 @@ public abstract class FileSystem extends Configured implements Closeable {
* For each StatisticsData object, we will call accept on the visitor.
* Finally, at the end, we will call aggregate to get the final total.
*
* @param The visitor to use.
* @param visitor to use.
* @return The total.
*/
private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {

View File

@ -643,4 +643,15 @@ public class FilterFileSystem extends FileSystem {
throws IOException {
return fs.getAllStoragePolicies();
}
@Override
public Path getTrashRoot(Path path) throws IOException {
return fs.getTrashRoot(path);
}
@Override
public Collection<FileStatus> getTrashRoots(boolean allUsers)
throws IOException {
return fs.getTrashRoots(allUsers);
}
}

View File

@ -121,11 +121,21 @@ public class FsShell extends Configured implements Tool {
return getTrash().getCurrentTrashDir();
}
/**
* Returns the current trash location for the path specified
* @param path to be deleted
* @return path to the trash
* @throws IOException
*/
public Path getCurrentTrashDir(Path path) throws IOException {
return getTrash().getCurrentTrashDir(path);
}
// NOTE: Usage/Help are inner classes to allow access to outer methods
// that access commandFactory
/**
* Display help for commands with their short usage and long description
* Display help for commands with their short usage and long description.
*/
protected class Usage extends FsCommand {
public static final String NAME = "usage";

View File

@ -54,7 +54,7 @@ public class Trash extends Configured {
*/
public Trash(FileSystem fs, Configuration conf) throws IOException {
super(conf);
trashPolicy = TrashPolicy.getInstance(conf, fs, fs.getHomeDirectory());
trashPolicy = TrashPolicy.getInstance(conf, fs);
}
/**
@ -92,11 +92,7 @@ public class Trash extends Configured {
throw new IOException("Failed to get server trash configuration", e);
}
Trash trash = new Trash(fullyResolvedFs, conf);
boolean success = trash.moveToTrash(fullyResolvedPath);
if (success) {
LOG.info("Moved: '" + p + "' to trash at: " + trash.getCurrentTrashDir());
}
return success;
return trash.moveToTrash(fullyResolvedPath);
}
/**
@ -124,7 +120,7 @@ public class Trash extends Configured {
}
/** get the current working directory */
Path getCurrentTrashDir() {
Path getCurrentTrashDir() throws IOException {
return trashPolicy.getCurrentTrashDir();
}
@ -139,4 +135,8 @@ public class Trash extends Configured {
public Runnable getEmptier() throws IOException {
return trashPolicy.getEmptier();
}
public Path getCurrentTrashDir(Path path) throws IOException {
return trashPolicy.getCurrentTrashDir(path);
}
}

View File

@ -38,7 +38,7 @@ public abstract class TrashPolicy extends Configured {
/**
* Used to setup the trash policy. Must be implemented by all TrashPolicy
* implementations
* implementations.
* @param conf the configuration to be used
* @param fs the filesystem to be used
* @param home the home directory
@ -46,7 +46,19 @@ public abstract class TrashPolicy extends Configured {
public abstract void initialize(Configuration conf, FileSystem fs, Path home);
/**
* Returns whether the Trash Policy is enabled for this filesystem
* Used to setup the trash policy. Must be implemented by all TrashPolicy
* implementations. Different from initialize(conf, fs, home), this one does
* not assume trash always under /user/$USER due to HDFS encryption zone.
* @param conf the configuration to be used
* @param fs the filesystem to be used
* @throws IOException
*/
public void initialize(Configuration conf, FileSystem fs) throws IOException{
throw new UnsupportedOperationException();
}
/**
* Returns whether the Trash Policy is enabled for this filesystem.
*/
public abstract boolean isEnabled();
@ -68,8 +80,27 @@ public abstract class TrashPolicy extends Configured {
/**
* Get the current working directory of the Trash Policy
* This API does not work with files deleted from encryption zone when HDFS
* data encryption at rest feature is enabled as rename file between
* encryption zones or encryption zone and non-encryption zone is not allowed.
*
* The caller is recommend to use the new API
* TrashPolicy#getCurrentTrashDir(Path path).
* It returns the trash location correctly for the path specified no matter
* the path is in encryption zone or not.
*/
public abstract Path getCurrentTrashDir();
public abstract Path getCurrentTrashDir() throws IOException;
/**
* Get the current trash directory for path specified based on the Trash
* Policy
* @param path path to be deleted
* @return current trash directory for the path to be deleted
* @throws IOException
*/
public Path getCurrentTrashDir(Path path) throws IOException {
throw new UnsupportedOperationException();
}
/**
* Return a {@link Runnable} that periodically empties the trash of all
@ -78,7 +109,7 @@ public abstract class TrashPolicy extends Configured {
public abstract Runnable getEmptier() throws IOException;
/**
* Get an instance of the configured TrashPolicy based on the value
* Get an instance of the configured TrashPolicy based on the value
* of the configuration parameter fs.trash.classname.
*
* @param conf the configuration to be used
@ -93,4 +124,21 @@ public abstract class TrashPolicy extends Configured {
trash.initialize(conf, fs, home); // initialize TrashPolicy
return trash;
}
/**
* Get an instance of the configured TrashPolicy based on the value
* of the configuration parameter fs.trash.classname.
*
* @param conf the configuration to be used
* @param fs the file system to be used
* @return an instance of TrashPolicy
*/
public static TrashPolicy getInstance(Configuration conf, FileSystem fs)
throws IOException {
Class<? extends TrashPolicy> trashClass = conf.getClass(
"fs.trash.classname", TrashPolicyDefault.class, TrashPolicy.class);
TrashPolicy trash = ReflectionUtils.newInstance(trashClass, conf);
trash.initialize(conf, fs); // initialize TrashPolicy
return trash;
}
}

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import org.apache.commons.logging.Log;
@ -66,23 +67,18 @@ public class TrashPolicyDefault extends TrashPolicy {
new SimpleDateFormat("yyMMddHHmm");
private static final int MSECS_PER_MINUTE = 60*1000;
private Path current;
private Path homesParent;
private long emptierInterval;
public TrashPolicyDefault() { }
private TrashPolicyDefault(FileSystem fs, Path home, Configuration conf)
private TrashPolicyDefault(FileSystem fs, Configuration conf)
throws IOException {
initialize(conf, fs, home);
initialize(conf, fs);
}
@Override
public void initialize(Configuration conf, FileSystem fs, Path home) {
this.fs = fs;
this.trash = new Path(home, TRASH);
this.homesParent = home.getParent();
this.current = new Path(trash, CURRENT);
this.deletionInterval = (long)(conf.getFloat(
FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
* MSECS_PER_MINUTE);
@ -91,6 +87,17 @@ public class TrashPolicyDefault extends TrashPolicy {
* MSECS_PER_MINUTE);
}
@Override
public void initialize(Configuration conf, FileSystem fs) {
this.fs = fs;
this.deletionInterval = (long)(conf.getFloat(
FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
* MSECS_PER_MINUTE);
this.emptierInterval = (long)(conf.getFloat(
FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
* MSECS_PER_MINUTE);
}
private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
return Path.mergePaths(basePath, rmFilePath);
}
@ -113,17 +120,19 @@ public class TrashPolicyDefault extends TrashPolicy {
String qpath = fs.makeQualified(path).toString();
if (qpath.startsWith(trash.toString())) {
Path trashRoot = fs.getTrashRoot(path);
Path trashCurrent = new Path(trashRoot, CURRENT);
if (qpath.startsWith(trashRoot.toString())) {
return false; // already in trash
}
if (trash.getParent().toString().startsWith(qpath)) {
if (trashRoot.getParent().toString().startsWith(qpath)) {
throw new IOException("Cannot move \"" + path +
"\" to the trash, as it contains the trash");
}
Path trashPath = makeTrashRelativePath(current, path);
Path baseTrashPath = makeTrashRelativePath(current, path.getParent());
Path trashPath = makeTrashRelativePath(trashCurrent, path);
Path baseTrashPath = makeTrashRelativePath(trashCurrent, path.getParent());
IOException cause = null;
@ -148,14 +157,16 @@ public class TrashPolicyDefault extends TrashPolicy {
trashPath = new Path(orig + Time.now());
}
if (fs.rename(path, trashPath)) // move to current trash
if (fs.rename(path, trashPath)) { // move to current trash
LOG.info("Moved: '" + path + "' to trash at: " + trashPath);
return true;
}
} catch (IOException e) {
cause = e;
}
}
throw (IOException)
new IOException("Failed to move to trash: "+path).initCause(cause);
new IOException("Failed to move to trash: " + path).initCause(cause);
}
@SuppressWarnings("deprecation")
@ -166,72 +177,32 @@ public class TrashPolicyDefault extends TrashPolicy {
@SuppressWarnings("deprecation")
public void createCheckpoint(Date date) throws IOException {
if (!fs.exists(current)) // no trash, no checkpoint
return;
Path checkpointBase;
synchronized (CHECKPOINT) {
checkpointBase = new Path(trash, CHECKPOINT.format(date));
Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
for (FileStatus trashRoot: trashRoots) {
LOG.info("TrashPolicyDefault#createCheckpoint for trashRoot: " +
trashRoot.getPath());
createCheckpoint(trashRoot.getPath(), date);
}
Path checkpoint = checkpointBase;
int attempt = 0;
while (true) {
try {
fs.rename(current, checkpoint, Rename.NONE);
break;
} catch (FileAlreadyExistsException e) {
if (++attempt > 1000) {
throw new IOException("Failed to checkpoint trash: "+checkpoint);
}
checkpoint = checkpointBase.suffix("-" + attempt);
}
}
LOG.info("Created trash checkpoint: "+checkpoint.toUri().getPath());
}
@Override
public void deleteCheckpoint() throws IOException {
FileStatus[] dirs = null;
try {
dirs = fs.listStatus(trash); // scan trash sub-directories
} catch (FileNotFoundException fnfe) {
return;
}
long now = Time.now();
for (int i = 0; i < dirs.length; i++) {
Path path = dirs[i].getPath();
String dir = path.toUri().getPath();
String name = path.getName();
if (name.equals(CURRENT.getName())) // skip current
continue;
long time;
try {
time = getTimeFromCheckpoint(name);
} catch (ParseException e) {
LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
continue;
}
if ((now - deletionInterval) > time) {
if (fs.delete(path, true)) {
LOG.info("Deleted trash checkpoint: "+dir);
} else {
LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");
}
}
Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
for (FileStatus trashRoot : trashRoots) {
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
trashRoot.getPath());
deleteCheckpoint(trashRoot.getPath());
}
}
@Override
public Path getCurrentTrashDir() {
return current;
public Path getCurrentTrashDir() throws IOException {
return new Path(fs.getTrashRoot(null), CURRENT);
}
@Override
public Path getCurrentTrashDir(Path path) throws IOException {
return new Path(fs.getTrashRoot(path), CURRENT);
}
@Override
@ -278,25 +249,24 @@ public class TrashPolicyDefault extends TrashPolicy {
try {
now = Time.now();
if (now >= end) {
FileStatus[] homes = null;
Collection<FileStatus> trashRoots;
try {
homes = fs.listStatus(homesParent); // list all home dirs
trashRoots = fs.getTrashRoots(true); // list all home dirs
} catch (IOException e) {
LOG.warn("Trash can't list homes: "+e+" Sleeping.");
LOG.warn("Trash can't list all trash roots: "+e+" Sleeping.");
continue;
}
for (FileStatus home : homes) { // dump each trash
if (!home.isDirectory())
for (FileStatus trashRoot : trashRoots) { // dump each trash
if (!trashRoot.isDirectory())
continue;
try {
TrashPolicyDefault trash = new TrashPolicyDefault(
fs, home.getPath(), conf);
trash.deleteCheckpoint();
trash.createCheckpoint(new Date(now));
TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
trash.deleteCheckpoint(trashRoot.getPath());
trash.createCheckpoint(trashRoot.getPath(), new Date(now));
} catch (IOException e) {
LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
LOG.warn("Trash caught: "+e+". Skipping " +
trashRoot.getPath() + ".");
}
}
}
@ -319,6 +289,69 @@ public class TrashPolicyDefault extends TrashPolicy {
}
}
private void createCheckpoint(Path trashRoot, Date date) throws IOException {
if (!fs.exists(new Path(trashRoot, CURRENT))) {
return;
}
Path checkpointBase;
synchronized (CHECKPOINT) {
checkpointBase = new Path(trashRoot, CHECKPOINT.format(date));
}
Path checkpoint = checkpointBase;
Path current = new Path(trashRoot, CURRENT);
int attempt = 0;
while (true) {
try {
fs.rename(current, checkpoint, Rename.NONE);
LOG.info("Created trash checkpoint: " + checkpoint.toUri().getPath());
break;
} catch (FileAlreadyExistsException e) {
if (++attempt > 1000) {
throw new IOException("Failed to checkpoint trash: " + checkpoint);
}
checkpoint = checkpointBase.suffix("-" + attempt);
}
}
}
private void deleteCheckpoint(Path trashRoot) throws IOException {
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);
FileStatus[] dirs = null;
try {
dirs = fs.listStatus(trashRoot); // scan trash sub-directories
} catch (FileNotFoundException fnfe) {
return;
}
long now = Time.now();
for (int i = 0; i < dirs.length; i++) {
Path path = dirs[i].getPath();
String dir = path.toUri().getPath();
String name = path.getName();
if (name.equals(CURRENT.getName())) { // skip current
continue;
}
long time;
try {
time = getTimeFromCheckpoint(name);
} catch (ParseException e) {
LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
continue;
}
if ((now - deletionInterval) > time) {
if (fs.delete(path, true)) {
LOG.info("Deleted trash checkpoint: "+dir);
} else {
LOG.warn("Couldn't delete checkpoint: " + dir + " Ignoring.");
}
}
}
}
private long getTimeFromCheckpoint(String name) throws ParseException {
long time;

View File

@ -214,6 +214,10 @@ public class TestHarFileSystem {
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
throws IOException;
public Path getTrashRoot(Path path) throws IOException;
public Collection<FileStatus> getTrashRoots(boolean allUsers) throws IOException;
}
@Test

View File

@ -695,6 +695,10 @@ public class TestTrash extends TestCase {
public void initialize(Configuration conf, FileSystem fs, Path home) {
}
@Override
public void initialize(Configuration conf, FileSystem fs) {
}
@Override
public boolean isEnabled() {
return false;
@ -718,6 +722,11 @@ public class TestTrash extends TestCase {
return null;
}
@Override
public Path getCurrentTrashDir(Path path) throws IOException {
return null;
}
@Override
public Runnable getEmptier() throws IOException {
return null;

View File

@ -2301,4 +2301,65 @@ public class DistributedFileSystem extends FileSystem {
throws IOException {
return dfs.getInotifyEventStream(lastReadTxid);
}
/**
* Get the root directory of Trash for a path in HDFS.
* 1. File in encryption zone returns /ez1/.Trash/username
* 2. File not in encryption zone returns /users/username/.Trash
* Caller appends either Current or checkpoint timestamp for trash destination
* @param path the trash root of the path to be determined.
* @return trash root
* @throws IOException
*/
@Override
public Path getTrashRoot(Path path) throws IOException {
if ((path == null) || !dfs.isHDFSEncryptionEnabled()) {
return super.getTrashRoot(path);
}
String absSrc = path.toUri().getPath();
EncryptionZone ez = dfs.getEZForPath(absSrc);
if ((ez != null) && !ez.getPath().equals(absSrc)) {
return this.makeQualified(
new Path(ez.getPath() + "/" + FileSystem.TRASH_PREFIX +
dfs.ugi.getShortUserName()));
} else {
return super.getTrashRoot(path);
}
}
/**
* Get all the trash roots of HDFS for current user or for all the users.
* 1. File deleted from non-encryption zone /user/username/.Trash
* 2. File deleted from encryption zones
* e.g., ez1 rooted at /ez1 has its trash root at /ez1/.Trash/$USER
* @allUsers return trashRoots of all users if true, used by emptier
* @return trash roots of HDFS
* @throws IOException
*/
@Override
public Collection<FileStatus> getTrashRoots(boolean allUsers) throws IOException {
List<FileStatus> ret = new ArrayList<FileStatus>();
// Get normal trash roots
ret.addAll(super.getTrashRoots(allUsers));
// Get EZ Trash roots
final RemoteIterator<EncryptionZone> it = dfs.listEncryptionZones();
while (it.hasNext()) {
Path ezTrashRoot = new Path(it.next().getPath(), FileSystem.TRASH_PREFIX);
if (allUsers) {
for (FileStatus candidate : listStatus(ezTrashRoot)) {
if (exists(candidate.getPath())) {
ret.add(candidate);
}
}
} else {
Path userTrash = new Path(ezTrashRoot, System.getProperty("user.name"));
if (exists(userTrash)) {
ret.add(getFileStatus(userTrash));
}
}
}
return ret;
}
}

View File

@ -852,6 +852,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves
without DN restart. (Xiaobing Zhou via Arpit Agarwal)
HDFS-8831. Trash Support for deletion in HDFS encryption zone. (xyao)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FileSystemTestWrapper;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
@ -96,6 +97,7 @@ import static org.mockito.Matchers.anyShort;
import static org.mockito.Mockito.withSettings;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSTestUtil.verifyFilesEqual;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
@ -1407,4 +1409,49 @@ public class TestEncryptionZones {
assertExceptionContains("Path not found: " + zoneFile, e);
}
}
@Test(timeout = 120000)
public void testEncryptionZoneWithTrash() throws Exception {
// Create the encryption zone1
final HdfsAdmin dfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
final Path zone1 = new Path("/zone1");
fs.mkdirs(zone1);
dfsAdmin.createEncryptionZone(zone1, TEST_KEY);
// Create the encrypted file in zone1
final Path encFile1 = new Path(zone1, "encFile1");
final int len = 8192;
DFSTestUtil.createFile(fs, encFile1, len, (short) 1, 0xFEED);
Configuration clientConf = new Configuration(conf);
clientConf.setLong(FS_TRASH_INTERVAL_KEY, 1);
FsShell shell = new FsShell(clientConf);
// Delete encrypted file from the shell with trash enabled
// Verify the file is moved to appropriate trash within the zone
verifyShellDeleteWithTrash(shell, encFile1);
// Delete encryption zone from the shell with trash enabled
// Verify the zone is moved to appropriate trash location in user's home dir
verifyShellDeleteWithTrash(shell, zone1);
}
private void verifyShellDeleteWithTrash(FsShell shell, Path path)
throws Exception{
try {
final Path trashFile =
new Path(shell.getCurrentTrashDir(path) + "/" + path);
String[] argv = new String[]{"-rm", "-r", path.toString()};
int res = ToolRunner.run(shell, argv);
assertEquals("rm failed", 0, res);
assertTrue("File not in trash : " + trashFile, fs.exists(trashFile));
} catch (IOException ioe) {
fail(ioe.getMessage());
} finally {
if (fs.exists(path)) {
fs.delete(path, true);
}
}
}
}