HADOOP-16140. hadoop fs expunge to add -immediate option to purge trash immediately.
Contributed by Stephen O'Donnell.
Signed-off-by: Steve Loughran <stevel@apache.org>
(cherry picked from commit 686c0141ef
)
This commit is contained in:
parent
528dc8199b
commit
9dfb26136b
|
@ -120,6 +120,12 @@ public class Trash extends Configured {
|
||||||
trashPolicy.deleteCheckpoint();
|
trashPolicy.deleteCheckpoint();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Delete all trash immediately. */
|
||||||
|
public void expungeImmediately() throws IOException {
|
||||||
|
trashPolicy.createCheckpoint();
|
||||||
|
trashPolicy.deleteCheckpointsImmediately();
|
||||||
|
}
|
||||||
|
|
||||||
/** get the current working directory */
|
/** get the current working directory */
|
||||||
Path getCurrentTrashDir() throws IOException {
|
Path getCurrentTrashDir() throws IOException {
|
||||||
return trashPolicy.getCurrentTrashDir();
|
return trashPolicy.getCurrentTrashDir();
|
||||||
|
|
|
@ -79,6 +79,11 @@ public abstract class TrashPolicy extends Configured {
|
||||||
*/
|
*/
|
||||||
public abstract void deleteCheckpoint() throws IOException;
|
public abstract void deleteCheckpoint() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete all checkpoints immediately, ie empty trash.
|
||||||
|
*/
|
||||||
|
public abstract void deleteCheckpointsImmediately() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the current working directory of the Trash Policy
|
* Get the current working directory of the Trash Policy
|
||||||
* This API does not work with files deleted from encryption zone when HDFS
|
* This API does not work with files deleted from encryption zone when HDFS
|
||||||
|
|
|
@ -213,11 +213,20 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteCheckpoint() throws IOException {
|
public void deleteCheckpoint() throws IOException {
|
||||||
|
deleteCheckpoint(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteCheckpointsImmediately() throws IOException {
|
||||||
|
deleteCheckpoint(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteCheckpoint(boolean deleteImmediately) throws IOException {
|
||||||
Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
|
Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
|
||||||
for (FileStatus trashRoot : trashRoots) {
|
for (FileStatus trashRoot : trashRoots) {
|
||||||
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
|
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
|
||||||
trashRoot.getPath());
|
trashRoot.getPath());
|
||||||
deleteCheckpoint(trashRoot.getPath());
|
deleteCheckpoint(trashRoot.getPath(), deleteImmediately);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -283,7 +292,7 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
continue;
|
continue;
|
||||||
try {
|
try {
|
||||||
TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
|
TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
|
||||||
trash.deleteCheckpoint(trashRoot.getPath());
|
trash.deleteCheckpoint(trashRoot.getPath(), false);
|
||||||
trash.createCheckpoint(trashRoot.getPath(), new Date(now));
|
trash.createCheckpoint(trashRoot.getPath(), new Date(now));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Trash caught: "+e+". Skipping " +
|
LOG.warn("Trash caught: "+e+". Skipping " +
|
||||||
|
@ -341,7 +350,8 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteCheckpoint(Path trashRoot) throws IOException {
|
private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
|
||||||
|
throws IOException {
|
||||||
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);
|
LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);
|
||||||
|
|
||||||
FileStatus[] dirs = null;
|
FileStatus[] dirs = null;
|
||||||
|
@ -368,7 +378,7 @@ public class TrashPolicyDefault extends TrashPolicy {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((now - deletionInterval) > time) {
|
if (((now - deletionInterval) > time) || deleteImmediately) {
|
||||||
if (fs.delete(path, true)) {
|
if (fs.delete(path, true)) {
|
||||||
LOG.info("Deleted trash checkpoint: "+dir);
|
LOG.info("Deleted trash checkpoint: "+dir);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -219,16 +219,20 @@ class Delete {
|
||||||
// than the retention threshold.
|
// than the retention threshold.
|
||||||
static class Expunge extends FsCommand {
|
static class Expunge extends FsCommand {
|
||||||
public static final String NAME = "expunge";
|
public static final String NAME = "expunge";
|
||||||
public static final String USAGE = "";
|
public static final String USAGE =
|
||||||
|
"[-immediate]";
|
||||||
public static final String DESCRIPTION =
|
public static final String DESCRIPTION =
|
||||||
"Delete files from the trash that are older " +
|
"Delete files from the trash that are older " +
|
||||||
"than the retention threshold";
|
"than the retention threshold";
|
||||||
|
|
||||||
|
private boolean emptyImmediately = false;
|
||||||
|
|
||||||
// TODO: should probably allow path arguments for the filesystems
|
// TODO: should probably allow path arguments for the filesystems
|
||||||
@Override
|
@Override
|
||||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||||
CommandFormat cf = new CommandFormat(0, 0);
|
CommandFormat cf = new CommandFormat(0, 1, "immediate");
|
||||||
cf.parse(args);
|
cf.parse(args);
|
||||||
|
emptyImmediately = cf.getOpt("immediate");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -239,14 +243,23 @@ class Delete {
|
||||||
if (null != childFileSystems) {
|
if (null != childFileSystems) {
|
||||||
for (FileSystem fs : childFileSystems) {
|
for (FileSystem fs : childFileSystems) {
|
||||||
Trash trash = new Trash(fs, getConf());
|
Trash trash = new Trash(fs, getConf());
|
||||||
trash.expunge();
|
if (emptyImmediately) {
|
||||||
trash.checkpoint();
|
trash.expungeImmediately();
|
||||||
|
} else {
|
||||||
|
trash.expunge();
|
||||||
|
trash.checkpoint();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Trash trash = new Trash(getConf());
|
Trash trash = new Trash(getConf());
|
||||||
trash.expunge();
|
if (emptyImmediately) {
|
||||||
trash.checkpoint();
|
trash.expungeImmediately();
|
||||||
|
} else {
|
||||||
|
trash.expunge();
|
||||||
|
trash.checkpoint();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -264,7 +264,7 @@ Displays a summary of file lengths.
|
||||||
expunge
|
expunge
|
||||||
-------
|
-------
|
||||||
|
|
||||||
Usage: `hadoop fs -expunge`
|
Usage: `hadoop fs -expunge [-immediate]`
|
||||||
|
|
||||||
Permanently delete files in checkpoints older than the retention threshold
|
Permanently delete files in checkpoints older than the retention threshold
|
||||||
from trash directory, and create new checkpoint.
|
from trash directory, and create new checkpoint.
|
||||||
|
@ -279,6 +279,9 @@ users can configure to create and delete checkpoints periodically
|
||||||
by the parameter stored as `fs.trash.checkpoint.interval` (in core-site.xml).
|
by the parameter stored as `fs.trash.checkpoint.interval` (in core-site.xml).
|
||||||
This value should be smaller or equal to `fs.trash.interval`.
|
This value should be smaller or equal to `fs.trash.interval`.
|
||||||
|
|
||||||
|
If the `-immediate` option is passed, all files in the trash for the current
|
||||||
|
user are immediately deleted, ignoring the `fs.trash.interval` setting.
|
||||||
|
|
||||||
Refer to the
|
Refer to the
|
||||||
[HDFS Architecture guide](../hadoop-hdfs/HdfsDesign.html#File_Deletes_and_Undeletes)
|
[HDFS Architecture guide](../hadoop-hdfs/HdfsDesign.html#File_Deletes_and_Undeletes)
|
||||||
for more information about trash feature of HDFS.
|
for more information about trash feature of HDFS.
|
||||||
|
|
|
@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -486,6 +488,41 @@ public class TestTrash {
|
||||||
trashRootFs.exists(dirToKeep));
|
trashRootFs.exists(dirToKeep));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify expunge -immediate removes all checkpoints and current folder
|
||||||
|
{
|
||||||
|
// Setup a recent and old checkpoint and a current folder
|
||||||
|
// to be deleted on the next expunge and one that isn't.
|
||||||
|
long trashInterval = conf.getLong(FS_TRASH_INTERVAL_KEY,
|
||||||
|
FS_TRASH_INTERVAL_DEFAULT);
|
||||||
|
long now = Time.now();
|
||||||
|
DateFormat checkpointFormat = new SimpleDateFormat("yyMMddHHmm");
|
||||||
|
Path oldCheckpoint = new Path(trashRoot.getParent(),
|
||||||
|
checkpointFormat.format(now - (trashInterval * 60 * 1000) - 1));
|
||||||
|
Path recentCheckpoint = new Path(trashRoot.getParent(),
|
||||||
|
checkpointFormat.format(now));
|
||||||
|
Path currentFolder = new Path(trashRoot.getParent(), "Current");
|
||||||
|
mkdir(trashRootFs, oldCheckpoint);
|
||||||
|
mkdir(trashRootFs, recentCheckpoint);
|
||||||
|
mkdir(trashRootFs, currentFolder);
|
||||||
|
|
||||||
|
// Clear out trash
|
||||||
|
int rc = -1;
|
||||||
|
try {
|
||||||
|
rc = shell.run(new String[] {"-expunge", "-immediate"});
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Unexpected exception running the trash shell: " +
|
||||||
|
e.getLocalizedMessage());
|
||||||
|
}
|
||||||
|
assertEquals("Expunge immediate should return zero", 0, rc);
|
||||||
|
assertFalse("Old checkpoint should be removed",
|
||||||
|
trashRootFs.exists(oldCheckpoint));
|
||||||
|
assertFalse("Recent checkpoint should be removed",
|
||||||
|
trashRootFs.exists(recentCheckpoint));
|
||||||
|
assertFalse("Current folder should be removed",
|
||||||
|
trashRootFs.exists(currentFolder));
|
||||||
|
assertEquals("Ensure trash folder is empty",
|
||||||
|
trashRootFs.listStatus(trashRoot.getParent()).length, 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void trashNonDefaultFS(Configuration conf) throws IOException {
|
public static void trashNonDefaultFS(Configuration conf) throws IOException {
|
||||||
|
@ -1000,6 +1037,10 @@ public class TestTrash {
|
||||||
public void deleteCheckpoint() throws IOException {
|
public void deleteCheckpoint() throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteCheckpointsImmediately() throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Path getCurrentTrashDir() {
|
public Path getCurrentTrashDir() {
|
||||||
return null;
|
return null;
|
||||||
|
@ -1059,6 +1100,11 @@ public class TestTrash {
|
||||||
AuditableCheckpoints.delete();
|
AuditableCheckpoints.delete();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteCheckpointsImmediately() throws IOException {
|
||||||
|
AuditableCheckpoints.deleteAll();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Path getCurrentTrashDir() {
|
public Path getCurrentTrashDir() {
|
||||||
return null;
|
return null;
|
||||||
|
@ -1115,25 +1161,32 @@ public class TestTrash {
|
||||||
*/
|
*/
|
||||||
private static class AuditableCheckpoints {
|
private static class AuditableCheckpoints {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(AuditableCheckpoints.class);
|
||||||
|
|
||||||
private static AtomicInteger numOfCheckpoint =
|
private static AtomicInteger numOfCheckpoint =
|
||||||
new AtomicInteger(0);
|
new AtomicInteger(0);
|
||||||
|
|
||||||
private static void add() {
|
private static void add() {
|
||||||
numOfCheckpoint.incrementAndGet();
|
numOfCheckpoint.incrementAndGet();
|
||||||
System.out.println(String
|
LOG.info("Create a checkpoint, current number of checkpoints {}",
|
||||||
.format("Create a checkpoint, current number of checkpoints %d",
|
numOfCheckpoint.get());
|
||||||
numOfCheckpoint.get()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void delete() {
|
private static void delete() {
|
||||||
if(numOfCheckpoint.get() > 0) {
|
if(numOfCheckpoint.get() > 0) {
|
||||||
numOfCheckpoint.decrementAndGet();
|
numOfCheckpoint.decrementAndGet();
|
||||||
System.out.println(String
|
LOG.info("Delete a checkpoint, current number of checkpoints {}",
|
||||||
.format("Delete a checkpoint, current number of checkpoints %d",
|
numOfCheckpoint.get());
|
||||||
numOfCheckpoint.get()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void deleteAll() {
|
||||||
|
numOfCheckpoint.set(0);
|
||||||
|
LOG.info("Delete all checkpoints, current number of checkpoints {}",
|
||||||
|
numOfCheckpoint.get());
|
||||||
|
}
|
||||||
|
|
||||||
private static int get() {
|
private static int get() {
|
||||||
return numOfCheckpoint.get();
|
return numOfCheckpoint.get();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue