HADOOP-16140. hadoop fs expunge to add -immediate option to purge trash immediately.

Contributed by Stephen O'Donnell.

(cherry picked from commit 686c0141ef)
Signed-off-by: Steve Loughran <stevel@apache.org>
This commit is contained in:
Stephen O'Donnell 2019-03-05 14:11:49 +00:00 committed by Steve Loughran
parent dc38fc598d
commit 3fe31b36fa
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
6 changed files with 107 additions and 17 deletions

View File

@ -120,6 +120,12 @@ public class Trash extends Configured {
trashPolicy.deleteCheckpoint();
}
/** Delete all trash immediately. */
public void expungeImmediately() throws IOException {
trashPolicy.createCheckpoint();
trashPolicy.deleteCheckpointsImmediately();
}
/** get the current working directory */
Path getCurrentTrashDir() throws IOException {
return trashPolicy.getCurrentTrashDir();

View File

@ -79,6 +79,11 @@ public abstract class TrashPolicy extends Configured {
*/
public abstract void deleteCheckpoint() throws IOException;
/**
* Delete all checkpoints immediately, ie empty trash.
*/
public abstract void deleteCheckpointsImmediately() throws IOException;
/**
* Get the current working directory of the Trash Policy
* This API does not work with files deleted from encryption zone when HDFS

View File

@ -213,11 +213,20 @@ public class TrashPolicyDefault extends TrashPolicy {
@Override
public void deleteCheckpoint() throws IOException {
deleteCheckpoint(false);
}
@Override
public void deleteCheckpointsImmediately() throws IOException {
deleteCheckpoint(true);
}
private void deleteCheckpoint(boolean deleteImmediately) throws IOException {
Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
for (FileStatus trashRoot : trashRoots) {
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
trashRoot.getPath());
deleteCheckpoint(trashRoot.getPath());
deleteCheckpoint(trashRoot.getPath(), deleteImmediately);
}
}
@ -283,7 +292,7 @@ public class TrashPolicyDefault extends TrashPolicy {
continue;
try {
TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
trash.deleteCheckpoint(trashRoot.getPath());
trash.deleteCheckpoint(trashRoot.getPath(), false);
trash.createCheckpoint(trashRoot.getPath(), new Date(now));
} catch (IOException e) {
LOG.warn("Trash caught: "+e+". Skipping " +
@ -341,7 +350,8 @@ public class TrashPolicyDefault extends TrashPolicy {
}
}
private void deleteCheckpoint(Path trashRoot) throws IOException {
private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
throws IOException {
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);
FileStatus[] dirs = null;
@ -368,7 +378,7 @@ public class TrashPolicyDefault extends TrashPolicy {
continue;
}
if ((now - deletionInterval) > time) {
if (((now - deletionInterval) > time) || deleteImmediately) {
if (fs.delete(path, true)) {
LOG.info("Deleted trash checkpoint: "+dir);
} else {

View File

@ -219,16 +219,20 @@ class Delete {
// than the retention threshold.
static class Expunge extends FsCommand {
public static final String NAME = "expunge";
public static final String USAGE = "";
public static final String USAGE =
"[-immediate]";
public static final String DESCRIPTION =
"Delete files from the trash that are older " +
"than the retention threshold";
private boolean emptyImmediately = false;
// TODO: should probably allow path arguments for the filesystems
@Override
protected void processOptions(LinkedList<String> args) throws IOException {
CommandFormat cf = new CommandFormat(0, 0);
CommandFormat cf = new CommandFormat(0, 1, "immediate");
cf.parse(args);
emptyImmediately = cf.getOpt("immediate");
}
@Override
@ -239,14 +243,23 @@ class Delete {
if (null != childFileSystems) {
for (FileSystem fs : childFileSystems) {
Trash trash = new Trash(fs, getConf());
if (emptyImmediately) {
trash.expungeImmediately();
} else {
trash.expunge();
trash.checkpoint();
}
}
} else {
Trash trash = new Trash(getConf());
if (emptyImmediately) {
trash.expungeImmediately();
} else {
trash.expunge();
trash.checkpoint();
}
}
}
}
}

View File

@ -264,7 +264,7 @@ Displays a summary of file lengths.
expunge
-------
Usage: `hadoop fs -expunge`
Usage: `hadoop fs -expunge [-immediate]`
Permanently delete files in checkpoints older than the retention threshold
from trash directory, and create new checkpoint.
@ -279,6 +279,9 @@ users can configure to create and delete checkpoints periodically
by the parameter stored as `fs.trash.checkpoint.interval` (in core-site.xml).
This value should be smaller or equal to `fs.trash.interval`.
If the `-immediate` option is passed, all files in the trash for the current
user are immediately deleted, ignoring the `fs.trash.interval` setting.
Refer to the
[HDFS Architecture guide](../hadoop-hdfs/HdfsDesign.html#File_Deletes_and_Undeletes)
for more information about trash feature of HDFS.

View File

@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
@ -486,6 +488,41 @@ public class TestTrash {
trashRootFs.exists(dirToKeep));
}
// Verify expunge -immediate removes all checkpoints and current folder
{
// Setup a recent and old checkpoint and a current folder
// to be deleted on the next expunge and one that isn't.
long trashInterval = conf.getLong(FS_TRASH_INTERVAL_KEY,
FS_TRASH_INTERVAL_DEFAULT);
long now = Time.now();
DateFormat checkpointFormat = new SimpleDateFormat("yyMMddHHmm");
Path oldCheckpoint = new Path(trashRoot.getParent(),
checkpointFormat.format(now - (trashInterval * 60 * 1000) - 1));
Path recentCheckpoint = new Path(trashRoot.getParent(),
checkpointFormat.format(now));
Path currentFolder = new Path(trashRoot.getParent(), "Current");
mkdir(trashRootFs, oldCheckpoint);
mkdir(trashRootFs, recentCheckpoint);
mkdir(trashRootFs, currentFolder);
// Clear out trash
int rc = -1;
try {
rc = shell.run(new String[] {"-expunge", "-immediate"});
} catch (Exception e) {
fail("Unexpected exception running the trash shell: " +
e.getLocalizedMessage());
}
assertEquals("Expunge immediate should return zero", 0, rc);
assertFalse("Old checkpoint should be removed",
trashRootFs.exists(oldCheckpoint));
assertFalse("Recent checkpoint should be removed",
trashRootFs.exists(recentCheckpoint));
assertFalse("Current folder should be removed",
trashRootFs.exists(currentFolder));
assertEquals("Ensure trash folder is empty",
trashRootFs.listStatus(trashRoot.getParent()).length, 0);
}
}
public static void trashNonDefaultFS(Configuration conf) throws IOException {
@ -1000,6 +1037,10 @@ public class TestTrash {
public void deleteCheckpoint() throws IOException {
}
@Override
public void deleteCheckpointsImmediately() throws IOException {
}
@Override
public Path getCurrentTrashDir() {
return null;
@ -1059,6 +1100,11 @@ public class TestTrash {
AuditableCheckpoints.delete();
}
@Override
public void deleteCheckpointsImmediately() throws IOException {
AuditableCheckpoints.deleteAll();
}
@Override
public Path getCurrentTrashDir() {
return null;
@ -1115,25 +1161,32 @@ public class TestTrash {
*/
private static class AuditableCheckpoints {
private static final Logger LOG =
LoggerFactory.getLogger(AuditableCheckpoints.class);
private static AtomicInteger numOfCheckpoint =
new AtomicInteger(0);
private static void add() {
numOfCheckpoint.incrementAndGet();
System.out.println(String
.format("Create a checkpoint, current number of checkpoints %d",
numOfCheckpoint.get()));
LOG.info("Create a checkpoint, current number of checkpoints {}",
numOfCheckpoint.get());
}
private static void delete() {
if(numOfCheckpoint.get() > 0) {
numOfCheckpoint.decrementAndGet();
System.out.println(String
.format("Delete a checkpoint, current number of checkpoints %d",
numOfCheckpoint.get()));
LOG.info("Delete a checkpoint, current number of checkpoints {}",
numOfCheckpoint.get());
}
}
private static void deleteAll() {
numOfCheckpoint.set(0);
LOG.info("Delete all checkpoints, current number of checkpoints {}",
numOfCheckpoint.get());
}
private static int get() {
return numOfCheckpoint.get();
}