HDFS-16972. Delete a snapshot may deleteCurrentFile. (#5532)

This commit is contained in:
Tsz-Wo Nicholas Sze 2023-04-27 09:17:47 -07:00 committed by GitHub
parent 60a7e8acaa
commit d9576bb9ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 738 additions and 39 deletions

View File

@ -77,6 +77,12 @@ public class FsImageValidation {
static final String FS_IMAGE = "FS_IMAGE";
static String getEnv(String property) {
final String value = System.getenv().get(property);
LOG.info("ENV: {} = {}", property, value);
return value;
}
static FsImageValidation newInstance(String... args) {
final String f = Cli.parse(args);
if (f == null) {
@ -302,10 +308,7 @@ public int run(String[] args) throws Exception {
static String parse(String... args) {
final String f;
if (args == null || args.length == 0) {
f = System.getenv().get(FS_IMAGE);
if (f != null) {
println("Environment variable %s = %s", FS_IMAGE, f);
}
f = getEnv(FS_IMAGE);
} else if (args.length == 1) {
f = args[0];
} else {

View File

@ -49,6 +49,7 @@
import java.io.StringWriter;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* We keep an in-memory representation of the file/block hierarchy.
@ -1016,6 +1017,8 @@ public static class ReclaimContext {
/** Used to collect quota usage delta */
private final QuotaDelta quotaDelta;
private Snapshot snapshotToBeDeleted = null;
/**
* @param bsps
* block storage policy suite to calculate intended storage type
@ -1037,6 +1040,36 @@ public ReclaimContext(
this.quotaDelta = new QuotaDelta();
}
/**
* Set the snapshot to be deleted
* for {@link FSEditLogOpCodes#OP_DELETE_SNAPSHOT}.
*
* @param snapshot the snapshot to be deleted
*/
public void setSnapshotToBeDeleted(Snapshot snapshot) {
this.snapshotToBeDeleted = Objects.requireNonNull(
snapshot, "snapshot == null");
}
/**
* For {@link FSEditLogOpCodes#OP_DELETE_SNAPSHOT},
* return the snapshot to be deleted.
* For other ops, return {@link Snapshot#CURRENT_STATE_ID}.
*/
public int getSnapshotIdToBeDeleted() {
return Snapshot.getSnapshotId(snapshotToBeDeleted);
}
public int getSnapshotIdToBeDeleted(int snapshotId, INode inode) {
final int snapshotIdToBeDeleted = getSnapshotIdToBeDeleted();
if (snapshotId != snapshotIdToBeDeleted) {
LOG.warn("Snapshot changed: current = {}, original = {}, inode: {}",
Snapshot.getSnapshotString(snapshotId), snapshotToBeDeleted,
inode.toDetailString());
}
return snapshotIdToBeDeleted;
}
public BlockStoragePolicySuite storagePolicySuite() {
return bsps;
}
@ -1054,8 +1087,11 @@ public QuotaDelta quotaDelta() {
* removedUCFiles but a new quotaDelta.
*/
public ReclaimContext getCopy() {
return new ReclaimContext(bsps, collectedBlocks, removedINodes,
final ReclaimContext that = new ReclaimContext(
bsps, collectedBlocks, removedINodes,
removedUCFiles);
that.snapshotToBeDeleted = this.snapshotToBeDeleted;
return that;
}
}

View File

@ -35,32 +35,48 @@
import org.apache.hadoop.security.AccessControlException;
/**
* An anonymous reference to an inode.
*
* A reference to an inode.
* <p>
* This class and its subclasses are used to support multiple access paths.
* A file/directory may have multiple access paths when it is stored in some
* snapshots and it is renamed/moved to other locations.
*
* snapshots, and it is renamed/moved to other locations.
* <p>
* For example,
* (1) Suppose we have /abc/foo, say the inode of foo is inode(id=1000,name=foo)
* (2) create snapshot s0 for /abc
* (1) Suppose we have /abc/foo and the inode is inode(id=1000,name=foo).
* Suppose foo is created after snapshot s0,
* i.e. foo is not in s0 and inode(id=1000,name=foo)
* is in the create-list of /abc for the s0 diff entry.
* (2) Create snapshot s1, s2 for /abc, i.e. foo is in s1 and s2.
* Suppose sDst is the last snapshot /xyz.
* (3) mv /abc/foo /xyz/bar, i.e. inode(id=1000,name=...) is renamed from "foo"
* to "bar" and its parent becomes /xyz.
*
* Then, /xyz/bar and /abc/.snapshot/s0/foo are two different access paths to
* the same inode, inode(id=1000,name=bar).
*
* <p>
* Then, /xyz/bar, /abc/.snapshot/s1/foo and /abc/.snapshot/s2/foo
* are different access paths to the same inode, inode(id=1000,name=bar).
* Inside the inode tree, /abc/.snapshot/s1/foo and /abc/.snapshot/s2/foo
* indeed have the same resolved path,
* but /xyz/bar has a different resolved path.
* <p>
* With references, we have the following
* - /abc has a child ref(id=1001,name=foo).
* - /xyz has a child ref(id=1002)
* - Both ref(id=1001,name=foo) and ref(id=1002) point to another reference,
* ref(id=1003,count=2).
* - Finally, ref(id=1003,count=2) points to inode(id=1000,name=bar).
*
* Note 1: For a reference without name, e.g. ref(id=1002), it uses the name
* of the referred inode.
* - The source /abc/foo inode(id=1000,name=foo) is replaced with
* a WithName(name=foo,lastSnapshot=s2) and then it is moved
* to the delete-list of /abc for the s2 diff entry.
* The replacement also replaces inode(id=1000,name=foo)
* in the create-list of /abc for the s0 diff entry with the WithName.
* The same as before, /abc/foo is in s1 and s2, but not in s0.
* - The destination /xyz adds a child DstReference(dstSnapshot=sDst).
* DstReference is added to the create-list of /xyz for the sDst diff entry.
* /xyz/bar is not in sDst.
* - Both WithName and DstReference point to another reference WithCount(count=2).
* - Finally, WithCount(count=2) points to inode(id=1000,name=bar)
* Note that the inode name is changed to "bar".
* <p>
* Note 1: References other than WithName use the name of the referred inode,
* i.e. WithCount and DstReference do not have their own name.
* Note 2: getParent() always returns the parent in the current state, e.g.
* inode(id=1000,name=bar).getParent() returns /xyz but not /abc.
* Note 3: {@link INodeReference#getId()} returns the id the referred inode,
* e.g. all WithName, DstReference and WithCount above return id=1000.
*/
public abstract class INodeReference extends INode {
/** Assert the relationship this node and the references. */
@ -409,9 +425,9 @@ public String getCountDetails() {
final StringBuilder b = new StringBuilder("[");
if (!withNameList.isEmpty()) {
final Iterator<WithName> i = withNameList.iterator();
b.append(i.next().getFullPathAndObjectString());
b.append(i.next().getNameDetails());
for(; i.hasNext();) {
b.append(", ").append(i.next().getFullPathAndObjectString());
b.append(", ").append(i.next().getNameDetails());
}
}
b.append("]");
@ -548,7 +564,9 @@ public static class WithName extends INodeReference {
/**
* The id of the last snapshot in the src tree when this WithName node was
* generated. When calculating the quota usage of the referred node, only
* generated, i.e. this reference is in that snapshot.
* <p>
* When calculating the quota usage of the referred node, only
* the files/dirs existing when this snapshot was taken will be counted for
* this WithName node and propagated along its ancestor path.
*/
@ -564,6 +582,11 @@ public WithName(INodeDirectory parent, WithCount referred, byte[] name,
INodeReferenceValidation.add(this, WithName.class);
}
String getNameDetails() {
return getClass().getSimpleName() + "[" + getLocalName()
+ ", lastSnapshot=" + lastSnapshotId + "]";
}
@Override
void assertReferences() {
final INode ref= getReferredINode();
@ -727,7 +750,8 @@ private int getSelfSnapshot() {
public static class DstReference extends INodeReference {
/**
* Record the latest snapshot of the dst subtree before the rename. For
* Record the latest snapshot of the dst subtree before the rename,
* i.e. this reference is NOT in that snapshot. For
* later operations on the moved/renamed files/directories, if the latest
* snapshot is after this dstSnapshot, changes will be recorded to the
* latest snapshot. Otherwise changes will be recorded to the snapshot
@ -752,6 +776,11 @@ public DstReference(INodeDirectory parent, WithCount referred,
INodeReferenceValidation.add(this, DstReference.class);
}
String getDstDetails() {
return getClass().getSimpleName() + "[" + getLocalName()
+ ", dstSnapshot=" + dstSnapshotId + "]";
}
@Override
void assertReferences() {
final INode ref = getReferredINode();
@ -795,6 +824,26 @@ public void cleanSubtree(ReclaimContext reclaimContext, int snapshot,
}
}
/**
* When dstSnapshotId >= snapshotToBeDeleted,
* this reference is not in snapshotToBeDeleted.
* This reference should not be destroyed.
*
* @param context to {@link ReclaimContext#getSnapshotIdToBeDeleted()}
*/
private boolean shouldDestroy(ReclaimContext context) {
final int snapshotToBeDeleted = context.getSnapshotIdToBeDeleted();
if (dstSnapshotId < snapshotToBeDeleted) {
return true;
}
LOG.warn("Try to destroy a DstReference with dstSnapshotId = {}"
+ " >= snapshotToBeDeleted = {}", dstSnapshotId, snapshotToBeDeleted);
LOG.warn(" dstRef: {}", toDetailString());
final INode r = getReferredINode().asReference().getReferredINode();
LOG.warn(" referred: {}", r.toDetailString());
return false;
}
/**
* {@inheritDoc}
* <br>
@ -808,6 +857,10 @@ public void cleanSubtree(ReclaimContext reclaimContext, int snapshot,
*/
@Override
public void destroyAndCollectBlocks(ReclaimContext reclaimContext) {
if (!shouldDestroy(reclaimContext)) {
return;
}
// since we count everything of the subtree for the quota usage of a
// dst reference node, here we should just simply do a quota computation.
// then to avoid double counting, we pass a different QuotaDelta to other

View File

@ -266,6 +266,8 @@ public Snapshot removeSnapshot(
final Snapshot snapshot = snapshotsByNames.get(i);
int prior = Snapshot.findLatestSnapshot(snapshotRoot, snapshot.getId());
snapshotManager.assertPrior(snapshotRoot, snapshotName, prior);
reclaimContext.setSnapshotToBeDeleted(snapshot);
snapshotRoot.cleanSubtree(reclaimContext, snapshot.getId(), prior);
// remove from snapshotsByNames after successfully cleaning the subtree
snapshotsByNames.remove(i);

View File

@ -122,9 +122,12 @@ public String getDetailedString() {
public void cleanFile(INode.ReclaimContext reclaimContext,
final INodeFile file, final int snapshotId, int priorSnapshotId,
byte storagePolicyId) {
final int snapshotToBeDeleted
= reclaimContext.getSnapshotIdToBeDeleted(snapshotId, file);
if (snapshotId == Snapshot.CURRENT_STATE_ID) {
// delete the current file while the file has snapshot feature
if (!isCurrentFileDeleted()) {
if (!isCurrentFileDeleted()
&& snapshotToBeDeleted == Snapshot.CURRENT_STATE_ID) {
file.recordModification(priorSnapshotId);
deleteCurrentFile();
}

View File

@ -94,6 +94,12 @@ public static int getSnapshotId(Snapshot s) {
return s == null ? CURRENT_STATE_ID : s.getId();
}
public static String getSnapshotString(int snapshot) {
return snapshot == CURRENT_STATE_ID? "<CURRENT_STATE>"
: snapshot == NO_SNAPSHOT_ID? "<NO_SNAPSHOT>"
: "Snapshot #" + snapshot;
}
/**
* Compare snapshot with IDs, where null indicates the current status thus
* is greater than any non-null snapshot.

View File

@ -53,8 +53,8 @@ public void testValidation() throws Exception {
final int errorCount = FsImageValidation.newInstance().run();
Assert.assertEquals("Error Count: " + errorCount, 0, errorCount);
} catch (HadoopIllegalArgumentException e) {
LOG.warn("The environment variable {} is not set: {}",
FsImageValidation.FS_IMAGE, e);
LOG.warn("The environment variable " + FsImageValidation.FS_IMAGE
+ " is not set", e);
}
}

View File

@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.TrashPolicyDefault;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -42,12 +45,19 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.GSet;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.LogManager;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.WriterAppender;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -61,18 +71,18 @@ public class SnapshotTestHelper {
/** Disable the logs that are not very useful for snapshot related tests. */
public static void disableLogs() {
final String[] lognames = {
"org.eclipse.jetty",
"org.apache.hadoop.ipc",
"org.apache.hadoop.net",
"org.apache.hadoop.security",
"org.apache.hadoop.hdfs.server.blockmanagement",
"org.apache.hadoop.hdfs.server.common.Util",
"org.apache.hadoop.hdfs.server.blockmanagement.BlockReportLeaseManager",
"org.apache.hadoop.hdfs.server.datanode",
"org.apache.hadoop.hdfs.server.namenode.FileJournalManager",
"org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager",
"org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf",
"org.apache.hadoop.hdfs.server.namenode.FSEditLog",
"org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner",
"org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice",
"org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl",
"org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService",
"org.apache.hadoop.hdfs.server.datanode.fsdataset.impl" +
".RamDiskAsyncLazyPersistService",
};
for(String n : lognames) {
GenericTestUtils.disableLog(LoggerFactory.getLogger(n));
@ -105,6 +115,198 @@ public static void disableLogs() {
GenericTestUtils.disableLog(Server.LOG);
}
static class MyCluster {
private final MiniDFSCluster cluster;
private final FSNamesystem fsn;
private final FSDirectory fsdir;
private final DistributedFileSystem hdfs;
private final FsShell shell = new FsShell();
private final Path snapshotDir = new Path("/");
private final AtomicInteger snapshotCount = new AtomicInteger();
private final AtomicInteger trashMoveCount = new AtomicInteger();
private final AtomicInteger printTreeCount = new AtomicInteger();
private final AtomicBoolean printTree = new AtomicBoolean();
MyCluster(Configuration conf) throws Exception {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.format(true)
.build();
fsn = cluster.getNamesystem();
fsdir = fsn.getFSDirectory();
cluster.waitActive();
hdfs = cluster.getFileSystem();
hdfs.allowSnapshot(snapshotDir);
createSnapshot();
shell.setConf(cluster.getConfiguration(0));
runShell("-mkdir", "-p", ".Trash");
}
void setPrintTree(boolean print) {
printTree.set(print);
}
boolean getPrintTree() {
return printTree.get();
}
Path getTrashPath(Path p) throws Exception {
final Path trash = hdfs.getTrashRoot(p);
final Path resolved = hdfs.resolvePath(p);
return new Path(trash, "Current/" + resolved.toUri().getPath());
}
int runShell(String... argv) {
return shell.run(argv);
}
String createSnapshot()
throws Exception {
final String name = "s" + snapshotCount.getAndIncrement();
SnapshotTestHelper.createSnapshot(hdfs, snapshotDir, name);
return name;
}
void deleteSnapshot(String snapshotName) throws Exception {
LOG.info("Before delete snapshot " + snapshotName);
hdfs.deleteSnapshot(snapshotDir, snapshotName);
}
boolean assertExists(Path path) throws Exception {
if (path == null) {
return false;
}
if (!hdfs.exists(path)) {
final String err = "Path not found: " + path;
printFs(err);
throw new AssertionError(err);
}
return true;
}
void printFs(String label) {
final PrintStream out = System.out;
out.println();
out.println();
out.println("XXX " + printTreeCount.getAndIncrement() + ": " + label);
if (printTree.get()) {
fsdir.getRoot().dumpTreeRecursively(out);
}
}
void shutdown() {
LOG.info("snapshotCount: {}", snapshotCount);
cluster.shutdown();
}
Path mkdirs(String dir) throws Exception {
return mkdirs(new Path(dir));
}
Path mkdirs(Path dir) throws Exception {
final String label = "mkdirs " + dir;
LOG.info(label);
hdfs.mkdirs(dir);
Assert.assertTrue(label, hdfs.exists(dir));
return dir;
}
Path createFile(String file) throws Exception {
return createFile(new Path(file));
}
Path createFile(Path file) throws Exception {
final String label = "createFile " + file;
LOG.info(label);
DFSTestUtil.createFile(hdfs, file, 0, (short)1, 0L);
Assert.assertTrue(label, hdfs.exists(file));
return file;
}
String rename(Path src, Path dst) throws Exception {
assertExists(src);
final String snapshot = createSnapshot();
final String label = "rename " + src + " -> " + dst;
final boolean renamed = hdfs.rename(src, dst);
LOG.info("{}: success? {}", label, renamed);
Assert.assertTrue(label, renamed);
return snapshot;
}
Path moveToTrash(Path path, boolean printFs) throws Exception {
return moveToTrash(path.toString(), printFs);
}
Path moveToTrash(String path, boolean printFs) throws Exception {
final Log4jRecorder recorder = Log4jRecorder.record(
LoggerFactory.getLogger(TrashPolicyDefault.class));
runShell("-rm", "-r", path);
final String label = "moveToTrash-" + trashMoveCount.getAndIncrement() + " " + path;
if (printFs) {
printFs(label);
} else {
LOG.info(label);
}
final String recorded = recorder.getRecorded();
LOG.info("Recorded: {}", recorded);
final String pattern = " to trash at: ";
final int i = recorded.indexOf(pattern);
if (i > 0) {
final String sub = recorded.substring(i + pattern.length());
return new Path(sub.trim());
}
return null;
}
}
/** Records log messages from a Log4j logger. */
public static final class Log4jRecorder {
static Log4jRecorder record(org.slf4j.Logger logger) {
return new Log4jRecorder(toLog4j(logger), getLayout());
}
static org.apache.log4j.Logger toLog4j(org.slf4j.Logger logger) {
return LogManager.getLogger(logger.getName());
}
static Layout getLayout() {
final org.apache.log4j.Logger root
= org.apache.log4j.Logger.getRootLogger();
Appender a = root.getAppender("stdout");
if (a == null) {
a = root.getAppender("console");
}
return a == null? new PatternLayout() : a.getLayout();
}
private final StringWriter stringWriter = new StringWriter();
private final WriterAppender appender;
private final org.apache.log4j.Logger logger;
private Log4jRecorder(org.apache.log4j.Logger logger, Layout layout) {
this.appender = new WriterAppender(layout, stringWriter);
this.logger = logger;
this.logger.addAppender(this.appender);
}
public String getRecorded() {
return stringWriter.toString();
}
public void stop() {
logger.removeAppender(appender);
}
public void clear() {
stringWriter.getBuffer().setLength(0);
}
}
private SnapshotTestHelper() {
// Cannot be instantinatied
}

View File

@ -0,0 +1,394 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
/**
* Testing snapshots with FsShell move-to-trash feature.
*/
public class TestFsShellMoveToTrashWithSnapshots {
static {
SnapshotTestHelper.disableLogs();
}
private static final Logger LOG =
LoggerFactory.getLogger("XXX");
private static final String TMP = ".tmp";
private static final String WAREHOUSE_DIR = "/warehouse/sub/";
private static final String TO_BE_REMOVED = "TMP/";
private static SnapshotTestHelper.MyCluster cluster;
@Before
public void setUp() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, 100);
cluster = new SnapshotTestHelper.MyCluster(conf);
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
static class MyDirs {
private final Path base;
private final boolean[] moved;
private final List<Integer> renames = new ArrayList<>();
MyDirs(Path base, int depth) {
this.base = base;
this.moved = new boolean[depth];
for (int i = 0; i < depth; i++) {
renames.add(i);
}
Collections.shuffle(renames);
}
int depth() {
return moved.length;
}
DeleteSnapshotOp rename() throws Exception {
final int i = renames.remove(renames.size() - 1);
final String snapshot = cluster.rename(getSubPath(i + 1), getSubPath(i));
moved[i] = true;
return new DeleteSnapshotOp(snapshot);
}
Path getSubPath(int n) {
if (n == 0) {
return base;
}
final StringBuilder b = new StringBuilder();
for (int i = 0; i < n; i++) {
if (!moved[i]) {
b.append(TO_BE_REMOVED);
}
b.append("dir").append(i).append("/");
}
return new Path(base, b.toString());
}
Path getPath() {
return getSubPath(moved.length);
}
}
static class MyFile {
private final Path tmp;
private Path dst;
private Path trash;
MyFile(String filePath) {
this.tmp = new Path(filePath + TMP);
}
@Override
public String toString() {
return "MyFile{" +
"tmp=" + tmp +
", dst=" + dst +
", trash=" + trash +
'}';
}
synchronized Path getPath() {
return trash != null ? trash
: dst != null ? dst
: tmp;
}
synchronized String moveFromTmp2Dst(Path dstDir) throws Exception {
final String tmpName = tmp.getName();
dst = new Path(dstDir, tmpName.substring(0, tmpName.length() - 4));
final String snapshot = cluster.rename(tmp, dst);
trash = cluster.getTrashPath(dst);
return snapshot;
}
}
MyFile createTmp(String filePath) throws Exception {
final MyFile f = new MyFile(filePath);
cluster.createFile(f.tmp);
return f;
}
DeleteSnapshotOp moveFromTmp2Dst(MyFile file, Path dstDir) throws Exception {
final String snapshot = file.moveFromTmp2Dst(dstDir);
return new DeleteSnapshotOp(snapshot);
}
List<MyFile> runTestMoveToTrashWithShell(
Path dbDir, Path tmpDir, int numFiles)
throws Exception {
return runTestMoveToTrashWithShell(dbDir, tmpDir, numFiles, 4, null);
}
List<MyFile> runTestMoveToTrashWithShell(
Path dbDir, Path tmpDir, int numFiles, int depth, Integer randomSleepMaxMs)
throws Exception {
LOG.info("dbDir={}", dbDir);
LOG.info("tmpDir={}", tmpDir);
LOG.info("numFiles={}, depth={}, randomSleepMaxMs={}", numFiles, depth, randomSleepMaxMs);
cluster.setPrintTree(numFiles < 10);
final List<Op> ops = new ArrayList<>();
createSnapshot(ops);
//swap sub1 and sub2
Path sub1 = cluster.mkdirs(new Path(dbDir, "sub1"));
Path sub2 = cluster.mkdirs(new Path(sub1, "sub2"));
ops.add(new DeleteSnapshotOp(cluster.rename(sub2, dbDir)));
sub2 = new Path(dbDir, "sub2");
ops.add(new DeleteSnapshotOp(cluster.rename(sub1, sub2)));
sub1 = new Path(sub2, "sub1");
final MyDirs dirs = new MyDirs(sub1, depth);
cluster.mkdirs(dirs.getPath());
final List<MyFile> buckets = new ArrayList<>();
for (int i = 0; i < dirs.depth() / 2; i++) {
ops.add(dirs.rename());
}
final int offset = numFiles / 4;
for (int i = 0; i < numFiles; i++) {
final String bucket = tmpDir + String.format("/bucket_%04d", i);
createSnapshot(ops);
buckets.add(createTmp(bucket));
if (i >= offset) {
final int j = i - offset;
ops.add(moveFromTmp2Dst(buckets.get(j), dirs.getPath()));
}
if (randomSleepMaxMs != null) {
Thread.sleep(ThreadLocalRandom.current().nextInt(randomSleepMaxMs));
}
}
for (int i = dirs.depth() / 2; i < dirs.depth(); i++) {
ops.add(dirs.rename());
}
ops.add(new DeleteSnapshotOp(cluster.rename(dirs.getSubPath(1), sub2)));
ops.add(new DeleteSnapshotOp(cluster.rename(sub1, dbDir)));
sub1 = new Path(dbDir, "sub1");
ops.add(new DeleteSnapshotOp(cluster.rename(sub2, sub1)));
sub2 = new Path(sub1, "sub2");
ops.add(new DeleteSnapshotOp(cluster.rename(sub2, new Path(sub1, "sub1"))));
ops.add(new DeleteSnapshotOp(cluster.rename(sub1, new Path(dbDir, "sub2"))));
final MoveToTrashOp m = new MoveToTrashOp(dbDir);
m.trashPath.thenAccept(p -> updateTrashPath(p, buckets));
ops.add(m);
LOG.info("ops count: {}", ops.size());
while (!ops.isEmpty()) {
runOneOp(ops);
}
cluster.printFs("END");
return buckets;
}
static Path removeSubstring(Path p) {
if (p == null) {
return null;
}
return new Path(p.toUri().getPath().replace(TO_BE_REMOVED, ""));
}
void updateTrashPath(String trashPathPrefix, List<MyFile> files) {
final String commonPrefix;
final int j = trashPathPrefix.lastIndexOf('/');
commonPrefix = trashPathPrefix.substring(0, j + 1);
for (MyFile f : files) {
final String original = f.trash.toUri().getPath();
if (!original.startsWith(trashPathPrefix)) {
Assert.assertTrue(original.startsWith(commonPrefix));
final int i = original.indexOf('/', commonPrefix.length());
final String suffix = original.substring(i + 1);
f.trash = new Path(trashPathPrefix, suffix);
}
}
}
@Test(timeout = 300_000)
public void test100tasks20files() throws Exception {
runMultipleTasks(100, 20);
}
@Test(timeout = 300_000)
public void test10tasks200files() throws Exception {
runMultipleTasks(10, 200);
}
void runMultipleTasks(int numTasks, int filesPerTask) throws Exception {
final List<Future<List<MyFile>>> futures = new ArrayList<>();
final List<MyFile> buckets = new ArrayList<>();
final ExecutorService executor = Executors.newFixedThreadPool(10);
try {
for (int i = 0; i < numTasks; i++) {
final String db = "db" + i;
final String tmp = "tmp" + i;
futures.add(executor.submit(() -> {
final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + db);
final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + tmp);
return runTestMoveToTrashWithShell(dbDir, tmpDir, filesPerTask, 4, 100);
}));
}
for (Future<List<MyFile>> f : futures) {
buckets.addAll(f.get());
}
} finally {
executor.shutdown();
}
assertExists(buckets, f -> removeSubstring(f.getPath()));
}
@Test(timeout = 100_000)
public void test4files() throws Exception {
final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + "db");
final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + "tmp");
final List<MyFile> buckets = runTestMoveToTrashWithShell(
dbDir, tmpDir, 4, 2, null);
assertExists(buckets, f -> removeSubstring(f.getPath()));
}
@Test(timeout = 300_000)
public void test200files() throws Exception {
final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + "db");
final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + "tmp");
final List<MyFile> buckets = runTestMoveToTrashWithShell(
dbDir, tmpDir, 200);
assertExists(buckets, f -> removeSubstring(f.getPath()));
}
@Test(timeout = 300_000)
public void test50files10times() throws Exception {
final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + "tmp");
final List<MyFile> buckets = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + "db");
buckets.addAll(runTestMoveToTrashWithShell(dbDir, tmpDir, 50));
}
cluster.setPrintTree(true);
cluster.printFs("test_10files_10times");
assertExists(buckets, f -> removeSubstring(f.getPath()));
}
static void createSnapshot(List<Op> ops) throws Exception {
if (ThreadLocalRandom.current().nextBoolean()) {
ops.add(new DeleteSnapshotOp(cluster.createSnapshot()));
}
}
void runOneOp(List<Op> ops) throws Exception {
Collections.shuffle(ops);
final Op op = ops.remove(ops.size() - 1);
if (op instanceof MoveToTrashOp) {
createSnapshot(ops);
}
op.execute();
}
static abstract class Op {
private final AtomicBoolean executed = new AtomicBoolean();
final void execute() throws Exception {
if (executed.compareAndSet(false, true)) {
executeImpl();
}
}
final boolean isExecuted() {
return executed.get();
}
abstract void executeImpl() throws Exception;
}
static class MoveToTrashOp extends Op {
private final Path path;
private final CompletableFuture<String> trashPath = new CompletableFuture<>();
MoveToTrashOp(Path path) {
this.path = path;
}
@Override
public void executeImpl() throws Exception {
final Path p = cluster.moveToTrash(path, true);
LOG.info("MoveToTrash: {} -> {}", path, p);
trashPath.complete(p.toUri().getPath());
}
}
static class DeleteSnapshotOp extends Op {
private final String name;
DeleteSnapshotOp(String name) {
this.name = name;
}
@Override
void executeImpl() throws Exception {
cluster.deleteSnapshot(name);
}
}
void assertExists(List<MyFile> files, Function<MyFile, Path> getPath)
throws Exception {
for (MyFile f : files) {
final Path p = getPath.apply(f);
final boolean exists = cluster.assertExists(p);
if (cluster.getPrintTree()) {
LOG.info("{} exists? {}, {}", p, exists, f);
}
}
}
}