diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
index 94426a561fb..7a8bf8dbe0d 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
@@ -47,6 +47,7 @@ function hadoop_usage
   hadoop_add_subcommand "ec" admin "run a HDFS ErasureCoding CLI"
   hadoop_add_subcommand "fetchdt" client "fetch a delegation token from the NameNode"
   hadoop_add_subcommand "fsck" admin "run a DFS filesystem checking utility"
+  hadoop_add_subcommand "fsImageValidation" admin "run FsImageValidation to check an fsimage"
   hadoop_add_subcommand "getconf" client "get config values from configuration"
   hadoop_add_subcommand "groups" client "get the groups which users belong to"
   hadoop_add_subcommand "haadmin" admin "run a DFS HA admin client"
@@ -143,6 +144,9 @@ function hdfscmd_case
     fsck)
       HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.DFSck
     ;;
+    fsImageValidation)
+      HADOOP_CLASSNAME=org.apache.hadoop.hdfs.server.namenode.FsImageValidation
+    ;;
     getconf)
       HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.GetConf
     ;;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
index a9a7852fa61..23d6a5aa1c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
@@ -59,7 +59,7 @@ if "%1" == "--loglevel" (
     )
   )
 
-  set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto dfsrouter dfsrouteradmin debug
+  set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck fsImageValidation balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto dfsrouter dfsrouteradmin debug
   for %%i in ( %hdfscommands% ) do (
     if %hdfs-command% == %%i set hdfscommand=true
   )
@@ -121,6 +121,11 @@ goto :eof
   set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_CLIENT_OPTS%
   goto :eof
 
+:fsImageValidation
+  set CLASS=org.apache.hadoop.hdfs.server.namenode.FsImageValidation
+  set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_CLIENT_OPTS%
+  goto :eof
+
 :balancer
   set CLASS=org.apache.hadoop.hdfs.server.balancer.Balancer
   set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_BALANCER_OPTS%
@@ -236,6 +241,7 @@ goto :eof
   @echo   dfsadmin             run a DFS admin client
   @echo   haadmin              run a DFS HA admin client
   @echo   fsck                 run a DFS filesystem checking utility
+  @echo   fsImageValidation    run FsImageValidation to check an fsimage
   @echo   balancer             run a cluster balancing utility
   @echo   jmxget               get JMX exported values from NameNode or DataNode.
   @echo   oiv                  apply the offline fsimage viewer to an fsimage
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
index a4377cddcde..8f324fbcc03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
@@ -209,4 +209,9 @@ public abstract class EditLogInputStream implements Closeable {
    * even faster data source (e.g. a byte buffer).
    */
   public abstract boolean isLocalLog();
+
+  @Override
+  public String toString() {
+    return getName();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 993c2832dce..fe39b071e20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1179,7 +1179,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return Collections.unmodifiableList(auditLoggers);
   }
 
-  private void loadFSImage(StartupOption startOpt) throws IOException {
+  void loadFSImage(StartupOption startOpt) throws IOException {
     final FSImage fsImage = getFSImage();
 
     // format before starting up if requested
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java
new file mode 100644
index 00000000000..5dcb5069e87
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
+import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
+import static org.apache.hadoop.util.Time.now;
+
+/**
+ * For validating {@link FSImage}.
+ * This tool will load the user-specified {@link FSImage},
+ * build the namespace tree,
+ * and then run validations over the namespace tree.
+ *
+ * The main difference between this tool and
+ * {@link org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewer}
+ * is that
+ * {@link org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewer}
+ * only loads an {@link FSImage} but does not build the namespace tree.
+ * Therefore, running validations over the namespace tree is impossible in
+ * {@link org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewer}.
+ */
+public class FsImageValidation {
+  static final Logger LOG = LoggerFactory.getLogger(FsImageValidation.class);
+
+  static final String FS_IMAGE = "FS_IMAGE";
+
+  static FsImageValidation newInstance(String... args) {
+    final String f = Cli.parse(args);
+    if (f == null) {
+      throw new HadoopIllegalArgumentException(
+          FS_IMAGE + " is not specified.");
+    }
+    return new FsImageValidation(new File(f));
+  }
+
+  static void initConf(Configuration conf) {
+    final int aDay = 24*3600_000;
+    conf.setInt(DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY, aDay);
+    conf.setInt(DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY, aDay);
+    conf.setBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, false);
+  }
+
+  /** Set (fake) HA so that edit logs will not be loaded. */
+  static void setHaConf(String nsId, Configuration conf) {
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, nsId);
+    final String haNNKey = DFS_HA_NAMENODES_KEY_PREFIX + "." + nsId;
+    conf.set(haNNKey, "nn0,nn1");
+    final String rpcKey = DFS_NAMENODE_RPC_ADDRESS_KEY + "."
+ nsId + "."; + conf.set(rpcKey + "nn0", "127.0.0.1:8080"); + conf.set(rpcKey + "nn1", "127.0.0.1:8080"); + } + + static void initLogLevels() { + Util.setLogLevel(FSImage.class, Level.TRACE); + Util.setLogLevel(FileJournalManager.class, Level.TRACE); + + Util.setLogLevel(GSet.class, Level.OFF); + Util.setLogLevel(BlockManager.class, Level.OFF); + Util.setLogLevel(DatanodeManager.class, Level.OFF); + Util.setLogLevel(TopMetrics.class, Level.OFF); + } + + static class Util { + static String memoryInfo() { + final Runtime runtime = Runtime.getRuntime(); + return "Memory Info: free=" + StringUtils.byteDesc(runtime.freeMemory()) + + ", total=" + StringUtils.byteDesc(runtime.totalMemory()) + + ", max=" + StringUtils.byteDesc(runtime.maxMemory()); + } + + static void setLogLevel(Class clazz, Level level) { + final Log log = LogFactory.getLog(clazz); + if (log instanceof Log4JLogger) { + final org.apache.log4j.Logger logger = ((Log4JLogger) log).getLogger(); + logger.setLevel(level); + LOG.info("setLogLevel {} to {}, getEffectiveLevel() = {}", + clazz.getName(), level, logger.getEffectiveLevel()); + } else { + LOG.warn("Failed setLogLevel {} to {}", clazz.getName(), level); + } + } + + static String toCommaSeparatedNumber(long n) { + final StringBuilder b = new StringBuilder(); + for(; n > 999;) { + b.insert(0, String.format(",%03d", n%1000)); + n /= 1000; + } + return b.insert(0, n).toString(); + } + } + + private final File fsImageFile; + + FsImageValidation(File fsImageFile) { + this.fsImageFile = fsImageFile; + } + + int checkINodeReference(Configuration conf) throws Exception { + LOG.info(Util.memoryInfo()); + initConf(conf); + + final TimerTask checkProgress = new TimerTask() { + @Override + public void run() { + final double percent = NameNode.getStartupProgress().createView() + .getPercentComplete(Phase.LOADING_FSIMAGE); + LOG.info(String.format("%s Progress: %.1f%%", + Phase.LOADING_FSIMAGE, 100*percent)); + } + }; + + INodeReferenceValidation.start(); + final Timer t = new Timer(); + t.scheduleAtFixedRate(checkProgress, 0, 60_000); + final long loadStart = now(); + final FSNamesystem namesystem; + if (fsImageFile.isDirectory()) { + Cli.println("Loading %s as a directory.", fsImageFile); + final String dir = fsImageFile.getCanonicalPath(); + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, dir); + conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, dir); + + + final FSImage fsImage = new FSImage(conf); + namesystem = new FSNamesystem(conf, fsImage, true); + // Avoid saving fsimage + namesystem.setRollingUpgradeInfo(false, 0); + + namesystem.loadFSImage(HdfsServerConstants.StartupOption.REGULAR); + } else { + Cli.println("Loading %s as a file.", fsImageFile); + final FSImage fsImage = new FSImage(conf); + namesystem = new FSNamesystem(conf, fsImage, true); + + final NamespaceInfo namespaceInfo = NNStorage.newNamespaceInfo(); + namespaceInfo.clusterID = "cluster0"; + fsImage.getStorage().setStorageInfo(namespaceInfo); + + final FSImageFormat.LoaderDelegator loader + = FSImageFormat.newLoader(conf, namesystem); + namesystem.writeLock(); + namesystem.getFSDirectory().writeLock(); + try { + loader.load(fsImageFile, false); + } finally { + namesystem.getFSDirectory().writeUnlock(); + namesystem.writeUnlock(); + } + } + t.cancel(); + Cli.println("Loaded %s %s successfully in %s", + FS_IMAGE, fsImageFile, StringUtils.formatTime(now() - loadStart)); + LOG.info(Util.memoryInfo()); + final int errorCount = INodeReferenceValidation.end(); + LOG.info(Util.memoryInfo()); + return errorCount; + } + + 
+  static class Cli extends Configured implements Tool {
+    static final String COMMAND;
+    static final String USAGE;
+    static {
+      final String clazz = FsImageValidation.class.getSimpleName();
+      COMMAND = Character.toLowerCase(clazz.charAt(0)) + clazz.substring(1);
+      USAGE = "Usage: hdfs " + COMMAND + " <" + FS_IMAGE + ">";
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      initLogLevels();
+
+      final FsImageValidation validation = FsImageValidation.newInstance(args);
+      final int errorCount = validation.checkINodeReference(getConf());
+      println("Error Count: %s", errorCount);
+      return errorCount == 0? 0: 1;
+    }
+
+    static String parse(String... args) {
+      final String f;
+      if (args == null || args.length == 0) {
+        f = System.getenv().get(FS_IMAGE);
+        if (f != null) {
+          println("Environment variable %s = %s", FS_IMAGE, f);
+        }
+      } else if (args.length == 1) {
+        f = args[0];
+      } else {
+        throw new HadoopIllegalArgumentException(
+            "args = " + Arrays.toString(args));
+      }
+
+      println("%s = %s", FS_IMAGE, f);
+      return f;
+    }
+
+    static void println(String format, Object... args) {
+      final String s = String.format(format, args);
+      System.out.println(s);
+      LOG.info(s);
+    }
+
+    static void printError(String message, Throwable t) {
+      System.out.println(message);
+      if (t != null) {
+        t.printStackTrace(System.out);
+      }
+      LOG.error(message, t);
+    }
+  }
+
+  public static void main(String[] args) {
+    if (DFSUtil.parseHelpArgument(args, Cli.USAGE, System.out, true)) {
+      System.exit(0);
+    }
+
+    try {
+      System.exit(ToolRunner.run(new Configuration(), new Cli(), args));
+    } catch (HadoopIllegalArgumentException e) {
+      e.printStackTrace(System.err);
+      System.err.println(Cli.USAGE);
+      ToolRunner.printGenericCommandUsage(System.err);
+      System.exit(-1);
+    } catch (Throwable e) {
+      Cli.printError("Failed to run " + Cli.COMMAND, e);
+      System.exit(-2);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
index 6334ed2f155..fd416f72a3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
@@ -660,9 +660,15 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
     }
   }
 
+  @VisibleForTesting
+  public String getFullPathAndObjectString() {
+    return getFullPathName() + "(" + getId() + ", " + getObjectString() + ")";
+  }
+
   @VisibleForTesting
   public String toDetailString() {
-    return toString() + "(" + getObjectString() + "), " + getParentString();
+    return toString() + "(" + getId() + ", " + getObjectString()
+        + ", " + getParentString() + ")";
   }
 
   /** @return the parent directory */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
index 8de0ed6d5de..69a92706ab5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
@@ -21,6 +21,7 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -61,6 +62,16 @@ import org.apache.hadoop.security.AccessControlException;
  * inode(id=1000,name=bar).getParent() returns /xyz but not /abc.
  */
 public abstract class INodeReference extends INode {
+  /** Assert the relationship between this node and its references. */
+  abstract void assertReferences();
+
+  @Override
+  public String toDetailString() {
+    final String s = referred == null? null
+        : referred.getFullPathAndObjectString();
+    return super.toDetailString() + ", ->" + s;
+  }
+
   /**
    * Try to remove the given reference and then return the reference count.
    * If the given inode is not a reference, return -1;
@@ -346,7 +357,7 @@ public abstract class INodeReference extends INode {
       out.print(", dstSnapshotId=" + ((DstReference) this).dstSnapshotId);
     }
     if (this instanceof WithCount) {
-      out.print(", count=" + ((WithCount)this).getReferenceCount());
+      out.print(", " + ((WithCount)this).getCountDetails());
     }
 
     out.println();
@@ -382,7 +393,59 @@ public abstract class INodeReference extends INode {
     public WithCount(INodeReference parent, INode referred) {
       super(parent, referred);
       Preconditions.checkArgument(!referred.isReference());
+      Preconditions.checkArgument(parent == null);
       referred.setParentReference(this);
+
+      INodeReferenceValidation.add(this, WithCount.class);
+    }
+
+    private String getCountDetails() {
+      final StringBuilder b = new StringBuilder("[");
+      if (!withNameList.isEmpty()) {
+        final Iterator<WithName> i = withNameList.iterator();
+        b.append(i.next().getFullPathAndObjectString());
+        while (i.hasNext()) {
+          b.append(", ").append(i.next().getFullPathAndObjectString());
+        }
+      }
+      b.append("]");
+      return ", count=" + getReferenceCount() + ", names=" + b;
+    }
+
+    @Override
+    public String toDetailString() {
+      return super.toDetailString() + getCountDetails();
+    }
+
+    private void assertDstReference(INodeReference parentRef) {
+      if (parentRef instanceof DstReference) {
+        return;
+      }
+      throw new IllegalArgumentException("Unexpected non-DstReference:"
+          + "\n parentRef: " + parentRef.toDetailString()
+          + "\n withCount: " + this.toDetailString());
+    }
+
+    private void assertReferredINode(INodeReference ref, String name) {
+      if (ref.getReferredINode() == this) {
+        return;
+      }
+      throw new IllegalStateException("Inconsistent Reference:"
+          + "\n " + name + ": " + ref.toDetailString()
+          + "\n withCount: " + this.toDetailString());
+    }
+
+    @Override
+    void assertReferences() {
+      for(WithName withName : withNameList) {
+        assertReferredINode(withName, " withName");
+      }
+
+      final INodeReference parentRef = getParentReference();
+      if (parentRef != null) {
+        assertDstReference(parentRef);
+        assertReferredINode(parentRef, "parentRef");
+      }
     }
 
     public int getReferenceCount() {
@@ -406,16 +469,26 @@
       }
     }
 
+    private int search(WithName ref) {
+      return Collections.binarySearch(withNameList, ref, WITHNAME_COMPARATOR);
+    }
+
     /** Decrement and then return the reference count. */
     public void removeReference(INodeReference ref) {
       if (ref instanceof WithName) {
-        int i = Collections.binarySearch(withNameList, (WithName) ref,
-            WITHNAME_COMPARATOR);
+        final WithName withName = (WithName) ref;
+        final int i = search(withName);
         if (i >= 0) {
           withNameList.remove(i);
+          INodeReferenceValidation.remove(withName, WithName.class);
         }
       } else if (ref == getParentReference()) {
         setParent(null);
+        INodeReferenceValidation.remove((DstReference) ref, DstReference.class);
+      }
+
+      if (getReferenceCount() == 0) {
+        INodeReferenceValidation.remove(this, WithCount.class);
       }
     }
@@ -481,6 +554,33 @@
       this.name = name;
       this.lastSnapshotId = lastSnapshotId;
       referred.addReference(this);
+
+      INodeReferenceValidation.add(this, WithName.class);
+    }
+
+    @Override
+    void assertReferences() {
+      final INode ref = getReferredINode();
+      final String err;
+      if (ref instanceof WithCount) {
+        final WithCount withCount = (WithCount)ref;
+        final int i = withCount.search(this);
+        if (i >= 0) {
+          if (withCount.withNameList.get(i) == this) {
+            return;
+          } else {
+            err = "OBJECT MISMATCH, withNameList.get(" + i + ") != this";
+          }
+        } else {
+          err = "NOT FOUND in withNameList";
+        }
+      } else {
+        err = "UNEXPECTED CLASS, expecting WithCount";
+      }
+
+      throw new IllegalStateException(err + ":"
+          + "\n  ref: " + (ref == null? null : ref.toDetailString())
+          + "\n this: " + this.toDetailString());
     }
 
     @Override
@@ -642,6 +742,27 @@
       super(parent, referred);
       this.dstSnapshotId = dstSnapshotId;
       referred.addReference(this);
+
+      INodeReferenceValidation.add(this, DstReference.class);
+    }
+
+    @Override
+    void assertReferences() {
+      final INode ref = getReferredINode();
+      final String err;
+      if (ref instanceof WithCount) {
+        if (ref.getParentReference() == this) {
+          return;
+        } else {
+          err = "OBJECT MISMATCH, ref.getParentReference() != this";
+        }
+      } else {
+        err = "UNEXPECTED CLASS, expecting WithCount";
+      }
+
+      throw new IllegalStateException(err + ":"
+          + "\n  ref: " + (ref == null? null : ref.toDetailString())
+          + "\n this: " + this.toDetailString());
     }
 
     @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReferenceValidation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReferenceValidation.java
new file mode 100644
index 00000000000..d3faf430741
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReferenceValidation.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.server.namenode.FsImageValidation.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hdfs.server.namenode.FsImageValidation.Cli.*;
+
+/** For validating {@link INodeReference} subclasses. */
+public class INodeReferenceValidation {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      INodeReferenceValidation.class);
+
+  private static final AtomicReference<INodeReferenceValidation> INSTANCE
+      = new AtomicReference<>();
+
+  public static void start() {
+    INSTANCE.compareAndSet(null, new INodeReferenceValidation());
+    println("Validation started");
+  }
+
+  public static int end() {
+    final INodeReferenceValidation instance = INSTANCE.getAndSet(null);
+    if (instance == null) {
+      return 0;
+    }
+
+    final int errorCount = instance.assertReferences();
+    println("Validation ended successfully: %d error(s) found.", errorCount);
+    return errorCount;
+  }
+
+  static <REF extends INodeReference> void add(REF ref, Class<REF> clazz) {
+    final INodeReferenceValidation validation = INSTANCE.get();
+    if (validation != null) {
+      final boolean added = validation.getReferences(clazz).add(ref);
+      Preconditions.checkState(added);
+      LOG.trace("add {}: {}", clazz, ref.toDetailString());
+    }
+  }
+
+  static <REF extends INodeReference> void remove(REF ref, Class<REF> clazz) {
+    final INodeReferenceValidation validation = INSTANCE.get();
+    if (validation != null) {
+      final boolean removed = validation.getReferences(clazz).remove(ref);
+      Preconditions.checkState(removed);
+      LOG.trace("remove {}: {}", clazz, ref.toDetailString());
+    }
+  }
+
+  static class ReferenceSet<REF extends INodeReference> {
+    private final Class<REF> clazz;
+    private final List<REF> references = new LinkedList<>();
+    private volatile List<Task<REF>> tasks;
+    private volatile List<Future<Integer>> futures;
+    private final AtomicInteger taskCompleted = new AtomicInteger();
+
+    ReferenceSet(Class<REF> clazz) {
+      this.clazz = clazz;
+    }
+
+    boolean add(REF ref) {
+      return references.add(ref);
+    }
+
+    boolean remove(REF ref) {
+      for(final Iterator<REF> i = references.iterator(); i.hasNext();) {
+        if (i.next() == ref) {
+          i.remove();
+          return true;
+        }
+      }
+      return false;
+    }
+
+    void submit(AtomicInteger errorCount, ExecutorService service)
+        throws InterruptedException {
+      final int size = references.size();
+      tasks = createTasks(references, errorCount);
+      println("Submitting %d tasks for validating %s %s(s)",
+          tasks.size(), Util.toCommaSeparatedNumber(size),
+          clazz.getSimpleName());
+      futures = service.invokeAll(tasks);
+    }
+
+    void waitForFutures() throws Exception {
+      for(Future<Integer> f : futures) {
+        f.get();
+        taskCompleted.incrementAndGet();
+      }
+    }
+
+    double getTaskCompletedPercent() {
+      final List<Task<REF>> t = tasks;
+      return t == null? 0
+          : t.isEmpty()? 100
+          : taskCompleted.get()*100.0/tasks.size();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s %.1f%%",
+          clazz.getSimpleName(), getTaskCompletedPercent());
+    }
+  }
+
+  private final ReferenceSet<INodeReference.WithCount> withCounts
+      = new ReferenceSet<>(INodeReference.WithCount.class);
+  private final ReferenceSet<INodeReference.WithName> withNames
+      = new ReferenceSet<>(INodeReference.WithName.class);
+  private final ReferenceSet<INodeReference.DstReference> dstReferences
+      = new ReferenceSet<>(INodeReference.DstReference.class);
+
+  <REF extends INodeReference> ReferenceSet<REF> getReferences(
+      Class<REF> clazz) {
+    if (clazz == INodeReference.WithCount.class) {
+      return (ReferenceSet<REF>) withCounts;
+    } else if (clazz == INodeReference.WithName.class) {
+      return (ReferenceSet<REF>) withNames;
+    } else if (clazz == INodeReference.DstReference.class) {
+      return (ReferenceSet<REF>) dstReferences;
+    }
+    throw new IllegalArgumentException("References not found for " + clazz);
+  }
+
+  private int assertReferences() {
+    final int p = Runtime.getRuntime().availableProcessors();
+    LOG.info("Available Processors: {}", p);
+    final ExecutorService service = Executors.newFixedThreadPool(p);
+
+    final TimerTask checkProgress = new TimerTask() {
+      @Override
+      public void run() {
+        LOG.info("ASSERT_REFERENCES Progress: {}, {}, {}",
+            dstReferences, withCounts, withNames);
+      }
+    };
+    final Timer t = new Timer();
+    t.scheduleAtFixedRate(checkProgress, 0, 1_000);
+
+    final AtomicInteger errorCount = new AtomicInteger();
+    try {
+      dstReferences.submit(errorCount, service);
+      withCounts.submit(errorCount, service);
+      withNames.submit(errorCount, service);
+
+      dstReferences.waitForFutures();
+      withCounts.waitForFutures();
+      withNames.waitForFutures();
+    } catch (Throwable e) {
+      printError("Failed to assertReferences", e);
+    } finally {
+      service.shutdown();
+      t.cancel();
+    }
+    return errorCount.get();
+  }
+
+  static <REF extends INodeReference> List<Task<REF>> createTasks(
+      List<REF> references, AtomicInteger errorCount) {
+    final List<Task<REF>> tasks = new LinkedList<>();
+    for (final Iterator<REF> i = references.iterator(); i.hasNext();) {
+      tasks.add(new Task<>(i, errorCount));
+    }
+    return tasks;
+  }
+
+  static class Task<REF extends INodeReference> implements Callable<Integer> {
+    static final int BATCH_SIZE = 100_000;
+
+    private final List<REF> references = new LinkedList<>();
+    private final AtomicInteger errorCount;
+
+    Task(Iterator<REF> i, AtomicInteger errorCount) {
+      for(int n = 0; i.hasNext() && n < BATCH_SIZE; n++) {
+        references.add(i.next());
+        i.remove();
+      }
+      this.errorCount = errorCount;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+      for (final REF ref : references) {
+        try {
+          ref.assertReferences();
+        } catch (Throwable t) {
+          println("%d: %s", errorCount.incrementAndGet(), t);
+        }
+      }
+      return references.size();
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsImageValidation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsImageValidation.java
new file mode 100644
index 00000000000..97bb2b9a33f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsImageValidation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFsImageValidation {
+  static final Logger LOG = LoggerFactory.getLogger(
+      TestFsImageValidation.class);
+
+  static {
+    final Level t = Level.TRACE;
+    FsImageValidation.Util.setLogLevel(FsImageValidation.class, t);
+    FsImageValidation.Util.setLogLevel(INodeReferenceValidation.class, t);
+    FsImageValidation.Util.setLogLevel(INode.class, t);
+  }
+
+  /**
+   * Run validation as a unit test.
+   * The path of the fsimage file being tested is specified
+   * by the environment variable FS_IMAGE.
+   */
+  @Test
+  public void testINodeReference() throws Exception {
+    FsImageValidation.initLogLevels();
+
+    try {
+      final Configuration conf = new Configuration();
+      final FsImageValidation validation = FsImageValidation.newInstance();
+      final int errorCount = validation.checkINodeReference(conf);
+      Assert.assertEquals("Error Count: " + errorCount, 0, errorCount);
+    } catch (HadoopIllegalArgumentException e) {
+      LOG.warn("The environment variable {} is not set: {}",
+          FsImageValidation.FS_IMAGE, e);
+    }
+  }
+
+  @Test
+  public void testHaConf() {
+    final Configuration conf = new Configuration();
+    final String nsId = "cluster0";
+    FsImageValidation.setHaConf(nsId, conf);
+    Assert.assertTrue(HAUtil.isHAEnabled(conf, nsId));
+  }
+
+  @Test
+  public void testToCommaSeparatedNumber() {
+    for(long b = 1; b < Integer.MAX_VALUE;) {
+      for (long n = b; n < Integer.MAX_VALUE; n *= 10) {
+        runTestToCommaSeparatedNumber(n);
+      }
+      b = b == 1? 11: 10*(b-1) + 1;
+    }
+  }
+
+  static void runTestToCommaSeparatedNumber(long n) {
+    final String s = FsImageValidation.Util.toCommaSeparatedNumber(n);
+    LOG.info("{} ?= {}", n, s);
+    for(int i = s.length(); i > 0;) {
+      for(int j = 0; j < 3 && i > 0; j++) {
+        Assert.assertTrue(Character.isDigit(s.charAt(--i)));
+      }
+      if (i > 0) {
+        Assert.assertEquals(',', s.charAt(--i));
+      }
+    }
+
+    Assert.assertNotEquals(0, s.length()%4);
+    Assert.assertEquals(n, Long.parseLong(s.replaceAll(",", "")));
+  }
+}
\ No newline at end of file
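
A usage sketch, based on the Cli.USAGE string and Cli.parse(...) in the patch above; the fsimage paths below are illustrative, and the exit code follows Cli.run(...) (0 when no errors are found, 1 otherwise):

  # validate a single fsimage file
  hdfs fsImageValidation /data/nn/current/fsimage_0000000000000000042

  # validate a NameNode storage directory (loaded via FSNamesystem#loadFSImage)
  hdfs fsImageValidation /data/nn

  # with no argument, Cli.parse(...) falls back to the FS_IMAGE environment variable
  FS_IMAGE=/data/nn/current/fsimage_0000000000000000042 hdfs fsImageValidation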