HDFS-15463. Add a tool to validate FsImage. (#2140)

Tsz-Wo Nicholas Sze 2020-07-19 23:14:30 -07:00 committed by GitHub
parent 4101b0c0ed
commit 2cec50cf16
9 changed files with 740 additions and 6 deletions

View File

@@ -47,6 +47,7 @@ function hadoop_usage
hadoop_add_subcommand "ec" admin "run a HDFS ErasureCoding CLI"
hadoop_add_subcommand "fetchdt" client "fetch a delegation token from the NameNode"
hadoop_add_subcommand "fsck" admin "run a DFS filesystem checking utility"
hadoop_add_subcommand "fsImageValidation" admin "run FsImageValidation to check an fsimage"
hadoop_add_subcommand "getconf" client "get config values from configuration"
hadoop_add_subcommand "groups" client "get the groups which users belong to"
hadoop_add_subcommand "haadmin" admin "run a DFS HA admin client"
@@ -143,6 +144,9 @@ function hdfscmd_case
fsck)
HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.DFSck
;;
fsImageValidation)
HADOOP_CLASSNAME=org.apache.hadoop.hdfs.server.namenode.FsImageValidation
;;
getconf)
HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.GetConf
;;

View File

@@ -59,7 +59,7 @@ if "%1" == "--loglevel" (
)
)
-set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto dfsrouter dfsrouteradmin debug
+set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck fsImageValidation balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir cacheadmin mover storagepolicies classpath crypto dfsrouter dfsrouteradmin debug
for %%i in ( %hdfscommands% ) do (
if %hdfs-command% == %%i set hdfscommand=true
)
@@ -121,6 +121,11 @@ goto :eof
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_CLIENT_OPTS%
goto :eof
:fsImageValidation
set CLASS=org.apache.hadoop.hdfs.server.namenode.FsImageValidation
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_CLIENT_OPTS%
goto :eof
:balancer
set CLASS=org.apache.hadoop.hdfs.server.balancer.Balancer
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_BALANCER_OPTS%
@@ -236,6 +241,7 @@ goto :eof
@echo dfsadmin run a DFS admin client
@echo haadmin run a DFS HA admin client
@echo fsck run a DFS filesystem checking utility
@echo fsImageValidation run FsImageValidation to check an fsimage
@echo balancer run a cluster balancing utility
@echo jmxget get JMX exported values from NameNode or DataNode.
@echo oiv apply the offline fsimage viewer to an fsimage

View File

@@ -209,4 +209,9 @@ public abstract class EditLogInputStream implements Closeable {
* even faster data source (e.g. a byte buffer).
*/
public abstract boolean isLocalLog();
@Override
public String toString() {
return getName();
}
}

View File

@@ -1179,7 +1179,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return Collections.unmodifiableList(auditLoggers);
}
-private void loadFSImage(StartupOption startOpt) throws IOException {
+void loadFSImage(StartupOption startOpt) throws IOException {
final FSImage fsImage = getFSImage();
// format before starting up if requested

View File

@@ -0,0 +1,275 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.util.Time.now;
/**
* For validating {@link FSImage}.
* This tool will load the user-specified {@link FSImage},
* build the namespace tree,
* and then run validations over the namespace tree.
*
* The main difference between this tool and
* {@link org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewer}
* is that
* {@link org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewer}
* only loads {@link FSImage} but it does not build the namespace tree.
* Therefore, running validations over the namespace tree is impossible in
* {@link org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewer}.
*/
public class FsImageValidation {
static final Logger LOG = LoggerFactory.getLogger(FsImageValidation.class);
static final String FS_IMAGE = "FS_IMAGE";
static FsImageValidation newInstance(String... args) {
final String f = Cli.parse(args);
if (f == null) {
throw new HadoopIllegalArgumentException(
FS_IMAGE + " is not specified.");
}
return new FsImageValidation(new File(f));
}
static void initConf(Configuration conf) {
final int aDay = 24*3600_000;
conf.setInt(DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY, aDay);
conf.setInt(DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY, aDay);
conf.setBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, false);
}
/** Set (fake) HA so that edit logs will not be loaded. */
static void setHaConf(String nsId, Configuration conf) {
conf.set(DFSConfigKeys.DFS_NAMESERVICES, nsId);
final String haNNKey = DFS_HA_NAMENODES_KEY_PREFIX + "." + nsId;
conf.set(haNNKey, "nn0,nn1");
final String rpcKey = DFS_NAMENODE_RPC_ADDRESS_KEY + "." + nsId + ".";
conf.set(rpcKey + "nn0", "127.0.0.1:8080");
conf.set(rpcKey + "nn1", "127.0.0.1:8080");
}
static void initLogLevels() {
Util.setLogLevel(FSImage.class, Level.TRACE);
Util.setLogLevel(FileJournalManager.class, Level.TRACE);
Util.setLogLevel(GSet.class, Level.OFF);
Util.setLogLevel(BlockManager.class, Level.OFF);
Util.setLogLevel(DatanodeManager.class, Level.OFF);
Util.setLogLevel(TopMetrics.class, Level.OFF);
}
static class Util {
static String memoryInfo() {
final Runtime runtime = Runtime.getRuntime();
return "Memory Info: free=" + StringUtils.byteDesc(runtime.freeMemory())
+ ", total=" + StringUtils.byteDesc(runtime.totalMemory())
+ ", max=" + StringUtils.byteDesc(runtime.maxMemory());
}
static void setLogLevel(Class<?> clazz, Level level) {
final Log log = LogFactory.getLog(clazz);
if (log instanceof Log4JLogger) {
final org.apache.log4j.Logger logger = ((Log4JLogger) log).getLogger();
logger.setLevel(level);
LOG.info("setLogLevel {} to {}, getEffectiveLevel() = {}",
clazz.getName(), level, logger.getEffectiveLevel());
} else {
LOG.warn("Failed setLogLevel {} to {}", clazz.getName(), level);
}
}
static String toCommaSeparatedNumber(long n) {
final StringBuilder b = new StringBuilder();
for(; n > 999;) {
b.insert(0, String.format(",%03d", n%1000));
n /= 1000;
}
return b.insert(0, n).toString();
}
}
private final File fsImageFile;
FsImageValidation(File fsImageFile) {
this.fsImageFile = fsImageFile;
}
int checkINodeReference(Configuration conf) throws Exception {
LOG.info(Util.memoryInfo());
initConf(conf);
final TimerTask checkProgress = new TimerTask() {
@Override
public void run() {
final double percent = NameNode.getStartupProgress().createView()
.getPercentComplete(Phase.LOADING_FSIMAGE);
LOG.info(String.format("%s Progress: %.1f%%",
Phase.LOADING_FSIMAGE, 100*percent));
}
};
INodeReferenceValidation.start();
final Timer t = new Timer();
t.scheduleAtFixedRate(checkProgress, 0, 60_000);
final long loadStart = now();
final FSNamesystem namesystem;
if (fsImageFile.isDirectory()) {
Cli.println("Loading %s as a directory.", fsImageFile);
final String dir = fsImageFile.getCanonicalPath();
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, dir);
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, dir);
final FSImage fsImage = new FSImage(conf);
namesystem = new FSNamesystem(conf, fsImage, true);
// Avoid saving fsimage
namesystem.setRollingUpgradeInfo(false, 0);
namesystem.loadFSImage(HdfsServerConstants.StartupOption.REGULAR);
} else {
Cli.println("Loading %s as a file.", fsImageFile);
final FSImage fsImage = new FSImage(conf);
namesystem = new FSNamesystem(conf, fsImage, true);
final NamespaceInfo namespaceInfo = NNStorage.newNamespaceInfo();
namespaceInfo.clusterID = "cluster0";
fsImage.getStorage().setStorageInfo(namespaceInfo);
final FSImageFormat.LoaderDelegator loader
= FSImageFormat.newLoader(conf, namesystem);
namesystem.writeLock();
namesystem.getFSDirectory().writeLock();
try {
loader.load(fsImageFile, false);
} finally {
namesystem.getFSDirectory().writeUnlock();
namesystem.writeUnlock();
}
}
t.cancel();
Cli.println("Loaded %s %s successfully in %s",
FS_IMAGE, fsImageFile, StringUtils.formatTime(now() - loadStart));
LOG.info(Util.memoryInfo());
final int errorCount = INodeReferenceValidation.end();
LOG.info(Util.memoryInfo());
return errorCount;
}
static class Cli extends Configured implements Tool {
static final String COMMAND;
static final String USAGE;
static {
final String clazz = FsImageValidation.class.getSimpleName();
COMMAND = Character.toLowerCase(clazz.charAt(0)) + clazz.substring(1);
USAGE = "Usage: hdfs " + COMMAND + " <" + FS_IMAGE + ">";
}
@Override
public int run(String[] args) throws Exception {
initLogLevels();
final FsImageValidation validation = FsImageValidation.newInstance(args);
final int errorCount = validation.checkINodeReference(getConf());
println("Error Count: %s", errorCount);
return errorCount == 0? 0: 1;
}
static String parse(String... args) {
final String f;
if (args == null || args.length == 0) {
f = System.getenv().get(FS_IMAGE);
if (f != null) {
println("Environment variable %s = %s", FS_IMAGE, f);
}
} else if (args.length == 1) {
f = args[0];
} else {
throw new HadoopIllegalArgumentException(
"args = " + Arrays.toString(args));
}
println("%s = %s", FS_IMAGE, f);
return f;
}
static void println(String format, Object... args) {
final String s = String.format(format, args);
System.out.println(s);
LOG.info(s);
}
static void printError(String message, Throwable t) {
System.out.println(message);
if (t != null) {
t.printStackTrace(System.out);
}
LOG.error(message, t);
}
}
public static void main(String[] args) {
if (DFSUtil.parseHelpArgument(args, Cli.USAGE, System.out, true)) {
System.exit(0);
}
try {
System.exit(ToolRunner.run(new Configuration(), new Cli(), args));
} catch (HadoopIllegalArgumentException e) {
e.printStackTrace(System.err);
System.err.println(Cli.USAGE);
ToolRunner.printGenericCommandUsage(System.err);
System.exit(-1);
} catch (Throwable e) {
Cli.printError("Failed to run " + Cli.COMMAND, e);
System.exit(-2);
}
}
}
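For reference, a minimal sketch of invoking the new tool from Java; the fsimage path below is a placeholder, and the wrapper class name is hypothetical. Only the public main entry point added by this commit is used.

import org.apache.hadoop.hdfs.server.namenode.FsImageValidation;

public class FsImageValidationExample {
  public static void main(String[] args) {
    // Equivalent to the new shell subcommand:
    //   hdfs fsImageValidation /path/to/fsimage
    // The argument may be an fsimage file or a NameNode storage directory;
    // alternatively, set the FS_IMAGE environment variable and pass no args.
    // FsImageValidation.main exits with 0 when no errors are found,
    // 1 when validation errors are found, and a negative value on failure.
    FsImageValidation.main(new String[] {"/path/to/fsimage"});
  }
}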

View File

@@ -660,9 +660,15 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
}
}
@VisibleForTesting
public String getFullPathAndObjectString() {
return getFullPathName() + "(" + getId() + ", " + getObjectString() + ")";
}
@VisibleForTesting
public String toDetailString() {
return toString() + "(" + getObjectString() + "), " + getParentString();
return toString() + "(" + getId() + ", " + getObjectString()
+ ", " + getParentString() + ")";
}
/** @return the parent directory */

View File

@@ -21,6 +21,7 @@ import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -61,6 +62,16 @@ import org.apache.hadoop.security.AccessControlException;
* inode(id=1000,name=bar).getParent() returns /xyz but not /abc.
*/
public abstract class INodeReference extends INode {
/** Assert the relationship between this node and its references. */
abstract void assertReferences();
@Override
public String toDetailString() {
final String s = referred == null? null
: referred.getFullPathAndObjectString();
return super.toDetailString() + ", ->" + s;
}
/**
* Try to remove the given reference and then return the reference count.
* If the given inode is not a reference, return -1;
@@ -346,7 +357,7 @@ public abstract class INodeReference extends INode {
out.print(", dstSnapshotId=" + ((DstReference) this).dstSnapshotId);
}
if (this instanceof WithCount) {
out.print(", count=" + ((WithCount)this).getReferenceCount());
out.print(", " + ((WithCount)this).getCountDetails());
}
out.println();
@@ -382,7 +393,59 @@ public abstract class INodeReference extends INode {
public WithCount(INodeReference parent, INode referred) {
super(parent, referred);
Preconditions.checkArgument(!referred.isReference());
Preconditions.checkArgument(parent == null);
referred.setParentReference(this);
INodeReferenceValidation.add(this, WithCount.class);
}
private String getCountDetails() {
final StringBuilder b = new StringBuilder("[");
if (!withNameList.isEmpty()) {
final Iterator<WithName> i = withNameList.iterator();
b.append(i.next().getFullPathAndObjectString());
for(; i.hasNext();) {
b.append(", ").append(i.next().getFullPathAndObjectString());
}
}
b.append("]");
return ", count=" + getReferenceCount() + ", names=" + b;
}
@Override
public String toDetailString() {
return super.toDetailString() + getCountDetails();
}
private void assertDstReference(INodeReference parentRef) {
if (parentRef instanceof DstReference) {
return;
}
throw new IllegalArgumentException("Unexpected non-DstReference:"
+ "\n parentRef: " + parentRef.toDetailString()
+ "\n withCount: " + this.toDetailString());
}
private void assertReferredINode(INodeReference ref, String name) {
if (ref.getReferredINode() == this) {
return;
}
throw new IllegalStateException("Inconsistent Reference:"
+ "\n " + name + ": " + ref.toDetailString()
+ "\n withCount: " + this.toDetailString());
}
@Override
void assertReferences() {
for(WithName withName : withNameList) {
assertReferredINode(withName, " withName");
}
final INodeReference parentRef = getParentReference();
if (parentRef != null) {
assertDstReference(parentRef);
assertReferredINode(parentRef, "parentRef");
}
}
public int getReferenceCount() {
@@ -406,16 +469,26 @@ public abstract class INodeReference extends INode {
}
}
private int search(WithName ref) {
return Collections.binarySearch(withNameList, ref, WITHNAME_COMPARATOR);
}
/** Decrement and then return the reference count. */
public void removeReference(INodeReference ref) {
if (ref instanceof WithName) {
-int i = Collections.binarySearch(withNameList, (WithName) ref,
-    WITHNAME_COMPARATOR);
+final WithName withName = (WithName) ref;
+final int i = search(withName);
if (i >= 0) {
withNameList.remove(i);
INodeReferenceValidation.remove(withName, WithName.class);
}
} else if (ref == getParentReference()) {
setParent(null);
INodeReferenceValidation.remove((DstReference) ref, DstReference.class);
}
if (getReferenceCount() == 0) {
INodeReferenceValidation.remove(this, WithCount.class);
}
}
@@ -481,6 +554,33 @@ public abstract class INodeReference extends INode {
this.name = name;
this.lastSnapshotId = lastSnapshotId;
referred.addReference(this);
INodeReferenceValidation.add(this, WithName.class);
}
@Override
void assertReferences() {
final INode ref = getReferredINode();
final String err;
if (ref instanceof WithCount) {
final WithCount withCount = (WithCount)ref;
final int i = withCount.search(this);
if (i >= 0) {
if (withCount.withNameList.get(i) == this) {
return;
} else {
err = "OBJECT MISMATCH, withNameList.get(" + i + ") != this";
}
} else {
err = "NOT FOUND in withNameList";
}
} else {
err = "UNEXPECTED CLASS, expecting WithCount";
}
throw new IllegalStateException(err + ":"
+ "\n ref: " + (ref == null? null : ref.toDetailString())
+ "\n this: " + this.toDetailString());
}
@Override
@@ -642,6 +742,27 @@ public abstract class INodeReference extends INode {
super(parent, referred);
this.dstSnapshotId = dstSnapshotId;
referred.addReference(this);
INodeReferenceValidation.add(this, DstReference.class);
}
@Override
void assertReferences() {
final INode ref = getReferredINode();
final String err;
if (ref instanceof WithCount) {
if (ref.getParentReference() == this) {
return;
} else {
err = "OBJECT MISMATCH, ref.getParentReference() != this";
}
} else {
err = "UNEXPECTED CLASS, expecting WithCount";
}
throw new IllegalStateException(err + ":"
+ "\n ref: " + (ref == null? null : ref.toDetailString())
+ "\n this: " + this.toDetailString());
}
@Override
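To summarize what the new assertReferences checks enforce, here is a hedged, self-contained sketch; the classes and fields below are simplified stand-ins for the real INodeReference subclasses, not the actual implementation.

import java.util.ArrayList;
import java.util.List;

/** Sketch of the invariants checked by assertReferences(). */
class ReferenceInvariants {
  static class WithCount {
    final List<WithName> withNameList = new ArrayList<>();
    DstReference parentReference; // at most one rename destination
  }
  static class WithName {
    WithCount referred; // must point back to the WithCount listing it
  }
  static class DstReference {
    WithCount referred; // must be the WithCount whose parent reference is this
  }

  static void check(WithCount wc) {
    // Invariant 1: every WithName in withNameList refers back to wc.
    for (WithName wn : wc.withNameList) {
      if (wn.referred != wc) {
        throw new IllegalStateException("WithName does not refer back");
      }
    }
    // Invariant 2: if wc has a parent reference, that DstReference
    // must refer back to wc.
    final DstReference dst = wc.parentReference;
    if (dst != null && dst.referred != wc) {
      throw new IllegalStateException("DstReference does not refer back");
    }
  }
}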

View File

@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.server.namenode.FsImageValidation.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.hdfs.server.namenode.FsImageValidation.Cli.*;
/** For validating {@link INodeReference} subclasses. */
public class INodeReferenceValidation {
public static final Logger LOG = LoggerFactory.getLogger(
INodeReferenceValidation.class);
private static final AtomicReference<INodeReferenceValidation> INSTANCE
= new AtomicReference<>();
public static void start() {
INSTANCE.compareAndSet(null, new INodeReferenceValidation());
println("Validation started");
}
public static int end() {
final INodeReferenceValidation instance = INSTANCE.getAndSet(null);
if (instance == null) {
return 0;
}
final int errorCount = instance.assertReferences();
println("Validation ended successfully: %d error(s) found.", errorCount);
return errorCount;
}
static <REF extends INodeReference> void add(REF ref, Class<REF> clazz) {
final INodeReferenceValidation validation = INSTANCE.get();
if (validation != null) {
final boolean added = validation.getReferences(clazz).add(ref);
Preconditions.checkState(added);
LOG.trace("add {}: {}", clazz, ref.toDetailString());
}
}
static <REF extends INodeReference> void remove(REF ref, Class<REF> clazz) {
final INodeReferenceValidation validation = INSTANCE.get();
if (validation != null) {
final boolean removed = validation.getReferences(clazz).remove(ref);
Preconditions.checkState(removed);
LOG.trace("remove {}: {}", clazz, ref.toDetailString());
}
}
static class ReferenceSet<REF extends INodeReference> {
private final Class<REF> clazz;
private final List<REF> references = new LinkedList<>();
private volatile List<Task<REF>> tasks;
private volatile List<Future<Integer>> futures;
private final AtomicInteger taskCompleted = new AtomicInteger();
ReferenceSet(Class<REF> clazz) {
this.clazz = clazz;
}
boolean add(REF ref) {
return references.add(ref);
}
boolean remove(REF ref) {
for(final Iterator<REF> i = references.iterator(); i.hasNext();) {
if (i.next() == ref) {
i.remove();
return true;
}
}
return false;
}
void submit(AtomicInteger errorCount, ExecutorService service)
throws InterruptedException {
final int size = references.size();
tasks = createTasks(references, errorCount);
println("Submitting %d tasks for validating %s %s(s)",
tasks.size(), Util.toCommaSeparatedNumber(size),
clazz.getSimpleName());
futures = service.invokeAll(tasks);
}
void waitForFutures() throws Exception {
for(Future<Integer> f : futures) {
f.get();
taskCompleted.incrementAndGet();
}
}
double getTaskCompletedPercent() {
final List<Task<REF>> t = tasks;
return t == null? 0
: t.isEmpty()? 100
: taskCompleted.get()*100.0/t.size();
}
@Override
public String toString() {
return String.format("%s %.1f%%",
clazz.getSimpleName(), getTaskCompletedPercent());
}
}
private final ReferenceSet<INodeReference.WithCount> withCounts
= new ReferenceSet<>(INodeReference.WithCount.class);
private final ReferenceSet<INodeReference.WithName> withNames
= new ReferenceSet<>(INodeReference.WithName.class);
private final ReferenceSet<INodeReference.DstReference> dstReferences
= new ReferenceSet<>(INodeReference.DstReference.class);
<REF extends INodeReference> ReferenceSet<REF> getReferences(
Class<REF> clazz) {
if (clazz == INodeReference.WithCount.class) {
return (ReferenceSet<REF>) withCounts;
} else if (clazz == INodeReference.WithName.class) {
return (ReferenceSet<REF>) withNames;
} else if (clazz == INodeReference.DstReference.class) {
return (ReferenceSet<REF>) dstReferences;
}
throw new IllegalArgumentException("References not found for " + clazz);
}
private int assertReferences() {
final int p = Runtime.getRuntime().availableProcessors();
LOG.info("Available Processors: {}", p);
final ExecutorService service = Executors.newFixedThreadPool(p);
final TimerTask checkProgress = new TimerTask() {
@Override
public void run() {
LOG.info("ASSERT_REFERENCES Progress: {}, {}, {}",
dstReferences, withCounts, withNames);
}
};
final Timer t = new Timer();
t.scheduleAtFixedRate(checkProgress, 0, 1_000);
final AtomicInteger errorCount = new AtomicInteger();
try {
dstReferences.submit(errorCount, service);
withCounts.submit(errorCount, service);
withNames.submit(errorCount, service);
dstReferences.waitForFutures();
withCounts.waitForFutures();
withNames.waitForFutures();
} catch (Throwable e) {
printError("Failed to assertReferences", e);
} finally {
service.shutdown();
t.cancel();
}
return errorCount.get();
}
static <REF extends INodeReference> List<Task<REF>> createTasks(
List<REF> references, AtomicInteger errorCount) {
final List<Task<REF>> tasks = new LinkedList<>();
for (final Iterator<REF> i = references.iterator(); i.hasNext();) {
tasks.add(new Task<>(i, errorCount));
}
return tasks;
}
static class Task<REF extends INodeReference> implements Callable<Integer> {
static final int BATCH_SIZE = 100_000;
private final List<REF> references = new LinkedList<>();
private final AtomicInteger errorCount;
Task(Iterator<REF> i, AtomicInteger errorCount) {
for(int n = 0; i.hasNext() && n < BATCH_SIZE; n++) {
references.add(i.next());
i.remove();
}
this.errorCount = errorCount;
}
@Override
public Integer call() throws Exception {
for (final REF ref : references) {
try {
ref.assertReferences();
} catch (Throwable t) {
println("%d: %s", errorCount.incrementAndGet(), t);
}
}
return references.size();
}
}
}
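The intended lifecycle, as a hedged sketch: start() arms the validation singleton before the fsimage is loaded, each INodeReference constructor registers itself via add() while the namespace tree is built, and end() runs the batched checks. The class name and loadNamespaceTree below are placeholders for the loading actually done by FsImageValidation.checkINodeReference.

import org.apache.hadoop.hdfs.server.namenode.INodeReferenceValidation;

class ValidationLifecycle {
  /** Placeholder for the fsimage loading done by checkINodeReference. */
  static void loadNamespaceTree() {
    // While the namespace tree is built, each INodeReference constructor
    // registers itself via INodeReferenceValidation.add(...).
  }

  static int validate() {
    INodeReferenceValidation.start(); // arm the validation singleton
    loadNamespaceTree();
    // end() disarms the singleton, runs assertReferences() over every
    // registered reference in parallel batches, and returns the error count.
    return INodeReferenceValidation.end();
  }
}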

View File

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestFsImageValidation {
static final Logger LOG = LoggerFactory.getLogger(
TestFsImageValidation.class);
static {
final Level t = Level.TRACE;
FsImageValidation.Util.setLogLevel(FsImageValidation.class, t);
FsImageValidation.Util.setLogLevel(INodeReferenceValidation.class, t);
FsImageValidation.Util.setLogLevel(INode.class, t);
}
/**
* Run validation as a unit test.
* The path of the fsimage file being tested is specified
* by the environment variable FS_IMAGE.
*/
@Test
public void testINodeReference() throws Exception {
FsImageValidation.initLogLevels();
try {
final Configuration conf = new Configuration();
final FsImageValidation validation = FsImageValidation.newInstance();
final int errorCount = validation.checkINodeReference(conf);
Assert.assertEquals("Error Count: " + errorCount, 0, errorCount);
} catch (HadoopIllegalArgumentException e) {
LOG.warn("The environment variable {} is not set: {}",
FsImageValidation.FS_IMAGE, e);
}
}
@Test
public void testHaConf() {
final Configuration conf = new Configuration();
final String nsId = "cluster0";
FsImageValidation.setHaConf(nsId, conf);
Assert.assertTrue(HAUtil.isHAEnabled(conf, nsId));
}
@Test
public void testToCommaSeparatedNumber() {
for(long b = 1; b < Integer.MAX_VALUE;) {
for (long n = b; n < Integer.MAX_VALUE; n *= 10) {
runTestToCommaSeparatedNumber(n);
}
b = b == 1? 11: 10*(b-1) + 1;
}
}
static void runTestToCommaSeparatedNumber(long n) {
final String s = FsImageValidation.Util.toCommaSeparatedNumber(n);
LOG.info("{} ?= {}", n, s);
for(int i = s.length(); i > 0;) {
for(int j = 0; j < 3 && i > 0; j++) {
Assert.assertTrue(Character.isDigit(s.charAt(--i)));
}
if (i > 0) {
Assert.assertEquals(',', s.charAt(--i));
}
}
Assert.assertNotEquals(0, s.length()%4);
Assert.assertEquals(n, Long.parseLong(s.replaceAll(",", "")));
}
}