HADOOP-16140. hadoop fs expunge to add -immediate option to purge trash immediately.
Contributed by Stephen O'Donnell. Signed-off-by: Steve Loughran <stevel@apache.org>
This commit is contained in:
parent
05df151d09
commit
686c0141ef
|
@ -120,6 +120,12 @@ public class Trash extends Configured {
|
|||
trashPolicy.deleteCheckpoint();
|
||||
}
|
||||
|
||||
/**
 * Delete all trash immediately.
 * Asks the trash policy to create a checkpoint and then to delete all
 * checkpoints immediately, in that order.
 *
 * @throws IOException if either trash policy call fails
 */
|
||||
public void expungeImmediately() throws IOException {
|
||||
// Checkpoint first; presumably this rolls the current trash contents into a
// checkpoint so that the delete below covers them too -- confirm against the
// TrashPolicy implementation in use.
trashPolicy.createCheckpoint();
|
||||
trashPolicy.deleteCheckpointsImmediately();
|
||||
}
|
||||
|
||||
/** get the current working directory */
|
||||
Path getCurrentTrashDir() throws IOException {
|
||||
return trashPolicy.getCurrentTrashDir();
|
||||
|
|
|
@ -79,6 +79,11 @@ public abstract class TrashPolicy extends Configured {
|
|||
*/
|
||||
public abstract void deleteCheckpoint() throws IOException;
|
||||
|
||||
/**
 * Delete all checkpoints immediately, i.e. empty the trash.
 *
 * @throws IOException if the checkpoints cannot be deleted
 */
|
||||
public abstract void deleteCheckpointsImmediately() throws IOException;
|
||||
|
||||
/**
|
||||
* Get the current working directory of the Trash Policy
|
||||
* This API does not work with files deleted from encryption zone when HDFS
|
||||
|
|
|
@ -213,11 +213,20 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|||
|
||||
/**
 * Deletes checkpoints by delegating to {@code deleteCheckpoint(false)},
 * i.e. without the "delete immediately" flag.
 *
 * @throws IOException if the delegate call fails
 */
@Override
|
||||
public void deleteCheckpoint() throws IOException {
|
||||
deleteCheckpoint(false);
|
||||
}
|
||||
|
||||
/**
 * Deletes checkpoints by delegating to {@code deleteCheckpoint(true)},
 * i.e. with the "delete immediately" flag set.
 *
 * @throws IOException if the delegate call fails
 */
@Override
|
||||
public void deleteCheckpointsImmediately() throws IOException {
|
||||
deleteCheckpoint(true);
|
||||
}
|
||||
|
||||
/**
 * Runs checkpoint deletion for every trash root returned by
 * {@code fs.getTrashRoots(false)}, logging each root before processing it.
 *
 * @param deleteImmediately passed through unchanged to the per-root
 *     deleteCheckpoint call
 * @throws IOException if listing the trash roots or a per-root call fails
 */
private void deleteCheckpoint(boolean deleteImmediately) throws IOException {
|
||||
// NOTE(review): the false argument presumably limits the listing to the
// current user's trash roots -- confirm against FileSystem#getTrashRoots.
Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
|
||||
for (FileStatus trashRoot : trashRoots) {
|
||||
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
|
||||
trashRoot.getPath());
|
||||
// NOTE(review): this view shows both a one-argument and a two-argument
// call; the one-argument form looks like the pre-change line being
// replaced -- confirm that only the two-argument call remains.
deleteCheckpoint(trashRoot.getPath());
|
||||
deleteCheckpoint(trashRoot.getPath(), deleteImmediately);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -283,7 +292,7 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|||
continue;
|
||||
try {
|
||||
TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
|
||||
trash.deleteCheckpoint(trashRoot.getPath());
|
||||
trash.deleteCheckpoint(trashRoot.getPath(), false);
|
||||
trash.createCheckpoint(trashRoot.getPath(), new Date(now));
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Trash caught: "+e+". Skipping " +
|
||||
|
@ -341,7 +350,8 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|||
}
|
||||
}
|
||||
|
||||
private void deleteCheckpoint(Path trashRoot) throws IOException {
|
||||
private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
|
||||
throws IOException {
|
||||
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);
|
||||
|
||||
FileStatus[] dirs = null;
|
||||
|
@ -368,7 +378,7 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|||
continue;
|
||||
}
|
||||
|
||||
if ((now - deletionInterval) > time) {
|
||||
if (((now - deletionInterval) > time) || deleteImmediately) {
|
||||
if (fs.delete(path, true)) {
|
||||
LOG.info("Deleted trash checkpoint: "+dir);
|
||||
} else {
|
||||
|
|
|
@ -219,16 +219,20 @@ class Delete {
|
|||
// than the retention threshold.
|
||||
static class Expunge extends FsCommand {
|
||||
public static final String NAME = "expunge";
|
||||
public static final String USAGE = "";
|
||||
public static final String USAGE =
|
||||
"[-immediate]";
|
||||
public static final String DESCRIPTION =
|
||||
"Delete files from the trash that are older " +
|
||||
"than the retention threshold";
|
||||
|
||||
private boolean emptyImmediately = false;
|
||||
|
||||
// TODO: should probably allow path arguments for the filesystems
|
||||
@Override
|
||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||
CommandFormat cf = new CommandFormat(0, 0);
|
||||
CommandFormat cf = new CommandFormat(0, 1, "immediate");
|
||||
cf.parse(args);
|
||||
emptyImmediately = cf.getOpt("immediate");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -239,14 +243,23 @@ class Delete {
|
|||
if (null != childFileSystems) {
|
||||
for (FileSystem fs : childFileSystems) {
|
||||
Trash trash = new Trash(fs, getConf());
|
||||
if (emptyImmediately) {
|
||||
trash.expungeImmediately();
|
||||
} else {
|
||||
trash.expunge();
|
||||
trash.checkpoint();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Trash trash = new Trash(getConf());
|
||||
if (emptyImmediately) {
|
||||
trash.expungeImmediately();
|
||||
} else {
|
||||
trash.expunge();
|
||||
trash.checkpoint();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -264,7 +264,7 @@ Displays a summary of file lengths.
|
|||
expunge
|
||||
-------
|
||||
|
||||
Usage: `hadoop fs -expunge`
|
||||
Usage: `hadoop fs -expunge [-immediate]`
|
||||
|
||||
Permanently delete files in checkpoints older than the retention threshold
|
||||
from the trash directory, and create a new checkpoint.
|
||||
|
@ -279,6 +279,9 @@ users can configure to create and delete checkpoints periodically
|
|||
by the parameter stored as `fs.trash.checkpoint.interval` (in core-site.xml).
|
||||
This value should be smaller than or equal to `fs.trash.interval`.
|
||||
|
||||
If the `-immediate` option is passed, all files in the trash for the current
|
||||
user are immediately deleted, ignoring the `fs.trash.interval` setting.
|
||||
|
||||
Refer to the
|
||||
[HDFS Architecture guide](../hadoop-hdfs/HdfsDesign.html#File_Deletes_and_Undeletes)
|
||||
for more information about the trash feature of HDFS.
|
||||
|
|
|
@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -486,6 +488,41 @@ public class TestTrash {
|
|||
trashRootFs.exists(dirToKeep));
|
||||
}
|
||||
|
||||
// Verify expunge -immediate removes all checkpoints and current folder
|
||||
{
|
||||
// Setup a recent and old checkpoint and a current folder
|
||||
// to be deleted on the next expunge and one that isn't.
|
||||
long trashInterval = conf.getLong(FS_TRASH_INTERVAL_KEY,
|
||||
FS_TRASH_INTERVAL_DEFAULT);
|
||||
long now = Time.now();
|
||||
DateFormat checkpointFormat = new SimpleDateFormat("yyMMddHHmm");
|
||||
Path oldCheckpoint = new Path(trashRoot.getParent(),
|
||||
checkpointFormat.format(now - (trashInterval * 60 * 1000) - 1));
|
||||
Path recentCheckpoint = new Path(trashRoot.getParent(),
|
||||
checkpointFormat.format(now));
|
||||
Path currentFolder = new Path(trashRoot.getParent(), "Current");
|
||||
mkdir(trashRootFs, oldCheckpoint);
|
||||
mkdir(trashRootFs, recentCheckpoint);
|
||||
mkdir(trashRootFs, currentFolder);
|
||||
|
||||
// Clear out trash
|
||||
int rc = -1;
|
||||
try {
|
||||
rc = shell.run(new String[] {"-expunge", "-immediate"});
|
||||
} catch (Exception e) {
|
||||
fail("Unexpected exception running the trash shell: " +
|
||||
e.getLocalizedMessage());
|
||||
}
|
||||
assertEquals("Expunge immediate should return zero", 0, rc);
|
||||
assertFalse("Old checkpoint should be removed",
|
||||
trashRootFs.exists(oldCheckpoint));
|
||||
assertFalse("Recent checkpoint should be removed",
|
||||
trashRootFs.exists(recentCheckpoint));
|
||||
assertFalse("Current folder should be removed",
|
||||
trashRootFs.exists(currentFolder));
|
||||
assertEquals("Ensure trash folder is empty",
|
||||
trashRootFs.listStatus(trashRoot.getParent()).length, 0);
|
||||
}
|
||||
}
|
||||
|
||||
public static void trashNonDefaultFS(Configuration conf) throws IOException {
|
||||
|
@ -1000,6 +1037,10 @@ public class TestTrash {
|
|||
public void deleteCheckpoint() throws IOException {
|
||||
}
|
||||
|
||||
/** No-op implementation: the method body is empty. */
@Override
|
||||
public void deleteCheckpointsImmediately() throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getCurrentTrashDir() {
|
||||
return null;
|
||||
|
@ -1059,6 +1100,11 @@ public class TestTrash {
|
|||
AuditableCheckpoints.delete();
|
||||
}
|
||||
|
||||
/**
 * Records an "empty the trash" request by calling
 * {@code AuditableCheckpoints.deleteAll()}; nothing else is done here.
 */
@Override
|
||||
public void deleteCheckpointsImmediately() throws IOException {
|
||||
AuditableCheckpoints.deleteAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getCurrentTrashDir() {
|
||||
return null;
|
||||
|
@ -1115,25 +1161,32 @@ public class TestTrash {
|
|||
*/
|
||||
private static class AuditableCheckpoints {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AuditableCheckpoints.class);
|
||||
|
||||
private static AtomicInteger numOfCheckpoint =
|
||||
new AtomicInteger(0);
|
||||
|
||||
/**
 * Increments the checkpoint counter and reports the new count.
 * NOTE(review): this view shows both a System.out.println and a LOG.info
 * reporting the same message; the println looks like the pre-change line
 * being replaced -- confirm which one remains.
 */
private static void add() {
|
||||
numOfCheckpoint.incrementAndGet();
|
||||
System.out.println(String
|
||||
.format("Create a checkpoint, current number of checkpoints %d",
|
||||
numOfCheckpoint.get()));
|
||||
LOG.info("Create a checkpoint, current number of checkpoints {}",
|
||||
numOfCheckpoint.get());
|
||||
}
|
||||
|
||||
/**
 * Decrements the checkpoint counter and reports the new count, but only
 * when the counter is currently greater than zero; otherwise does nothing.
 * The check and the decrement are two separate calls on the AtomicInteger.
 * NOTE(review): this view shows both a System.out.println and a LOG.info
 * reporting the same message; the println looks like the pre-change line
 * being replaced -- confirm which one remains.
 */
private static void delete() {
|
||||
if(numOfCheckpoint.get() > 0) {
|
||||
numOfCheckpoint.decrementAndGet();
|
||||
System.out.println(String
|
||||
.format("Delete a checkpoint, current number of checkpoints %d",
|
||||
numOfCheckpoint.get()));
|
||||
LOG.info("Delete a checkpoint, current number of checkpoints {}",
|
||||
numOfCheckpoint.get());
|
||||
}
|
||||
}
|
||||
|
||||
/** Resets the checkpoint counter to zero and logs the resulting count. */
private static void deleteAll() {
|
||||
numOfCheckpoint.set(0);
|
||||
LOG.info("Delete all checkpoints, current number of checkpoints {}",
|
||||
numOfCheckpoint.get());
|
||||
}
|
||||
|
||||
/**
 * Returns the current value of the checkpoint counter.
 *
 * @return the number of checkpoints currently recorded
 */
private static int get() {
|
||||
return numOfCheckpoint.get();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue