HBASE-16052 Improve HBaseFsck Scalability (Ben Lau)

This commit is contained in:
tedyu 2016-06-27 13:24:35 -07:00
parent 424b789e03
commit 13cf4fb3fc
6 changed files with 483 additions and 156 deletions

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import edu.umd.cs.findbugs.annotations.CheckForNull;
/**
* Typical base class for file status filter. Works more efficiently when
* filtering file statuses, otherwise implementation will need to lookup filestatus
* for the path which will be expensive.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class AbstractFileStatusFilter implements PathFilter, FileStatusFilter {
/**
* Filters out a path. Can be given an optional directory hint to avoid
* filestatus lookup.
*
* @param p A filesystem path
* @param isDir An optional boolean indicating whether the path is a directory or not
* @return true if the path is accepted, false if the path is filtered out
*/
protected abstract boolean accept(Path p, @CheckForNull Boolean isDir);
@Override
public boolean accept(FileStatus f) {
return accept(f.getPath(), f.isDirectory());
}
@Override
public boolean accept(Path p) {
return accept(p, null);
}
protected boolean isFile(FileSystem fs, @CheckForNull Boolean isDir, Path p) throws IOException {
return !isDirectory(fs, isDir, p);
}
protected boolean isDirectory(FileSystem fs, @CheckForNull Boolean isDir, Path p) throws IOException {
return isDir != null ? isDir : fs.isDirectory(p);
}
}

View File

@ -31,14 +31,21 @@ import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -83,8 +90,12 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import edu.umd.cs.findbugs.annotations.CheckForNull;
/** /**
* Utility methods for interacting with the underlying file system. * Utility methods for interacting with the underlying file system.
*/ */
@ -1178,7 +1189,7 @@ public abstract class FSUtils {
/** /**
* A {@link PathFilter} that returns only regular files. * A {@link PathFilter} that returns only regular files.
*/ */
static class FileFilter implements PathFilter { static class FileFilter extends AbstractFileStatusFilter {
private final FileSystem fs; private final FileSystem fs;
public FileFilter(final FileSystem fs) { public FileFilter(final FileSystem fs) {
@ -1186,11 +1197,11 @@ public abstract class FSUtils {
} }
@Override @Override
public boolean accept(Path p) { protected boolean accept(Path p, @CheckForNull Boolean isDir) {
try { try {
return fs.isFile(p); return isFile(fs, isDir, p);
} catch (IOException e) { } catch (IOException e) {
LOG.debug("unable to verify if path=" + p + " is a regular file", e); LOG.warn("unable to verify if path=" + p + " is a regular file", e);
return false; return false;
} }
} }
@ -1199,7 +1210,7 @@ public abstract class FSUtils {
/** /**
* Directory filter that doesn't include any of the directories in the specified blacklist * Directory filter that doesn't include any of the directories in the specified blacklist
*/ */
public static class BlackListDirFilter implements PathFilter { public static class BlackListDirFilter extends AbstractFileStatusFilter {
private final FileSystem fs; private final FileSystem fs;
private List<String> blacklist; private List<String> blacklist;
@ -1218,19 +1229,18 @@ public abstract class FSUtils {
} }
@Override @Override
public boolean accept(Path p) { protected boolean accept(Path p, @CheckForNull Boolean isDir) {
boolean isValid = false; if (!isValidName(p.getName())) {
return false;
}
try { try {
if (isValidName(p.getName())) { return isDirectory(fs, isDir, p);
isValid = fs.getFileStatus(p).isDirectory();
} else {
isValid = false;
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("An error occurred while verifying if [" + p.toString() LOG.warn("An error occurred while verifying if [" + p.toString()
+ "] is a valid directory. Returning 'not valid' and continuing.", e); + "] is a valid directory. Returning 'not valid' and continuing.", e);
return false;
} }
return isValid;
} }
protected boolean isValidName(final String name) { protected boolean isValidName(final String name) {
@ -1368,7 +1378,7 @@ public abstract class FSUtils {
/** /**
* Filter for all dirs that don't start with '.' * Filter for all dirs that don't start with '.'
*/ */
public static class RegionDirFilter implements PathFilter { public static class RegionDirFilter extends AbstractFileStatusFilter {
// This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
final FileSystem fs; final FileSystem fs;
@ -1378,16 +1388,16 @@ public abstract class FSUtils {
} }
@Override @Override
public boolean accept(Path rd) { protected boolean accept(Path p, @CheckForNull Boolean isDir) {
if (!regionDirPattern.matcher(rd.getName()).matches()) { if (!regionDirPattern.matcher(p.getName()).matches()) {
return false; return false;
} }
try { try {
return fs.getFileStatus(rd).isDirectory(); return isDirectory(fs, isDir, p);
} catch (IOException ioe) { } catch (IOException ioe) {
// Maybe the file was moved or the fs was disconnected. // Maybe the file was moved or the fs was disconnected.
LOG.warn("Skipping file " + rd +" due to IOException", ioe); LOG.warn("Skipping file " + p +" due to IOException", ioe);
return false; return false;
} }
} }
@ -1403,8 +1413,11 @@ public abstract class FSUtils {
*/ */
public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException { public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
// assumes we are in a table dir. // assumes we are in a table dir.
FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs)); List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
List<Path> regionDirs = new ArrayList<Path>(rds.length); if (rds == null) {
return new ArrayList<Path>();
}
List<Path> regionDirs = new ArrayList<Path>(rds.size());
for (FileStatus rdfs: rds) { for (FileStatus rdfs: rds) {
Path rdPath = rdfs.getPath(); Path rdPath = rdfs.getPath();
regionDirs.add(rdPath); regionDirs.add(rdPath);
@ -1416,7 +1429,7 @@ public abstract class FSUtils {
* Filter for all dirs that are legal column family names. This is generally used for colfam * Filter for all dirs that are legal column family names. This is generally used for colfam
* dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;. * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
*/ */
public static class FamilyDirFilter implements PathFilter { public static class FamilyDirFilter extends AbstractFileStatusFilter {
final FileSystem fs; final FileSystem fs;
public FamilyDirFilter(FileSystem fs) { public FamilyDirFilter(FileSystem fs) {
@ -1424,20 +1437,20 @@ public abstract class FSUtils {
} }
@Override @Override
public boolean accept(Path rd) { protected boolean accept(Path p, @CheckForNull Boolean isDir) {
try { try {
// throws IAE if invalid // throws IAE if invalid
HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName())); HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
// path name is an invalid family name and thus is excluded. // path name is an invalid family name and thus is excluded.
return false; return false;
} }
try { try {
return fs.getFileStatus(rd).isDirectory(); return isDirectory(fs, isDir, p);
} catch (IOException ioe) { } catch (IOException ioe) {
// Maybe the file was moved or the fs was disconnected. // Maybe the file was moved or the fs was disconnected.
LOG.warn("Skipping file " + rd +" due to IOException", ioe); LOG.warn("Skipping file " + p +" due to IOException", ioe);
return false; return false;
} }
} }
@ -1463,8 +1476,11 @@ public abstract class FSUtils {
} }
public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException { public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs)); List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
List<Path> referenceFiles = new ArrayList<Path>(fds.length); if (fds == null) {
return new ArrayList<Path>();
}
List<Path> referenceFiles = new ArrayList<Path>(fds.size());
for (FileStatus fdfs: fds) { for (FileStatus fdfs: fds) {
Path fdPath = fdfs.getPath(); Path fdPath = fdfs.getPath();
referenceFiles.add(fdPath); referenceFiles.add(fdPath);
@ -1475,7 +1491,7 @@ public abstract class FSUtils {
/** /**
* Filter for HFiles that excludes reference files. * Filter for HFiles that excludes reference files.
*/ */
public static class HFileFilter implements PathFilter { public static class HFileFilter extends AbstractFileStatusFilter {
final FileSystem fs; final FileSystem fs;
public HFileFilter(FileSystem fs) { public HFileFilter(FileSystem fs) {
@ -1483,19 +1499,22 @@ public abstract class FSUtils {
} }
@Override @Override
public boolean accept(Path rd) { protected boolean accept(Path p, @CheckForNull Boolean isDir) {
if (!StoreFileInfo.isHFile(p)) {
return false;
}
try { try {
// only files return isFile(fs, isDir, p);
return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isHFile(rd);
} catch (IOException ioe) { } catch (IOException ioe) {
// Maybe the file was moved or the fs was disconnected. // Maybe the file was moved or the fs was disconnected.
LOG.warn("Skipping file " + rd +" due to IOException", ioe); LOG.warn("Skipping file " + p +" due to IOException", ioe);
return false; return false;
} }
} }
} }
public static class ReferenceFileFilter implements PathFilter { public static class ReferenceFileFilter extends AbstractFileStatusFilter {
private final FileSystem fs; private final FileSystem fs;
@ -1504,13 +1523,17 @@ public abstract class FSUtils {
} }
@Override @Override
public boolean accept(Path rd) { protected boolean accept(Path p, @CheckForNull Boolean isDir) {
if (!StoreFileInfo.isReference(p)) {
return false;
}
try { try {
// only files can be references. // only files can be references.
return !fs.getFileStatus(rd).isDirectory() && StoreFileInfo.isReference(rd); return isFile(fs, isDir, p);
} catch (IOException ioe) { } catch (IOException ioe) {
// Maybe the file was moved or the fs was disconnected. // Maybe the file was moved or the fs was disconnected.
LOG.warn("Skipping file " + rd +" due to IOException", ioe); LOG.warn("Skipping file " + p +" due to IOException", ioe);
return false; return false;
} }
} }
@ -1542,70 +1565,149 @@ public abstract class FSUtils {
* @param tableName name of the table to scan. * @param tableName name of the table to scan.
* @return Map keyed by StoreFile name with a value of the full Path. * @return Map keyed by StoreFile name with a value of the full Path.
* @throws IOException When scanning the directory fails. * @throws IOException When scanning the directory fails.
* @throws InterruptedException
*/ */
public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
final FileSystem fs, final Path hbaseRootDir, TableName tableName) final FileSystem fs, final Path hbaseRootDir, TableName tableName)
throws IOException { throws IOException, InterruptedException {
return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null); return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null);
} }
/** /**
* Runs through the HBase rootdir/tablename and creates a reverse lookup map for * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
* table StoreFile names to the full Path. * table StoreFile names to the full Path. Note that because this method can be called
* on a 'live' HBase system that we will skip files that no longer exist by the time
* we traverse them and similarly the user of the result needs to consider that some
* entries in this map may not exist by the time this call completes.
* <br> * <br>
* Example...<br> * Example...<br>
* Key = 3944417774205889744 <br> * Key = 3944417774205889744 <br>
* Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
* *
* @param map map to add values. If null, this method will create and populate one to return * @param resultMap map to add values. If null, this method will create and populate one to return
* @param fs The file system to use. * @param fs The file system to use.
* @param hbaseRootDir The root directory to scan. * @param hbaseRootDir The root directory to scan.
* @param tableName name of the table to scan. * @param tableName name of the table to scan.
* @param sfFilter optional path filter to apply to store files
* @param executor optional executor service to parallelize this operation
* @param errors ErrorReporter instance or null * @param errors ErrorReporter instance or null
* @return Map keyed by StoreFile name with a value of the full Path. * @return Map keyed by StoreFile name with a value of the full Path.
* @throws IOException When scanning the directory fails. * @throws IOException When scanning the directory fails.
* @throws InterruptedException
*/ */
public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, public static Map<String, Path> getTableStoreFilePathMap(
final FileSystem fs, final Path hbaseRootDir, TableName tableName, ErrorReporter errors) Map<String, Path> resultMap,
throws IOException { final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
if (map == null) { ExecutorService executor, final ErrorReporter errors) throws IOException, InterruptedException {
map = new HashMap<String, Path>();
} final Map<String, Path> finalResultMap =
resultMap == null ? new ConcurrentHashMap<String, Path>(128, 0.75f, 32) : resultMap;
// only include the directory paths to tables // only include the directory paths to tables
Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName); Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
// Inside a table, there are compaction.dir directories to skip. Otherwise, all else // Inside a table, there are compaction.dir directories to skip. Otherwise, all else
// should be regions. // should be regions.
PathFilter familyFilter = new FamilyDirFilter(fs); final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs)); final Vector<Exception> exceptions = new Vector<Exception>();
for (FileStatus regionDir : regionDirs) {
if (null != errors) { try {
errors.progress(); List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
if (regionDirs == null) {
return finalResultMap;
} }
Path dd = regionDir.getPath();
// else its a region name, now look in region for families final List<Future<?>> futures = new ArrayList<Future<?>>(regionDirs.size());
FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
for (FileStatus familyDir : familyDirs) { for (FileStatus regionDir : regionDirs) {
if (null != errors) { if (null != errors) {
errors.progress(); errors.progress();
} }
Path family = familyDir.getPath(); final Path dd = regionDir.getPath();
if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
continue; if (!exceptions.isEmpty()) {
break;
} }
// now in family, iterate over the StoreFiles and
// put in map Runnable getRegionStoreFileMapCall = new Runnable() {
FileStatus[] familyStatus = fs.listStatus(family); @Override
for (FileStatus sfStatus : familyStatus) { public void run() {
if (null != errors) { try {
errors.progress(); HashMap<String,Path> regionStoreFileMap = new HashMap<String, Path>();
List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
if (familyDirs == null) {
if (!fs.exists(dd)) {
LOG.warn("Skipping region because it no longer exists: " + dd);
} else {
LOG.warn("Skipping region because it has no family dirs: " + dd);
}
return;
}
for (FileStatus familyDir : familyDirs) {
if (null != errors) {
errors.progress();
}
Path family = familyDir.getPath();
if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
continue;
}
// now in family, iterate over the StoreFiles and
// put in map
FileStatus[] familyStatus = fs.listStatus(family);
for (FileStatus sfStatus : familyStatus) {
if (null != errors) {
errors.progress();
}
Path sf = sfStatus.getPath();
if (sfFilter == null || sfFilter.accept(sf)) {
regionStoreFileMap.put( sf.getName(), sf);
}
}
}
finalResultMap.putAll(regionStoreFileMap);
} catch (Exception e) {
LOG.error("Could not get region store file map for region: " + dd, e);
exceptions.add(e);
}
} }
Path sf = sfStatus.getPath(); };
map.put( sf.getName(), sf);
// If executor is available, submit async tasks to exec concurrently, otherwise
// just do serial sync execution
if (executor != null) {
Future<?> future = executor.submit(getRegionStoreFileMapCall);
futures.add(future);
} else {
FutureTask<?> future = new FutureTask<Object>(getRegionStoreFileMapCall, null);
future.run();
futures.add(future);
} }
} }
// Ensure all pending tasks are complete (or that we run into an exception)
for (Future<?> f : futures) {
if (!exceptions.isEmpty()) {
break;
}
try {
f.get();
} catch (ExecutionException e) {
LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);
// Shouldn't happen, we already logged/caught any exceptions in the Runnable
}
}
} catch (IOException e) {
LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
exceptions.add(e);
} finally {
if (!exceptions.isEmpty()) {
// Just throw the first exception as an indication something bad happened
// Don't need to propagate all the exceptions, we already logged them all anyway
Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
throw Throwables.propagate(exceptions.firstElement());
}
} }
return map;
return finalResultMap;
} }
public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
@ -1632,11 +1734,12 @@ public abstract class FSUtils {
* @param hbaseRootDir The root directory to scan. * @param hbaseRootDir The root directory to scan.
* @return Map keyed by StoreFile name with a value of the full Path. * @return Map keyed by StoreFile name with a value of the full Path.
* @throws IOException When scanning the directory fails. * @throws IOException When scanning the directory fails.
* @throws InterruptedException
*/ */
public static Map<String, Path> getTableStoreFilePathMap( public static Map<String, Path> getTableStoreFilePathMap(
final FileSystem fs, final Path hbaseRootDir) final FileSystem fs, final Path hbaseRootDir)
throws IOException { throws IOException, InterruptedException {
return getTableStoreFilePathMap(fs, hbaseRootDir, null); return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null);
} }
/** /**
@ -1649,14 +1752,18 @@ public abstract class FSUtils {
* *
* @param fs The file system to use. * @param fs The file system to use.
* @param hbaseRootDir The root directory to scan. * @param hbaseRootDir The root directory to scan.
* @param sfFilter optional path filter to apply to store files
* @param executor optional executor service to parallelize this operation
* @param errors ErrorReporter instance or null * @param errors ErrorReporter instance or null
* @return Map keyed by StoreFile name with a value of the full Path. * @return Map keyed by StoreFile name with a value of the full Path.
* @throws IOException When scanning the directory fails. * @throws IOException When scanning the directory fails.
* @throws InterruptedException
*/ */
public static Map<String, Path> getTableStoreFilePathMap( public static Map<String, Path> getTableStoreFilePathMap(
final FileSystem fs, final Path hbaseRootDir, ErrorReporter errors) final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
throws IOException { ExecutorService executor, ErrorReporter errors)
Map<String, Path> map = new HashMap<String, Path>(); throws IOException, InterruptedException {
ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<String, Path>(1024, 0.75f, 32);
// if this method looks similar to 'getTableFragmentation' that is because // if this method looks similar to 'getTableFragmentation' that is because
// it was borrowed from it. // it was borrowed from it.
@ -1664,11 +1771,44 @@ public abstract class FSUtils {
// only include the directory paths to tables // only include the directory paths to tables
for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) { for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
getTableStoreFilePathMap(map, fs, hbaseRootDir, getTableStoreFilePathMap(map, fs, hbaseRootDir,
FSUtils.getTableName(tableDir), errors); FSUtils.getTableName(tableDir), sfFilter, executor, errors);
} }
return map; return map;
} }
/**
* Filters FileStatuses in an array and returns a list
*
* @param input An array of FileStatuses
* @param filter A required filter to filter the array
* @return A list of FileStatuses
*/
public static List<FileStatus> filterFileStatuses(FileStatus[] input,
FileStatusFilter filter) {
if (input == null) return null;
return filterFileStatuses(Iterators.forArray(input), filter);
}
/**
* Filters FileStatuses in an iterator and returns a list
*
* @param input An iterator of FileStatuses
* @param filter A required filter to filter the array
* @return A list of FileStatuses
*/
public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
FileStatusFilter filter) {
if (input == null) return null;
ArrayList<FileStatus> results = new ArrayList<FileStatus>();
while (input.hasNext()) {
FileStatus f = input.next();
if (filter.accept(f)) {
results.add(f);
}
}
return results;
}
/** /**
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
* This accommodates differences between hadoop versions, where hadoop 1 * This accommodates differences between hadoop versions, where hadoop 1
@ -1677,6 +1817,48 @@ public abstract class FSUtils {
* *
* @param fs file system * @param fs file system
* @param dir directory * @param dir directory
* @param filter file status filter
* @return null if dir is empty or doesn't exist, otherwise FileStatus list
*/
public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
final Path dir, final FileStatusFilter filter) throws IOException {
FileStatus [] status = null;
try {
status = fs.listStatus(dir);
} catch (FileNotFoundException fnfe) {
// if directory doesn't exist, return null
if (LOG.isTraceEnabled()) {
LOG.trace(dir + " doesn't exist");
}
}
if (status == null || status.length < 1) {
return null;
}
if (filter == null) {
return Arrays.asList(status);
} else {
List<FileStatus> status2 = filterFileStatuses(status, filter);
if (status2 == null || status2.isEmpty()) {
return null;
} else {
return status2;
}
}
}
/**
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal
* This accommodates differences between hadoop versions, where hadoop 1
* does not throw a FileNotFoundException, and return an empty FileStatus[]
* while Hadoop 2 will throw FileNotFoundException.
*
* Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem,
* Path, FileStatusFilter)} instead.
*
* @param fs file system
* @param dir directory
* @param filter path filter * @param filter path filter
* @return null if dir is empty or doesn't exist, otherwise FileStatus array * @return null if dir is empty or doesn't exist, otherwise FileStatus array
*/ */

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -54,7 +55,7 @@ public final class FSVisitor {
*/ */
public static void visitTableStoreFiles(final FileSystem fs, final Path tableDir, public static void visitTableStoreFiles(final FileSystem fs, final Path tableDir,
final StoreFileVisitor visitor) throws IOException { final StoreFileVisitor visitor) throws IOException {
FileStatus[] regions = FSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs)); List<FileStatus> regions = FSUtils.listStatusWithStatusFilter(fs, tableDir, new FSUtils.RegionDirFilter(fs));
if (regions == null) { if (regions == null) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("No regions under directory:" + tableDir); LOG.trace("No regions under directory:" + tableDir);
@ -77,7 +78,7 @@ public final class FSVisitor {
*/ */
public static void visitRegionStoreFiles(final FileSystem fs, final Path regionDir, public static void visitRegionStoreFiles(final FileSystem fs, final Path regionDir,
final StoreFileVisitor visitor) throws IOException { final StoreFileVisitor visitor) throws IOException {
FileStatus[] families = FSUtils.listStatus(fs, regionDir, new FSUtils.FamilyDirFilter(fs)); List<FileStatus> families = FSUtils.listStatusWithStatusFilter(fs, regionDir, new FSUtils.FamilyDirFilter(fs));
if (families == null) { if (families == null) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("No families under region directory:" + regionDir); LOG.trace("No families under region directory:" + regionDir);

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface FileStatusFilter {
/**
* Tests whether or not the specified filestatus should be
* included in a filestatus list.
*
* @param f The filestatus to be tested
* @return <code>true</code> if and only if the filestatus
* should be included
*/
boolean accept(FileStatus f);
}

View File

@ -52,6 +52,7 @@ import java.util.SortedMap;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -1015,26 +1016,16 @@ public class HBaseFsck extends Configured implements Closeable {
* Lingering reference file prevents a region from opening. It has to * Lingering reference file prevents a region from opening. It has to
* be fixed before a cluster can start properly. * be fixed before a cluster can start properly.
*/ */
private void offlineReferenceFileRepair() throws IOException { private void offlineReferenceFileRepair() throws IOException, InterruptedException {
Configuration conf = getConf(); Configuration conf = getConf();
Path hbaseRoot = FSUtils.getRootDir(conf); Path hbaseRoot = FSUtils.getRootDir(conf);
FileSystem fs = hbaseRoot.getFileSystem(conf); FileSystem fs = hbaseRoot.getFileSystem(conf);
LOG.info("Computing mapping of all store files"); LOG.info("Computing mapping of all store files");
Map<String, Path> allFiles = FSUtils.getTableStoreFilePathMap(fs, hbaseRoot, errors); Map<String, Path> allFiles = FSUtils.getTableStoreFilePathMap(fs, hbaseRoot,
new FSUtils.ReferenceFileFilter(fs), executor, errors);
errors.print(""); errors.print("");
LOG.info("Validating mapping using HDFS state"); LOG.info("Validating mapping using HDFS state");
for (Path path: allFiles.values()) { for (Path path: allFiles.values()) {
boolean isReference = false;
try {
isReference = StoreFileInfo.isReference(path);
} catch (Throwable t) {
// Ignore. Some files may not be store files at all.
// For example, files under .oldlogs folder in hbase:meta
// Warning message is already logged by
// StoreFile#isReference.
}
if (!isReference) continue;
Path referredToFile = StoreFileInfo.getReferredToFile(path); Path referredToFile = StoreFileInfo.getReferredToFile(path);
if (fs.exists(referredToFile)) continue; // good, expected if (fs.exists(referredToFile)) continue; // good, expected
@ -1711,23 +1702,22 @@ public class HBaseFsck extends Configured implements Closeable {
} }
} }
// level 1: <HBASE_DIR>/* // Avoid multithreading at table-level because already multithreaded internally at
List<WorkItemHdfsDir> dirs = new ArrayList<WorkItemHdfsDir>(tableDirs.size()); // region-level. Additionally multithreading at table-level can lead to deadlock
List<Future<Void>> dirsFutures; // if there are many tables in the cluster. Since there are a limited # of threads
// in the executor's thread pool and if we multithread at the table-level by putting
// WorkItemHdfsDir callables into the executor, then we will have some threads in the
// executor tied up solely in waiting for the tables' region-level calls to complete.
// If there are enough tables then there will be no actual threads in the pool left
// for the region-level callables to be serviced.
for (FileStatus tableDir : tableDirs) { for (FileStatus tableDir : tableDirs) {
LOG.debug("Loading region dirs from " +tableDir.getPath()); LOG.debug("Loading region dirs from " +tableDir.getPath());
dirs.add(new WorkItemHdfsDir(this, fs, errors, tableDir)); WorkItemHdfsDir item = new WorkItemHdfsDir(fs, errors, tableDir);
}
// Invoke and wait for Callables to complete
dirsFutures = executor.invokeAll(dirs);
for(Future<Void> f: dirsFutures) {
try { try {
f.get(); item.call();
} catch(ExecutionException e) { } catch (ExecutionException e) {
LOG.warn("Could not load region dir " , e.getCause()); LOG.warn("Could not completely load table dir " +
tableDir.getPath(), e.getCause());
} }
} }
errors.print(""); errors.print("");
@ -4044,70 +4034,117 @@ public class HBaseFsck extends Configured implements Closeable {
* Contact hdfs and get all information about specified table directory into * Contact hdfs and get all information about specified table directory into
* regioninfo list. * regioninfo list.
*/ */
static class WorkItemHdfsDir implements Callable<Void> { class WorkItemHdfsDir implements Callable<Void> {
private HBaseFsck hbck;
private FileStatus tableDir; private FileStatus tableDir;
private ErrorReporter errors; private ErrorReporter errors;
private FileSystem fs; private FileSystem fs;
WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors, WorkItemHdfsDir(FileSystem fs, ErrorReporter errors,
FileStatus status) { FileStatus status) {
this.hbck = hbck;
this.fs = fs; this.fs = fs;
this.tableDir = status; this.tableDir = status;
this.errors = errors; this.errors = errors;
} }
@Override @Override
public synchronized Void call() throws IOException { public synchronized Void call() throws InterruptedException, ExecutionException {
final Vector<Exception> exceptions = new Vector<Exception>();
try { try {
// level 2: <HBASE_DIR>/<table>/* final FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
FileStatus[] regionDirs = fs.listStatus(tableDir.getPath()); final List<Future<?>> futures = new ArrayList<Future<?>>(regionDirs.length);
for (FileStatus regionDir : regionDirs) {
for (final FileStatus regionDir : regionDirs) {
errors.progress(); errors.progress();
String encodedName = regionDir.getPath().getName(); final String encodedName = regionDir.getPath().getName();
// ignore directories that aren't hexadecimal // ignore directories that aren't hexadecimal
if (!encodedName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) { if (!encodedName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
continue; continue;
} }
LOG.debug("Loading region info from hdfs:"+ regionDir.getPath()); if (!exceptions.isEmpty()) {
HbckInfo hbi = hbck.getOrCreateInfo(encodedName); break;
HdfsEntry he = new HdfsEntry(); }
synchronized (hbi) {
if (hbi.getHdfsRegionDir() != null) {
errors.print("Directory " + encodedName + " duplicate??" +
hbi.getHdfsRegionDir());
}
he.hdfsRegionDir = regionDir.getPath(); futures.add(executor.submit(new Runnable() {
he.hdfsRegionDirModTime = regionDir.getModificationTime(); @Override
Path regioninfoFile = new Path(he.hdfsRegionDir, HRegionFileSystem.REGION_INFO_FILE); public void run() {
he.hdfsRegioninfoFilePresent = fs.exists(regioninfoFile); try {
// we add to orphan list when we attempt to read .regioninfo LOG.debug("Loading region info from hdfs:"+ regionDir.getPath());
// Set a flag if this region contains only edits Path regioninfoFile = new Path(regionDir.getPath(), HRegionFileSystem.REGION_INFO_FILE);
// This is special case if a region is left after split boolean regioninfoFileExists = fs.exists(regioninfoFile);
he.hdfsOnlyEdits = true;
FileStatus[] subDirs = fs.listStatus(regionDir.getPath()); if (!regioninfoFileExists) {
Path ePath = WALSplitter.getRegionDirRecoveredEditsDir(regionDir.getPath()); // As tables become larger it is more and more likely that by the time you
for (FileStatus subDir : subDirs) { // reach a given region that it will be gone due to region splits/merges.
errors.progress(); if (!fs.exists(regionDir.getPath())) {
String sdName = subDir.getPath().getName(); LOG.warn("By the time we tried to process this region dir it was already gone: "
if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) { + regionDir.getPath());
he.hdfsOnlyEdits = false; return;
break; }
}
HbckInfo hbi = HBaseFsck.this.getOrCreateInfo(encodedName);
HdfsEntry he = new HdfsEntry();
synchronized (hbi) {
if (hbi.getHdfsRegionDir() != null) {
errors.print("Directory " + encodedName + " duplicate??" +
hbi.getHdfsRegionDir());
}
he.hdfsRegionDir = regionDir.getPath();
he.hdfsRegionDirModTime = regionDir.getModificationTime();
he.hdfsRegioninfoFilePresent = regioninfoFileExists;
// we add to orphan list when we attempt to read .regioninfo
// Set a flag if this region contains only edits
// This is special case if a region is left after split
he.hdfsOnlyEdits = true;
FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
Path ePath = WALSplitter.getRegionDirRecoveredEditsDir(regionDir.getPath());
for (FileStatus subDir : subDirs) {
errors.progress();
String sdName = subDir.getPath().getName();
if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {
he.hdfsOnlyEdits = false;
break;
}
}
hbi.hdfsEntry = he;
}
} catch (Exception e) {
LOG.error("Could not load region dir", e);
exceptions.add(e);
} }
} }
hbi.hdfsEntry = he; }));
}
// Ensure all pending tasks are complete (or that we run into an exception)
for (Future<?> f : futures) {
if (!exceptions.isEmpty()) {
break;
} }
try {
f.get();
} catch (ExecutionException e) {
LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e);
// Shouldn't happen, we already logged/caught any exceptions in the Runnable
};
} }
} catch (IOException e) { } catch (IOException e) {
// unable to connect to the region server. LOG.error("Cannot execute WorkItemHdfsDir for " + tableDir, e);
errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: " exceptions.add(e);
+ tableDir.getPath().getName() } finally {
+ " Unable to fetch region information. " + e); if (!exceptions.isEmpty()) {
throw e; errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: "
+ tableDir.getPath().getName()
+ " Unable to fetch all HDFS region information. ");
// Just throw the first exception as an indication something bad happened
// Don't need to propagate all the exceptions, we already logged them all anyway
throw new ExecutionException("First exception in WorkItemHdfsDir", exceptions.firstElement());
}
} }
return null; return null;
} }

View File

@ -159,9 +159,9 @@ public class HFileCorruptionChecker {
* @throws IOException * @throws IOException
*/ */
protected void checkColFamDir(Path cfDir) throws IOException { protected void checkColFamDir(Path cfDir) throws IOException {
FileStatus[] hfs = null; FileStatus[] statuses = null;
try { try {
hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner. statuses = fs.listStatus(cfDir); // use same filter as scanner.
} catch (FileNotFoundException fnfe) { } catch (FileNotFoundException fnfe) {
// Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist. // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
LOG.warn("Colfam Directory " + cfDir + LOG.warn("Colfam Directory " + cfDir +
@ -170,8 +170,9 @@ public class HFileCorruptionChecker {
return; return;
} }
List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
// Hadoop 1.0 listStatus does not throw an exception if the path does not exist. // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
if (hfs.length == 0 && !fs.exists(cfDir)) { if (hfs.size() == 0 && !fs.exists(cfDir)) {
LOG.warn("Colfam Directory " + cfDir + LOG.warn("Colfam Directory " + cfDir +
" does not exist. Likely due to concurrent split/compaction. Skipping."); " does not exist. Likely due to concurrent split/compaction. Skipping.");
missing.add(cfDir); missing.add(cfDir);
@ -191,9 +192,9 @@ public class HFileCorruptionChecker {
* @throws IOException * @throws IOException
*/ */
protected void checkMobColFamDir(Path cfDir) throws IOException { protected void checkMobColFamDir(Path cfDir) throws IOException {
FileStatus[] hfs = null; FileStatus[] statuses = null;
try { try {
hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner. statuses = fs.listStatus(cfDir); // use same filter as scanner.
} catch (FileNotFoundException fnfe) { } catch (FileNotFoundException fnfe) {
// Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist. // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
LOG.warn("Mob colfam Directory " + cfDir + LOG.warn("Mob colfam Directory " + cfDir +
@ -202,8 +203,9 @@ public class HFileCorruptionChecker {
return; return;
} }
List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
// Hadoop 1.0 listStatus does not throw an exception if the path does not exist. // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
if (hfs.length == 0 && !fs.exists(cfDir)) { if (hfs.size() == 0 && !fs.exists(cfDir)) {
LOG.warn("Mob colfam Directory " + cfDir + LOG.warn("Mob colfam Directory " + cfDir +
" does not exist. Likely the table is deleted. Skipping."); " does not exist. Likely the table is deleted. Skipping.");
missedMobFiles.add(cfDir); missedMobFiles.add(cfDir);
@ -294,9 +296,9 @@ public class HFileCorruptionChecker {
* @throws IOException * @throws IOException
*/ */
protected void checkRegionDir(Path regionDir) throws IOException { protected void checkRegionDir(Path regionDir) throws IOException {
FileStatus[] cfs = null; FileStatus[] statuses = null;
try { try {
cfs = fs.listStatus(regionDir, new FamilyDirFilter(fs)); statuses = fs.listStatus(regionDir);
} catch (FileNotFoundException fnfe) { } catch (FileNotFoundException fnfe) {
// Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist. // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
LOG.warn("Region Directory " + regionDir + LOG.warn("Region Directory " + regionDir +
@ -305,8 +307,9 @@ public class HFileCorruptionChecker {
return; return;
} }
List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
// Hadoop 1.0 listStatus does not throw an exception if the path does not exist. // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
if (cfs.length == 0 && !fs.exists(regionDir)) { if (cfs.size() == 0 && !fs.exists(regionDir)) {
LOG.warn("Region Directory " + regionDir + LOG.warn("Region Directory " + regionDir +
" does not exist. Likely due to concurrent split/compaction. Skipping."); " does not exist. Likely due to concurrent split/compaction. Skipping.");
missing.add(regionDir); missing.add(regionDir);
@ -327,12 +330,13 @@ public class HFileCorruptionChecker {
* @throws IOException * @throws IOException
*/ */
void checkTableDir(Path tableDir) throws IOException { void checkTableDir(Path tableDir) throws IOException {
FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs)); List<FileStatus> rds = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
if (rds.length == 0 && !fs.exists(tableDir)) { if (rds == null) {
// interestingly listStatus does not throw an exception if the path does not exist. if (!fs.exists(tableDir)) {
LOG.warn("Table Directory " + tableDir + LOG.warn("Table Directory " + tableDir +
" does not exist. Likely due to concurrent delete. Skipping."); " does not exist. Likely due to concurrent delete. Skipping.");
missing.add(tableDir); missing.add(tableDir);
}
return; return;
} }