diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java index 067ea5e9a8d..ab301104f2e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java @@ -77,6 +77,12 @@ public class FsImageValidation { static final String FS_IMAGE = "FS_IMAGE"; + static String getEnv(String property) { + final String value = System.getenv().get(property); + LOG.info("ENV: {} = {}", property, value); + return value; + } + static FsImageValidation newInstance(String... args) { final String f = Cli.parse(args); if (f == null) { @@ -302,10 +308,7 @@ public int run(String[] args) throws Exception { static String parse(String... args) { final String f; if (args == null || args.length == 0) { - f = System.getenv().get(FS_IMAGE); - if (f != null) { - println("Environment variable %s = %s", FS_IMAGE, f); - } + f = getEnv(FS_IMAGE); } else if (args.length == 1) { f = args[0]; } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index b902c37109a..ad8aff00415 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -49,6 +49,7 @@ import java.io.StringWriter; import java.util.List; import java.util.Map; +import java.util.Objects; /** * We keep an in-memory representation of the file/block hierarchy. @@ -1016,6 +1017,8 @@ public static class ReclaimContext { /** Used to collect quota usage delta */ private final QuotaDelta quotaDelta; + private Snapshot snapshotToBeDeleted = null; + /** * @param bsps * block storage policy suite to calculate intended storage type @@ -1037,6 +1040,36 @@ public ReclaimContext( this.quotaDelta = new QuotaDelta(); } + /** + * Set the snapshot to be deleted + * for {@link FSEditLogOpCodes#OP_DELETE_SNAPSHOT}. + * + * @param snapshot the snapshot to be deleted + */ + public void setSnapshotToBeDeleted(Snapshot snapshot) { + this.snapshotToBeDeleted = Objects.requireNonNull( + snapshot, "snapshot == null"); + } + + /** + * For {@link FSEditLogOpCodes#OP_DELETE_SNAPSHOT}, + * return the snapshot to be deleted. + * For other ops, return {@link Snapshot#CURRENT_STATE_ID}. + */ + public int getSnapshotIdToBeDeleted() { + return Snapshot.getSnapshotId(snapshotToBeDeleted); + } + + public int getSnapshotIdToBeDeleted(int snapshotId, INode inode) { + final int snapshotIdToBeDeleted = getSnapshotIdToBeDeleted(); + if (snapshotId != snapshotIdToBeDeleted) { + LOG.warn("Snapshot changed: current = {}, original = {}, inode: {}", + Snapshot.getSnapshotString(snapshotId), snapshotToBeDeleted, + inode.toDetailString()); + } + return snapshotIdToBeDeleted; + } + public BlockStoragePolicySuite storagePolicySuite() { return bsps; } @@ -1054,8 +1087,11 @@ public QuotaDelta quotaDelta() { * removedUCFiles but a new quotaDelta. */ public ReclaimContext getCopy() { - return new ReclaimContext(bsps, collectedBlocks, removedINodes, + final ReclaimContext that = new ReclaimContext( + bsps, collectedBlocks, removedINodes, removedUCFiles); + that.snapshotToBeDeleted = this.snapshotToBeDeleted; + return that; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java index 6e655f7a134..df4b43039c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java @@ -35,32 +35,48 @@ import org.apache.hadoop.security.AccessControlException; /** - * An anonymous reference to an inode. - * + * A reference to an inode. + *

* This class and its subclasses are used to support multiple access paths. * A file/directory may have multiple access paths when it is stored in some - * snapshots and it is renamed/moved to other locations. - * + * snapshots, and it is renamed/moved to other locations. + *

* For example, - * (1) Suppose we have /abc/foo, say the inode of foo is inode(id=1000,name=foo) - * (2) create snapshot s0 for /abc + * (1) Suppose we have /abc/foo and the inode is inode(id=1000,name=foo). + * Suppose foo is created after snapshot s0, + * i.e. foo is not in s0 and inode(id=1000,name=foo) + * is in the create-list of /abc for the s0 diff entry. + * (2) Create snapshot s1, s2 for /abc, i.e. foo is in s1 and s2. + * Suppose sDst is the last snapshot /xyz. * (3) mv /abc/foo /xyz/bar, i.e. inode(id=1000,name=...) is renamed from "foo" * to "bar" and its parent becomes /xyz. - * - * Then, /xyz/bar and /abc/.snapshot/s0/foo are two different access paths to - * the same inode, inode(id=1000,name=bar). - * + *

+ * Then, /xyz/bar, /abc/.snapshot/s1/foo and /abc/.snapshot/s2/foo + * are different access paths to the same inode, inode(id=1000,name=bar). + * Inside the inode tree, /abc/.snapshot/s1/foo and /abc/.snapshot/s2/foo + * indeed have the same resolved path, + * but /xyz/bar has a different resolved path. + *

* With references, we have the following - * - /abc has a child ref(id=1001,name=foo). - * - /xyz has a child ref(id=1002) - * - Both ref(id=1001,name=foo) and ref(id=1002) point to another reference, - * ref(id=1003,count=2). - * - Finally, ref(id=1003,count=2) points to inode(id=1000,name=bar). - * - * Note 1: For a reference without name, e.g. ref(id=1002), it uses the name - * of the referred inode. + * - The source /abc/foo inode(id=1000,name=foo) is replaced with + * a WithName(name=foo,lastSnapshot=s2) and then it is moved + * to the delete-list of /abc for the s2 diff entry. + * The replacement also replaces inode(id=1000,name=foo) + * in the create-list of /abc for the s0 diff entry with the WithName. + * The same as before, /abc/foo is in s1 and s2, but not in s0. + * - The destination /xyz adds a child DstReference(dstSnapshot=sDst). + * DstReference is added to the create-list of /xyz for the sDst diff entry. + * /xyz/bar is not in sDst. + * - Both WithName and DstReference point to another reference WithCount(count=2). + * - Finally, WithCount(count=2) points to inode(id=1000,name=bar) + * Note that the inode name is changed to "bar". + *

+ * Note 1: References other than WithName use the name of the referred inode, + * i.e. WithCount and DstReference do not have their own name. * Note 2: getParent() always returns the parent in the current state, e.g. * inode(id=1000,name=bar).getParent() returns /xyz but not /abc. + * Note 3: {@link INodeReference#getId()} returns the id the referred inode, + * e.g. all WithName, DstReference and WithCount above return id=1000. */ public abstract class INodeReference extends INode { /** Assert the relationship this node and the references. */ @@ -409,9 +425,9 @@ public String getCountDetails() { final StringBuilder b = new StringBuilder("["); if (!withNameList.isEmpty()) { final Iterator i = withNameList.iterator(); - b.append(i.next().getFullPathAndObjectString()); + b.append(i.next().getNameDetails()); for(; i.hasNext();) { - b.append(", ").append(i.next().getFullPathAndObjectString()); + b.append(", ").append(i.next().getNameDetails()); } } b.append("]"); @@ -548,7 +564,9 @@ public static class WithName extends INodeReference { /** * The id of the last snapshot in the src tree when this WithName node was - * generated. When calculating the quota usage of the referred node, only + * generated, i.e. this reference is in that snapshot. + *

+ * When calculating the quota usage of the referred node, only * the files/dirs existing when this snapshot was taken will be counted for * this WithName node and propagated along its ancestor path. */ @@ -564,6 +582,11 @@ public WithName(INodeDirectory parent, WithCount referred, byte[] name, INodeReferenceValidation.add(this, WithName.class); } + String getNameDetails() { + return getClass().getSimpleName() + "[" + getLocalName() + + ", lastSnapshot=" + lastSnapshotId + "]"; + } + @Override void assertReferences() { final INode ref= getReferredINode(); @@ -673,7 +696,7 @@ public void cleanSubtree(ReclaimContext reclaimContext, final int snapshot, reclaimContext.quotaDelta().setCounts(old); } } - + @Override public void destroyAndCollectBlocks(ReclaimContext reclaimContext) { int snapshot = getSelfSnapshot(); @@ -727,7 +750,8 @@ private int getSelfSnapshot() { public static class DstReference extends INodeReference { /** - * Record the latest snapshot of the dst subtree before the rename. For + * Record the latest snapshot of the dst subtree before the rename, + * i.e. this reference is NOT in that snapshot. For * later operations on the moved/renamed files/directories, if the latest * snapshot is after this dstSnapshot, changes will be recorded to the * latest snapshot. Otherwise changes will be recorded to the snapshot @@ -752,6 +776,11 @@ public DstReference(INodeDirectory parent, WithCount referred, INodeReferenceValidation.add(this, DstReference.class); } + String getDstDetails() { + return getClass().getSimpleName() + "[" + getLocalName() + + ", dstSnapshot=" + dstSnapshotId + "]"; + } + @Override void assertReferences() { final INode ref = getReferredINode(); @@ -794,7 +823,27 @@ public void cleanSubtree(ReclaimContext reclaimContext, int snapshot, getReferredINode().cleanSubtree(reclaimContext, snapshot, prior); } } - + + /** + * When dstSnapshotId >= snapshotToBeDeleted, + * this reference is not in snapshotToBeDeleted. + * This reference should not be destroyed. + * + * @param context to {@link ReclaimContext#getSnapshotIdToBeDeleted()} + */ + private boolean shouldDestroy(ReclaimContext context) { + final int snapshotToBeDeleted = context.getSnapshotIdToBeDeleted(); + if (dstSnapshotId < snapshotToBeDeleted) { + return true; + } + LOG.warn("Try to destroy a DstReference with dstSnapshotId = {}" + + " >= snapshotToBeDeleted = {}", dstSnapshotId, snapshotToBeDeleted); + LOG.warn(" dstRef: {}", toDetailString()); + final INode r = getReferredINode().asReference().getReferredINode(); + LOG.warn(" referred: {}", r.toDetailString()); + return false; + } + /** * {@inheritDoc} *
@@ -808,6 +857,10 @@ public void cleanSubtree(ReclaimContext reclaimContext, int snapshot, */ @Override public void destroyAndCollectBlocks(ReclaimContext reclaimContext) { + if (!shouldDestroy(reclaimContext)) { + return; + } + // since we count everything of the subtree for the quota usage of a // dst reference node, here we should just simply do a quota computation. // then to avoid double counting, we pass a different QuotaDelta to other diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java index c181c05d081..983f9d28d17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java @@ -266,6 +266,8 @@ public Snapshot removeSnapshot( final Snapshot snapshot = snapshotsByNames.get(i); int prior = Snapshot.findLatestSnapshot(snapshotRoot, snapshot.getId()); snapshotManager.assertPrior(snapshotRoot, snapshotName, prior); + + reclaimContext.setSnapshotToBeDeleted(snapshot); snapshotRoot.cleanSubtree(reclaimContext, snapshot.getId(), prior); // remove from snapshotsByNames after successfully cleaning the subtree snapshotsByNames.remove(i); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java index 492278391d5..d6efbf9718b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java @@ -122,9 +122,12 @@ public String getDetailedString() { public void cleanFile(INode.ReclaimContext reclaimContext, final INodeFile file, final int snapshotId, int priorSnapshotId, byte storagePolicyId) { + final int snapshotToBeDeleted + = reclaimContext.getSnapshotIdToBeDeleted(snapshotId, file); if (snapshotId == Snapshot.CURRENT_STATE_ID) { // delete the current file while the file has snapshot feature - if (!isCurrentFileDeleted()) { + if (!isCurrentFileDeleted() + && snapshotToBeDeleted == Snapshot.CURRENT_STATE_ID) { file.recordModification(priorSnapshotId); deleteCurrentFile(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java index 095bbce0731..9d4b8a7997e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java @@ -94,6 +94,12 @@ public static int getSnapshotId(Snapshot s) { return s == null ? CURRENT_STATE_ID : s.getId(); } + public static String getSnapshotString(int snapshot) { + return snapshot == CURRENT_STATE_ID? "" + : snapshot == NO_SNAPSHOT_ID? "" + : "Snapshot #" + snapshot; + } + /** * Compare snapshot with IDs, where null indicates the current status thus * is greater than any non-null snapshot. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsImageValidation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsImageValidation.java index af30f1acde4..029e11e806e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsImageValidation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsImageValidation.java @@ -53,8 +53,8 @@ public void testValidation() throws Exception { final int errorCount = FsImageValidation.newInstance().run(); Assert.assertEquals("Error Count: " + errorCount, 0, errorCount); } catch (HadoopIllegalArgumentException e) { - LOG.warn("The environment variable {} is not set: {}", - FsImageValidation.FS_IMAGE, e); + LOG.warn("The environment variable " + FsImageValidation.FS_IMAGE + + " is not set", e); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java index d57a7344fe0..e526f3ec4ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java @@ -17,9 +17,12 @@ */ package org.apache.hadoop.hdfs.server.namenode.snapshot; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.TrashPolicyDefault; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -42,12 +45,19 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.GSet; +import org.apache.log4j.Appender; +import org.apache.log4j.Layout; +import org.apache.log4j.LogManager; +import org.apache.log4j.PatternLayout; +import org.apache.log4j.WriterAppender; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -61,18 +71,18 @@ public class SnapshotTestHelper { /** Disable the logs that are not very useful for snapshot related tests. */ public static void disableLogs() { final String[] lognames = { + "org.eclipse.jetty", + "org.apache.hadoop.ipc", + "org.apache.hadoop.net", + "org.apache.hadoop.security", + + "org.apache.hadoop.hdfs.server.blockmanagement", "org.apache.hadoop.hdfs.server.common.Util", - "org.apache.hadoop.hdfs.server.blockmanagement.BlockReportLeaseManager", + "org.apache.hadoop.hdfs.server.datanode", "org.apache.hadoop.hdfs.server.namenode.FileJournalManager", "org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager", "org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf", "org.apache.hadoop.hdfs.server.namenode.FSEditLog", - "org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner", - "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice", - "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl", - "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService", - "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl" + - ".RamDiskAsyncLazyPersistService", }; for(String n : lognames) { GenericTestUtils.disableLog(LoggerFactory.getLogger(n)); @@ -105,6 +115,198 @@ public static void disableLogs() { GenericTestUtils.disableLog(Server.LOG); } + static class MyCluster { + private final MiniDFSCluster cluster; + private final FSNamesystem fsn; + private final FSDirectory fsdir; + private final DistributedFileSystem hdfs; + private final FsShell shell = new FsShell(); + + private final Path snapshotDir = new Path("/"); + private final AtomicInteger snapshotCount = new AtomicInteger(); + private final AtomicInteger trashMoveCount = new AtomicInteger(); + private final AtomicInteger printTreeCount = new AtomicInteger(); + private final AtomicBoolean printTree = new AtomicBoolean(); + + MyCluster(Configuration conf) throws Exception { + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1) + .format(true) + .build(); + fsn = cluster.getNamesystem(); + fsdir = fsn.getFSDirectory(); + + cluster.waitActive(); + hdfs = cluster.getFileSystem(); + hdfs.allowSnapshot(snapshotDir); + createSnapshot(); + + shell.setConf(cluster.getConfiguration(0)); + runShell("-mkdir", "-p", ".Trash"); + } + + void setPrintTree(boolean print) { + printTree.set(print); + } + + boolean getPrintTree() { + return printTree.get(); + } + + Path getTrashPath(Path p) throws Exception { + final Path trash = hdfs.getTrashRoot(p); + final Path resolved = hdfs.resolvePath(p); + return new Path(trash, "Current/" + resolved.toUri().getPath()); + } + + int runShell(String... argv) { + return shell.run(argv); + } + + String createSnapshot() + throws Exception { + final String name = "s" + snapshotCount.getAndIncrement(); + SnapshotTestHelper.createSnapshot(hdfs, snapshotDir, name); + return name; + } + + void deleteSnapshot(String snapshotName) throws Exception { + LOG.info("Before delete snapshot " + snapshotName); + hdfs.deleteSnapshot(snapshotDir, snapshotName); + } + + boolean assertExists(Path path) throws Exception { + if (path == null) { + return false; + } + if (!hdfs.exists(path)) { + final String err = "Path not found: " + path; + printFs(err); + throw new AssertionError(err); + } + return true; + } + + void printFs(String label) { + final PrintStream out = System.out; + out.println(); + out.println(); + out.println("XXX " + printTreeCount.getAndIncrement() + ": " + label); + if (printTree.get()) { + fsdir.getRoot().dumpTreeRecursively(out); + } + } + + void shutdown() { + LOG.info("snapshotCount: {}", snapshotCount); + cluster.shutdown(); + } + + Path mkdirs(String dir) throws Exception { + return mkdirs(new Path(dir)); + } + + Path mkdirs(Path dir) throws Exception { + final String label = "mkdirs " + dir; + LOG.info(label); + hdfs.mkdirs(dir); + Assert.assertTrue(label, hdfs.exists(dir)); + return dir; + } + + Path createFile(String file) throws Exception { + return createFile(new Path(file)); + } + + Path createFile(Path file) throws Exception { + final String label = "createFile " + file; + LOG.info(label); + DFSTestUtil.createFile(hdfs, file, 0, (short)1, 0L); + Assert.assertTrue(label, hdfs.exists(file)); + return file; + } + + String rename(Path src, Path dst) throws Exception { + assertExists(src); + final String snapshot = createSnapshot(); + + final String label = "rename " + src + " -> " + dst; + final boolean renamed = hdfs.rename(src, dst); + LOG.info("{}: success? {}", label, renamed); + Assert.assertTrue(label, renamed); + return snapshot; + } + + Path moveToTrash(Path path, boolean printFs) throws Exception { + return moveToTrash(path.toString(), printFs); + } + + Path moveToTrash(String path, boolean printFs) throws Exception { + final Log4jRecorder recorder = Log4jRecorder.record( + LoggerFactory.getLogger(TrashPolicyDefault.class)); + runShell("-rm", "-r", path); + final String label = "moveToTrash-" + trashMoveCount.getAndIncrement() + " " + path; + if (printFs) { + printFs(label); + } else { + LOG.info(label); + } + final String recorded = recorder.getRecorded(); + LOG.info("Recorded: {}", recorded); + + final String pattern = " to trash at: "; + final int i = recorded.indexOf(pattern); + if (i > 0) { + final String sub = recorded.substring(i + pattern.length()); + return new Path(sub.trim()); + } + return null; + } + } + + /** Records log messages from a Log4j logger. */ + public static final class Log4jRecorder { + static Log4jRecorder record(org.slf4j.Logger logger) { + return new Log4jRecorder(toLog4j(logger), getLayout()); + } + + static org.apache.log4j.Logger toLog4j(org.slf4j.Logger logger) { + return LogManager.getLogger(logger.getName()); + } + + static Layout getLayout() { + final org.apache.log4j.Logger root + = org.apache.log4j.Logger.getRootLogger(); + Appender a = root.getAppender("stdout"); + if (a == null) { + a = root.getAppender("console"); + } + return a == null? new PatternLayout() : a.getLayout(); + } + + private final StringWriter stringWriter = new StringWriter(); + private final WriterAppender appender; + private final org.apache.log4j.Logger logger; + + private Log4jRecorder(org.apache.log4j.Logger logger, Layout layout) { + this.appender = new WriterAppender(layout, stringWriter); + this.logger = logger; + this.logger.addAppender(this.appender); + } + + public String getRecorded() { + return stringWriter.toString(); + } + + public void stop() { + logger.removeAppender(appender); + } + + public void clear() { + stringWriter.getBuffer().setLength(0); + } + } + private SnapshotTestHelper() { // Cannot be instantinatied } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestFsShellMoveToTrashWithSnapshots.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestFsShellMoveToTrashWithSnapshots.java new file mode 100644 index 00000000000..fe2dc6649a8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestFsShellMoveToTrashWithSnapshots.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.snapshot; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +/** + * Testing snapshots with FsShell move-to-trash feature. + */ +public class TestFsShellMoveToTrashWithSnapshots { + static { + SnapshotTestHelper.disableLogs(); + } + + private static final Logger LOG = + LoggerFactory.getLogger("XXX"); + + private static final String TMP = ".tmp"; + private static final String WAREHOUSE_DIR = "/warehouse/sub/"; + private static final String TO_BE_REMOVED = "TMP/"; + + private static SnapshotTestHelper.MyCluster cluster; + + @Before + public void setUp() throws Exception { + final Configuration conf = new Configuration(); + conf.setInt(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, 100); + cluster = new SnapshotTestHelper.MyCluster(conf); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + static class MyDirs { + private final Path base; + private final boolean[] moved; + private final List renames = new ArrayList<>(); + + MyDirs(Path base, int depth) { + this.base = base; + this.moved = new boolean[depth]; + + for (int i = 0; i < depth; i++) { + renames.add(i); + } + Collections.shuffle(renames); + } + + int depth() { + return moved.length; + } + + DeleteSnapshotOp rename() throws Exception { + final int i = renames.remove(renames.size() - 1); + final String snapshot = cluster.rename(getSubPath(i + 1), getSubPath(i)); + moved[i] = true; + return new DeleteSnapshotOp(snapshot); + } + + Path getSubPath(int n) { + if (n == 0) { + return base; + } + final StringBuilder b = new StringBuilder(); + for (int i = 0; i < n; i++) { + if (!moved[i]) { + b.append(TO_BE_REMOVED); + } + b.append("dir").append(i).append("/"); + } + return new Path(base, b.toString()); + } + + Path getPath() { + return getSubPath(moved.length); + } + } + + static class MyFile { + private final Path tmp; + private Path dst; + private Path trash; + + MyFile(String filePath) { + this.tmp = new Path(filePath + TMP); + } + + @Override + public String toString() { + return "MyFile{" + + "tmp=" + tmp + + ", dst=" + dst + + ", trash=" + trash + + '}'; + } + + synchronized Path getPath() { + return trash != null ? trash + : dst != null ? dst + : tmp; + } + + synchronized String moveFromTmp2Dst(Path dstDir) throws Exception { + final String tmpName = tmp.getName(); + dst = new Path(dstDir, tmpName.substring(0, tmpName.length() - 4)); + final String snapshot = cluster.rename(tmp, dst); + trash = cluster.getTrashPath(dst); + return snapshot; + } + } + + MyFile createTmp(String filePath) throws Exception { + final MyFile f = new MyFile(filePath); + cluster.createFile(f.tmp); + return f; + } + + DeleteSnapshotOp moveFromTmp2Dst(MyFile file, Path dstDir) throws Exception { + final String snapshot = file.moveFromTmp2Dst(dstDir); + return new DeleteSnapshotOp(snapshot); + } + + List runTestMoveToTrashWithShell( + Path dbDir, Path tmpDir, int numFiles) + throws Exception { + return runTestMoveToTrashWithShell(dbDir, tmpDir, numFiles, 4, null); + } + + List runTestMoveToTrashWithShell( + Path dbDir, Path tmpDir, int numFiles, int depth, Integer randomSleepMaxMs) + throws Exception { + LOG.info("dbDir={}", dbDir); + LOG.info("tmpDir={}", tmpDir); + LOG.info("numFiles={}, depth={}, randomSleepMaxMs={}", numFiles, depth, randomSleepMaxMs); + cluster.setPrintTree(numFiles < 10); + + final List ops = new ArrayList<>(); + createSnapshot(ops); + + //swap sub1 and sub2 + Path sub1 = cluster.mkdirs(new Path(dbDir, "sub1")); + Path sub2 = cluster.mkdirs(new Path(sub1, "sub2")); + + ops.add(new DeleteSnapshotOp(cluster.rename(sub2, dbDir))); + sub2 = new Path(dbDir, "sub2"); + ops.add(new DeleteSnapshotOp(cluster.rename(sub1, sub2))); + sub1 = new Path(sub2, "sub1"); + + final MyDirs dirs = new MyDirs(sub1, depth); + cluster.mkdirs(dirs.getPath()); + final List buckets = new ArrayList<>(); + + for (int i = 0; i < dirs.depth() / 2; i++) { + ops.add(dirs.rename()); + } + final int offset = numFiles / 4; + for (int i = 0; i < numFiles; i++) { + final String bucket = tmpDir + String.format("/bucket_%04d", i); + createSnapshot(ops); + buckets.add(createTmp(bucket)); + if (i >= offset) { + final int j = i - offset; + ops.add(moveFromTmp2Dst(buckets.get(j), dirs.getPath())); + } + if (randomSleepMaxMs != null) { + Thread.sleep(ThreadLocalRandom.current().nextInt(randomSleepMaxMs)); + } + } + + for (int i = dirs.depth() / 2; i < dirs.depth(); i++) { + ops.add(dirs.rename()); + } + + ops.add(new DeleteSnapshotOp(cluster.rename(dirs.getSubPath(1), sub2))); + ops.add(new DeleteSnapshotOp(cluster.rename(sub1, dbDir))); + sub1 = new Path(dbDir, "sub1"); + ops.add(new DeleteSnapshotOp(cluster.rename(sub2, sub1))); + sub2 = new Path(sub1, "sub2"); + ops.add(new DeleteSnapshotOp(cluster.rename(sub2, new Path(sub1, "sub1")))); + ops.add(new DeleteSnapshotOp(cluster.rename(sub1, new Path(dbDir, "sub2")))); + + final MoveToTrashOp m = new MoveToTrashOp(dbDir); + m.trashPath.thenAccept(p -> updateTrashPath(p, buckets)); + ops.add(m); + + LOG.info("ops count: {}", ops.size()); + while (!ops.isEmpty()) { + runOneOp(ops); + } + cluster.printFs("END"); + return buckets; + } + + static Path removeSubstring(Path p) { + if (p == null) { + return null; + } + return new Path(p.toUri().getPath().replace(TO_BE_REMOVED, "")); + } + + void updateTrashPath(String trashPathPrefix, List files) { + final String commonPrefix; + final int j = trashPathPrefix.lastIndexOf('/'); + commonPrefix = trashPathPrefix.substring(0, j + 1); + + for (MyFile f : files) { + final String original = f.trash.toUri().getPath(); + if (!original.startsWith(trashPathPrefix)) { + Assert.assertTrue(original.startsWith(commonPrefix)); + + final int i = original.indexOf('/', commonPrefix.length()); + final String suffix = original.substring(i + 1); + f.trash = new Path(trashPathPrefix, suffix); + } + } + } + + @Test(timeout = 300_000) + public void test100tasks20files() throws Exception { + runMultipleTasks(100, 20); + } + + @Test(timeout = 300_000) + public void test10tasks200files() throws Exception { + runMultipleTasks(10, 200); + } + + void runMultipleTasks(int numTasks, int filesPerTask) throws Exception { + final List>> futures = new ArrayList<>(); + final List buckets = new ArrayList<>(); + + final ExecutorService executor = Executors.newFixedThreadPool(10); + try { + for (int i = 0; i < numTasks; i++) { + final String db = "db" + i; + final String tmp = "tmp" + i; + futures.add(executor.submit(() -> { + final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + db); + final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + tmp); + return runTestMoveToTrashWithShell(dbDir, tmpDir, filesPerTask, 4, 100); + })); + } + + for (Future> f : futures) { + buckets.addAll(f.get()); + } + } finally { + executor.shutdown(); + } + assertExists(buckets, f -> removeSubstring(f.getPath())); + } + + @Test(timeout = 100_000) + public void test4files() throws Exception { + final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + "db"); + final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + "tmp"); + final List buckets = runTestMoveToTrashWithShell( + dbDir, tmpDir, 4, 2, null); + assertExists(buckets, f -> removeSubstring(f.getPath())); + } + + @Test(timeout = 300_000) + public void test200files() throws Exception { + final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + "db"); + final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + "tmp"); + final List buckets = runTestMoveToTrashWithShell( + dbDir, tmpDir, 200); + assertExists(buckets, f -> removeSubstring(f.getPath())); + } + + @Test(timeout = 300_000) + public void test50files10times() throws Exception { + final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + "tmp"); + final List buckets = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + "db"); + buckets.addAll(runTestMoveToTrashWithShell(dbDir, tmpDir, 50)); + } + cluster.setPrintTree(true); + cluster.printFs("test_10files_10times"); + assertExists(buckets, f -> removeSubstring(f.getPath())); + } + + static void createSnapshot(List ops) throws Exception { + if (ThreadLocalRandom.current().nextBoolean()) { + ops.add(new DeleteSnapshotOp(cluster.createSnapshot())); + } + } + + void runOneOp(List ops) throws Exception { + Collections.shuffle(ops); + + final Op op = ops.remove(ops.size() - 1); + if (op instanceof MoveToTrashOp) { + createSnapshot(ops); + } + op.execute(); + } + + static abstract class Op { + private final AtomicBoolean executed = new AtomicBoolean(); + + final void execute() throws Exception { + if (executed.compareAndSet(false, true)) { + executeImpl(); + } + } + + final boolean isExecuted() { + return executed.get(); + } + + abstract void executeImpl() throws Exception; + } + + static class MoveToTrashOp extends Op { + private final Path path; + private final CompletableFuture trashPath = new CompletableFuture<>(); + + MoveToTrashOp(Path path) { + this.path = path; + } + + @Override + public void executeImpl() throws Exception { + final Path p = cluster.moveToTrash(path, true); + LOG.info("MoveToTrash: {} -> {}", path, p); + trashPath.complete(p.toUri().getPath()); + } + } + + static class DeleteSnapshotOp extends Op { + private final String name; + + DeleteSnapshotOp(String name) { + this.name = name; + } + + @Override + void executeImpl() throws Exception { + cluster.deleteSnapshot(name); + } + } + + void assertExists(List files, Function getPath) + throws Exception { + for (MyFile f : files) { + final Path p = getPath.apply(f); + final boolean exists = cluster.assertExists(p); + if (cluster.getPrintTree()) { + LOG.info("{} exists? {}, {}", p, exists, f); + } + } + } +} \ No newline at end of file