HBASE-18784 if available, query underlying outputstream capabilities where we need hflush/hsync.

* pull things that don't rely on HDFS in hbase-server/FSUtils into hbase-common/CommonFSUtils
* refactor setStoragePolicy so that it can move into hbase-common/CommonFSUtils; as a side effect, update it for Hadoop 2.8 and 3.0+
* refactor WALProcedureStore so that it handles its own FS interactions
* add a reflection-based lookup of stream capabilities
* call said lookup in places where we make WALs, to make sure hflush/hsync are available.
* javadoc / checkstyle cleanup on changes as flagged by yetus

Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
Sean Busbey 2017-10-12 10:59:43 -05:00
parent 20cc11e167
commit a9f0c5d4e2
37 changed files with 1344 additions and 903 deletions
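
For context, the sketch below is not part of this commit; it is a hedged illustration of how a WAL writer is expected to use the new capability lookup described above. The helper createWALStream is hypothetical, while CommonFSUtils.hasCapability and CommonFSUtils.StreamLacksCapabilityException come from the new CommonFSUtils class in the first file of this diff.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.CommonFSUtils;

public class WALStreamCheckExample {
  // Hypothetical helper: create a WAL output stream and refuse to use it unless the
  // underlying stream actually supports the durability calls the WAL depends on.
  public static FSDataOutputStream createWALStream(FileSystem fs, Path walPath)
      throws IOException, CommonFSUtils.StreamLacksCapabilityException {
    FSDataOutputStream out = fs.create(walPath, true);
    // Check after creating the stream but before writing anything to it.
    if (!(CommonFSUtils.hasCapability(out, "hflush") &&
        CommonFSUtils.hasCapability(out, "hsync"))) {
      out.close();
      throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
    }
    return out;
  }
}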

View File

@@ -0,0 +1,890 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Utility methods for interacting with the underlying file system.
*/
@InterfaceAudience.Private
public abstract class CommonFSUtils {
private static final Log LOG = LogFactory.getLog(CommonFSUtils.class);
/** Parameter name for HBase WAL directory */
public static final String HBASE_WAL_DIR = "hbase.wal.dir";
/** Full access permissions (starting point for a umask) */
public static final String FULL_RWX_PERMISSIONS = "777";
protected CommonFSUtils() {
super();
}
/**
* Compares path components. Does not consider the scheme; i.e. if the schemes
* differ but <code>path</code> starts with <code>rootPath</code>,
* then the function returns true.
* @param rootPath value to check for
* @param path subject to check
* @return True if <code>path</code> starts with <code>rootPath</code>
*/
public static boolean isStartingWithPath(final Path rootPath, final String path) {
String uriRootPath = rootPath.toUri().getPath();
String tailUriPath = (new Path(path)).toUri().getPath();
return tailUriPath.startsWith(uriRootPath);
}
/**
* Compares the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares the
* '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or
* subpath matches, the two equate.
* @param pathToSearch Path we will be trying to match against.
* @param pathTail what to match
* @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
*/
public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
return isMatchingTail(pathToSearch, new Path(pathTail));
}
/**
* Compares the path component of the Path URI; e.g. for hdfs://a/b/c and /a/b/c, it compares the
* '/a/b/c' part. If you passed in 'hdfs://a/b/c' and 'b/c', it would return true. Does not
* consider the scheme; i.e. if the schemes differ but the path or subpath matches, the two equate.
* @param pathToSearch Path we will be trying to match against
* @param pathTail what to match
* @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
*/
public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
if (pathToSearch.depth() != pathTail.depth()) {
return false;
}
Path tailPath = pathTail;
String tailName;
Path toSearch = pathToSearch;
String toSearchName;
boolean result = false;
do {
tailName = tailPath.getName();
if (tailName == null || tailName.length() <= 0) {
result = true;
break;
}
toSearchName = toSearch.getName();
if (toSearchName == null || toSearchName.length() <= 0) {
break;
}
// Move up a parent on each path for next go around. Path doesn't let us go off the end.
tailPath = tailPath.getParent();
toSearch = toSearch.getParent();
} while(tailName.equals(toSearchName));
return result;
}
/**
* Delete if exists.
* @param fs filesystem object
* @param dir directory to delete
* @return True if deleted <code>dir</code>
* @throws IOException e
*/
public static boolean deleteDirectory(final FileSystem fs, final Path dir)
throws IOException {
return fs.exists(dir) && fs.delete(dir, true);
}
/**
* Return the number of bytes that large input files should optimally
* be split into to minimize i/o time.
*
* Uses reflection to search for getDefaultBlockSize(Path f);
* if the method doesn't exist, falls back to getDefaultBlockSize().
*
* @param fs filesystem object
* @return the default block size for the path's filesystem
* @throws IOException e
*/
public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
Method m = null;
Class<? extends FileSystem> cls = fs.getClass();
try {
m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
} catch (NoSuchMethodException e) {
LOG.info("FileSystem doesn't support getDefaultBlockSize");
} catch (SecurityException e) {
LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
m = null; // could happen on setAccessible()
}
if (m == null) {
return fs.getDefaultBlockSize(path);
} else {
try {
Object ret = m.invoke(fs, path);
return ((Long)ret).longValue();
} catch (Exception e) {
throw new IOException(e);
}
}
}
/*
* Get the default replication.
*
* Uses reflection to search for getDefaultReplication(Path f);
* if the method doesn't exist, falls back to getDefaultReplication().
*
* @param fs filesystem object
* @param f path of file
* @return default replication for the path's filesystem
* @throws IOException e
*/
public static short getDefaultReplication(final FileSystem fs, final Path path)
throws IOException {
Method m = null;
Class<? extends FileSystem> cls = fs.getClass();
try {
m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
} catch (NoSuchMethodException e) {
LOG.info("FileSystem doesn't support getDefaultReplication");
} catch (SecurityException e) {
LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
m = null; // could happen on setAccessible()
}
if (m == null) {
return fs.getDefaultReplication(path);
} else {
try {
Object ret = m.invoke(fs, path);
return ((Number)ret).shortValue();
} catch (Exception e) {
throw new IOException(e);
}
}
}
/**
* Returns the default buffer size to use during writes.
*
* The size of the buffer should probably be a multiple of hardware
* page size (4096 on Intel x86), and it determines how much data is
* buffered during read and write operations.
*
* @param fs filesystem object
* @return default buffer size to use during writes
*/
public static int getDefaultBufferSize(final FileSystem fs) {
return fs.getConf().getInt("io.file.buffer.size", 4096);
}
/**
* Create the specified file on the filesystem. By default, this will:
* <ol>
* <li>apply the umask in the configuration (if it is enabled)</li>
* <li>use the fs configured buffer size (or 4096 if not set)</li>
* <li>use the default replication</li>
* <li>use the default block size</li>
* <li>not track progress</li>
* </ol>
*
* @param fs {@link FileSystem} on which to write the file
* @param path {@link Path} to the file to write
* @param perm initial permissions
* @param overwrite Whether or not the created file should be overwritten.
* @return output stream to the created file
* @throws IOException if the file cannot be created
*/
public static FSDataOutputStream create(FileSystem fs, Path path,
FsPermission perm, boolean overwrite) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
}
return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
}
/**
* Get the file permissions specified in the configuration, if they are
* enabled.
*
* @param fs filesystem that the file will be created on.
* @param conf configuration to read for determining if permissions are
* enabled and which to use
* @param permssionConfKey property key in the configuration to use when
* finding the permission
* @return the permission to use when creating a new file on the fs. If
* special permissions are not specified in the configuration, then
* the default permissions on the fs will be returned.
*/
public static FsPermission getFilePermissions(final FileSystem fs,
final Configuration conf, final String permssionConfKey) {
boolean enablePermissions = conf.getBoolean(
HConstants.ENABLE_DATA_FILE_UMASK, false);
if (enablePermissions) {
try {
FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
// make sure that we have a mask, if not, go default.
String mask = conf.get(permssionConfKey);
if (mask == null) {
return FsPermission.getFileDefault();
}
// apply the umask
FsPermission umask = new FsPermission(mask);
return perm.applyUMask(umask);
} catch (IllegalArgumentException e) {
LOG.warn(
"Incorrect umask attempted to be created: "
+ conf.get(permssionConfKey)
+ ", using default file permissions.", e);
return FsPermission.getFileDefault();
}
}
return FsPermission.getFileDefault();
}
/**
* Verifies root directory path is a valid URI with a scheme
*
* @param root root directory path
* @return Passed <code>root</code> argument.
* @throws IOException if not a valid URI with a scheme
*/
public static Path validateRootPath(Path root) throws IOException {
try {
URI rootURI = new URI(root.toString());
String scheme = rootURI.getScheme();
if (scheme == null) {
throw new IOException("Root directory does not have a scheme");
}
return root;
} catch (URISyntaxException e) {
IOException io = new IOException("Root directory path is not a valid " +
"URI -- check your " + HConstants.HBASE_DIR + " configuration");
io.initCause(e);
throw io;
}
}
/**
* Checks for the presence of the WAL log root path (using the provided conf object) in the given
* path. If it exists, this method removes it and returns the String representation of remaining
* relative path.
* @param path must not be null
* @param conf must not be null
* @return String representation of the remaining relative path
* @throws IOException from underlying filesystem
*/
public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
Path root = getWALRootDir(conf);
String pathStr = path.toString();
// check that the path starts with the root path; if not, return it as-is.
if (!pathStr.startsWith(root.toString())) {
return pathStr;
}
// otherwise, strip the root path prefix and the trailing "/".
return pathStr.substring(root.toString().length() + 1);
}
/**
* Return the 'path' component of a Path. In Hadoop, Path is a URI. This
* method returns the 'path' component of a Path's URI: e.g. If a Path is
* <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
* this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
* This method is useful if you want to print out a Path without the qualifying
* FileSystem instance.
* @param p FileSystem Path whose 'path' component we are to return.
* @return the 'path' portion of the passed Path.
*/
public static String getPath(Path p) {
return p.toUri().getPath();
}
/**
* @param c configuration
* @return {@link Path} to hbase root directory from
* configuration as a qualified Path.
* @throws IOException e
*/
public static Path getRootDir(final Configuration c) throws IOException {
Path p = new Path(c.get(HConstants.HBASE_DIR));
FileSystem fs = p.getFileSystem(c);
return p.makeQualified(fs);
}
public static void setRootDir(final Configuration c, final Path root) throws IOException {
c.set(HConstants.HBASE_DIR, root.toString());
}
public static void setFsDefault(final Configuration c, final Path root) throws IOException {
c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
}
public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
Path p = getRootDir(c);
return p.getFileSystem(c);
}
/**
* @param c configuration
* @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from
* configuration as a qualified Path. Defaults to HBase root dir.
* @throws IOException e
*/
public static Path getWALRootDir(final Configuration c) throws IOException {
Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
if (!isValidWALRootDir(p, c)) {
return getRootDir(c);
}
FileSystem fs = p.getFileSystem(c);
return p.makeQualified(fs);
}
@VisibleForTesting
public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
c.set(HBASE_WAL_DIR, root.toString());
}
public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
Path p = getWALRootDir(c);
return p.getFileSystem(c);
}
private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
Path rootDir = getRootDir(c);
if (walDir != rootDir) {
if (walDir.toString().startsWith(rootDir.toString() + "/")) {
throw new IllegalStateException("Illegal WAL directory specified. " +
"WAL directories are not permitted to be under the root directory if set.");
}
}
return true;
}
/**
* Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
* path rootdir
*
* @param rootdir qualified path of HBase root directory
* @param tableName name of table
* @return {@link org.apache.hadoop.fs.Path} for table
*/
public static Path getTableDir(Path rootdir, final TableName tableName) {
return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
tableName.getQualifierAsString());
}
/**
* Returns the {@link org.apache.hadoop.hbase.TableName} object representing
* the table directory under
* path rootdir
*
* @param tablePath path of table
* @return {@link org.apache.hadoop.fs.Path} for table
*/
public static TableName getTableName(Path tablePath) {
return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
}
/**
* Returns the {@link org.apache.hadoop.fs.Path} object representing
* the namespace directory under path rootdir
*
* @param rootdir qualified path of HBase root directory
* @param namespace namespace name
* @return {@link org.apache.hadoop.fs.Path} for table
*/
public static Path getNamespaceDir(Path rootdir, final String namespace) {
return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
new Path(namespace)));
}
/**
* Sets storage policy for given path according to config setting.
* If the passed path is a directory, we'll set the storage policy for all files
* created in the future in said directory. Note that this change in storage
* policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
* If we're running on a FileSystem implementation that doesn't support the given storage policy
* (or storage policies at all), then we'll issue a log message and continue.
*
* See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
*
* @param fs We only do anything if it implements a setStoragePolicy method
* @param conf used to look up storage policy with given key; not modified.
* @param path the Path whose storage policy is to be set
* @param policyKey Key to use pulling a policy from Configuration:
* e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy).
* @param defaultPolicy if the configured policy is equal to this policy name, we will skip
* telling the FileSystem to set a storage policy.
*/
public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
final Path path, final String policyKey, final String defaultPolicy) {
String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
if (storagePolicy.equals(defaultPolicy)) {
if (LOG.isTraceEnabled()) {
LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
}
return;
}
setStoragePolicy(fs, path, storagePolicy);
}
// this mapping means that under a federated FileSystem implementation, we'll
// only log the first failure from any of the underlying FileSystems at WARN and all others
// will be at DEBUG.
private static final Map<FileSystem, Boolean> warningMap =
new ConcurrentHashMap<FileSystem, Boolean>();
/**
* Sets storage policy for given path.
* If the passed path is a directory, we'll set the storage policy for all files
* created in the future in said directory. Note that this change in storage
* policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
* If we're running on a version of FileSystem that doesn't support the given storage policy
* (or storage policies at all), then we'll issue a log message and continue.
*
* See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
*
* @param fs We only do anything if it implements a setStoragePolicy method
* @param path the Path whose storage policy is to be set
* @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
* org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
* 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
*/
public static void setStoragePolicy(final FileSystem fs, final Path path,
final String storagePolicy) {
if (storagePolicy == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("We were passed a null storagePolicy, exiting early.");
}
return;
}
final String trimmedStoragePolicy = storagePolicy.trim();
if (trimmedStoragePolicy.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("We were passed an empty storagePolicy, exiting early.");
}
return;
}
invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
}
/*
* All args have been checked and are good. Run the setStoragePolicy invocation.
*/
private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
final String storagePolicy) {
Method m = null;
try {
m = fs.getClass().getDeclaredMethod("setStoragePolicy",
new Class<?>[] { Path.class, String.class });
m.setAccessible(true);
} catch (NoSuchMethodException e) {
final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584, HDFS-9345 " +
"not available. This is normal and expected on earlier Hadoop versions.";
if (!warningMap.containsKey(fs)) {
warningMap.put(fs, true);
LOG.warn(msg, e);
} else if (LOG.isDebugEnabled()) {
LOG.debug(msg, e);
}
m = null;
} catch (SecurityException e) {
final String msg = "No access to setStoragePolicy on FileSystem from the SecurityManager; " +
"HDFS-6584, HDFS-9345 not available. This is unusual and probably warrants an email " +
"to the user@hbase mailing list. Please be sure to include a link to your configs, and " +
"logs that include this message and period of time before it. Logs around service " +
"start up will probably be useful as well.";
if (!warningMap.containsKey(fs)) {
warningMap.put(fs, true);
LOG.warn(msg, e);
} else if (LOG.isDebugEnabled()) {
LOG.debug(msg, e);
}
m = null; // could happen on setAccessible() or getDeclaredMethod()
}
if (m != null) {
try {
m.invoke(fs, path, storagePolicy);
if (LOG.isDebugEnabled()) {
LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
}
} catch (Exception e) {
// This swallows FNFE, should we be throwing it? seems more likely to indicate dev
// misuse than a runtime problem with HDFS.
if (!warningMap.containsKey(fs)) {
warningMap.put(fs, true);
LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
"DEBUG log level might have more details.", e);
} else if (LOG.isDebugEnabled()) {
LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
}
// check for lack of HDFS-7228
if (e instanceof InvocationTargetException) {
final Throwable exception = e.getCause();
if (exception instanceof RemoteException &&
HadoopIllegalArgumentException.class.getName().equals(
((RemoteException)exception).getClassName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
"isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
"trying to use SSD related policies then you're likely missing HDFS-7228. For " +
"more information see the 'ArchivalStorage' docs for your Hadoop release.");
}
// Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
// that throws UnsupportedOperationException
} else if (exception instanceof UnsupportedOperationException) {
if (LOG.isDebugEnabled()) {
LOG.debug("The underlying FileSystem implementation doesn't support " +
"setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
"appears to be present in your version of Hadoop. For more information check " +
"the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
"specification docs from HADOOP-11981, and/or related documentation from the " +
"provider of the underlying FileSystem (its name should appear in the " +
"stacktrace that accompanies this message). Note in particular that Hadoop's " +
"local filesystem implementation doesn't support storage policies.", exception);
}
}
}
}
}
}
/**
* @param conf must not be null
* @return True if the default filesystem's scheme is 'hdfs'.
* @throws IOException from underlying FileSystem
*/
public static boolean isHDFS(final Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
String scheme = fs.getUri().getScheme();
return scheme.equalsIgnoreCase("hdfs");
}
/**
* Checks whether the given path contains the 'recovered.edits' directory.
* @param path must not be null
* @return True if the path contains the 'recovered.edits' directory
*/
public static boolean isRecoveredEdits(Path path) {
return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
}
/**
* @param conf must not be null
* @return Returns the filesystem of the hbase rootdir.
* @throws IOException from underlying FileSystem
*/
public static FileSystem getCurrentFileSystem(Configuration conf)
throws IOException {
return getRootDir(conf).getFileSystem(conf);
}
/**
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
* This accommodates differences between Hadoop versions, where Hadoop 1
* does not throw a FileNotFoundException and returns an empty FileStatus[],
* while Hadoop 2 throws a FileNotFoundException.
*
* Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
* Path, FileStatusFilter) instead.
*
* @param fs file system
* @param dir directory
* @param filter path filter
* @return null if dir is empty or doesn't exist, otherwise FileStatus array
*/
public static FileStatus [] listStatus(final FileSystem fs,
final Path dir, final PathFilter filter) throws IOException {
FileStatus [] status = null;
try {
status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
} catch (FileNotFoundException fnfe) {
// if directory doesn't exist, return null
if (LOG.isTraceEnabled()) {
LOG.trace(dir + " doesn't exist");
}
}
if (status == null || status.length < 1) {
return null;
}
return status;
}
/**
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
* This accommodates differences between Hadoop versions.
*
* @param fs file system
* @param dir directory
* @return null if dir is empty or doesn't exist, otherwise FileStatus array
*/
public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
return listStatus(fs, dir, null);
}
/**
* Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing RPC calls
*
* @param fs file system
* @param dir directory
* @return LocatedFileStatus list
*/
public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
final Path dir) throws IOException {
List<LocatedFileStatus> status = null;
try {
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
.listFiles(dir, false);
while (locatedFileStatusRemoteIterator.hasNext()) {
if (status == null) {
status = Lists.newArrayList();
}
status.add(locatedFileStatusRemoteIterator.next());
}
} catch (FileNotFoundException fnfe) {
// if directory doesn't exist, return null
if (LOG.isTraceEnabled()) {
LOG.trace(dir + " doesn't exist");
}
}
return status;
}
/**
* Calls fs.delete() and returns the value returned by the fs.delete()
*
* @param fs must not be null
* @param path must not be null
* @param recursive delete tree rooted at path
* @return the value returned by the fs.delete()
* @throws IOException from underlying FileSystem
*/
public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
throws IOException {
return fs.delete(path, recursive);
}
/**
* Calls fs.exists(). Checks if the specified path exists
*
* @param fs must not be null
* @param path must not be null
* @return the value returned by fs.exists()
* @throws IOException from underlying FileSystem
*/
public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
return fs.exists(path);
}
/**
* Log the current state of the filesystem from a certain root directory
* @param fs filesystem to investigate
* @param root root file/directory to start logging from
* @param LOG log to output information
* @throws IOException if an unexpected exception occurs
*/
public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
throws IOException {
LOG.debug("Current file system:");
logFSTree(LOG, fs, root, "|-");
}
/**
* Recursive helper to log the state of the FS
*
* @see #logFileSystemState(FileSystem, Path, Log)
*/
private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
throws IOException {
FileStatus[] files = listStatus(fs, root, null);
if (files == null) {
return;
}
for (FileStatus file : files) {
if (file.isDirectory()) {
LOG.debug(prefix + file.getPath().getName() + "/");
logFSTree(LOG, fs, file.getPath(), prefix + "---");
} else {
LOG.debug(prefix + file.getPath().getName());
}
}
}
public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
throws IOException {
// set the modify time for TimeToLive Cleaner
fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
return fs.rename(src, dest);
}
/**
* Do our short circuit read setup.
* Checks buffer size to use and whether to do checksumming in hbase or hdfs.
* @param conf must not be null
*/
public static void setupShortCircuitRead(final Configuration conf) {
// Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
boolean shortCircuitSkipChecksum =
conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
if (shortCircuitSkipChecksum) {
LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
"be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
"it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
assert !shortCircuitSkipChecksum; //this will fail if assertions are on
}
checkShortCircuitReadBufferSize(conf);
}
/**
* Check if short circuit read buffer size is set and if not, set it to hbase value.
* @param conf must not be null
*/
public static void checkShortCircuitReadBufferSize(final Configuration conf) {
final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
final int notSet = -1;
// DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
int size = conf.getInt(dfsKey, notSet);
// If a size is set, return -- we will use it.
if (size != notSet) {
return;
}
// But short circuit buffer size is normally not set. Put in place the hbase wanted size.
int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
}
// Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
// not until we attempt to reference it.
private static class StreamCapabilities {
public static final boolean PRESENT;
public static final Class<?> CLASS;
public static final Method METHOD;
static {
boolean tmp = false;
Class<?> clazz = null;
Method method = null;
try {
clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
method = clazz.getMethod("hasCapability", String.class);
tmp = true;
} catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) {
LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
"HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
"support hflush/hsync. If you are running on top of HDFS this probably just " +
"means you have an older version and this can be ignored. If you are running on " +
"top of an alternate FileSystem implementation you should manually verify that " +
"hflush and hsync are implemented; otherwise you risk data loss and hard to " +
"diagnose errors when our assumptions are violated.");
LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
exception);
} finally {
PRESENT = tmp;
CLASS = clazz;
METHOD = method;
}
}
}
/**
* If our FileSystem version includes the StreamCapabilities class, check if
* the given stream has a particular capability.
* @param stream capabilities are per-stream instance, so check this one specifically; must not be
* null
* @param capability what to look for, per Hadoop Common's FileSystem docs
* @return true if the StreamCapabilities class is not present; false if it is present but this
* stream doesn't implement it; otherwise, the result of asking the stream.
*/
public static boolean hasCapability(FSDataOutputStream stream, String capability) {
// be consistent whether or not StreamCapabilities is present
if (stream == null) {
throw new NullPointerException("stream parameter must not be null.");
}
// If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
// otherwise old versions of Hadoop will break.
boolean result = true;
if (StreamCapabilities.PRESENT) {
// if StreamCapabilities is present, but the stream doesn't implement it
// or we run into a problem invoking the method,
// we treat that as equivalent to not declaring anything
result = false;
if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
try {
result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
} catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException
exception) {
LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
"our understanding of how it's supposed to work. Please file a JIRA and include " +
"the following stack trace. In the mean time we're interpreting this behavior " +
"difference as a lack of capability support, which will probably cause a failure.",
exception);
}
}
}
return result;
}
/**
* Helper exception for those cases where the place where we need to check a stream capability
* is not where we have the needed context to explain the impact and mitigation for a lack.
*/
public static class StreamLacksCapabilityException extends Exception {
public StreamLacksCapabilityException(String message, Throwable cause) {
super(message, cause);
}
public StreamLacksCapabilityException(String message) {
super(message);
}
}
}
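
As a usage illustration (a hedged sketch, not part of this commit; applyWALPolicy is a hypothetical caller), the config-driven setStoragePolicy overload above is meant to be invoked roughly the way the WALProcedureStore change further down in this diff invokes it:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.CommonFSUtils;

public class StoragePolicyExample {
  // Hypothetical caller: resolve the WAL root dir and apply the configured storage policy.
  public static void applyWALPolicy(Configuration conf) throws IOException {
    Path walDir = CommonFSUtils.getWALRootDir(conf);
    FileSystem fs = walDir.getFileSystem(conf);
    // Reads hbase.wal.storage.policy; if it equals the default the call exits early,
    // otherwise reflection is used so FileSystems without setStoragePolicy just log and continue.
    CommonFSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
        HConstants.DEFAULT_WAL_STORAGE_POLICY);
  }
}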

View File

@@ -0,0 +1,164 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test {@link CommonFSUtils}.
*/
@Category({MiscTests.class, MediumTests.class})
public class TestCommonFSUtils {
private static final Log LOG = LogFactory.getLog(TestCommonFSUtils.class);
private HBaseCommonTestingUtility htu;
private Configuration conf;
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
conf = htu.getConfiguration();
}
/**
* Test path compare and prefix checking.
*/
@Test
public void testMatchingTail() throws IOException {
Path rootdir = htu.getDataTestDir();
final FileSystem fs = rootdir.getFileSystem(conf);
assertTrue(rootdir.depth() > 1);
Path partPath = new Path("a", "b");
Path fullPath = new Path(rootdir, partPath);
Path fullyQualifiedPath = fs.makeQualified(fullPath);
assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath));
assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath.toString()));
assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullPath.toString()));
assertTrue(CommonFSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString()));
assertFalse(CommonFSUtils.isStartingWithPath(rootdir, partPath.toString()));
assertFalse(CommonFSUtils.isMatchingTail(fullyQualifiedPath, partPath));
assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath));
assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString()));
assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath)));
assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString()));
assertFalse(CommonFSUtils.isMatchingTail(fullPath, new Path("x")));
assertFalse(CommonFSUtils.isMatchingTail(new Path("x"), fullPath));
}
private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
throws Exception {
FSDataOutputStream out = fs.create(file);
byte [] data = new byte[dataSize];
out.write(data, 0, dataSize);
out.close();
}
@Test
public void testSetWALRootDir() throws Exception {
Path p = new Path("file:///hbase/root");
CommonFSUtils.setWALRootDir(conf, p);
assertEquals(p.toString(), conf.get(CommonFSUtils.HBASE_WAL_DIR));
}
@Test
public void testGetWALRootDir() throws IOException {
Path root = new Path("file:///hbase/root");
Path walRoot = new Path("file:///hbase/logroot");
CommonFSUtils.setRootDir(conf, root);
assertEquals(CommonFSUtils.getRootDir(conf), root);
assertEquals(CommonFSUtils.getWALRootDir(conf), root);
CommonFSUtils.setWALRootDir(conf, walRoot);
assertEquals(CommonFSUtils.getWALRootDir(conf), walRoot);
}
@Test(expected=IllegalStateException.class)
public void testGetWALRootDirIllegalWALDir() throws IOException {
Path root = new Path("file:///hbase/root");
Path invalidWALDir = new Path("file:///hbase/root/logroot");
CommonFSUtils.setRootDir(conf, root);
CommonFSUtils.setWALRootDir(conf, invalidWALDir);
CommonFSUtils.getWALRootDir(conf);
}
@Test
public void testRemoveWALRootPath() throws Exception {
CommonFSUtils.setRootDir(conf, new Path("file:///user/hbase"));
Path testFile = new Path(CommonFSUtils.getRootDir(conf), "test/testfile");
Path tmpFile = new Path("file:///test/testfile");
assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), "test/testfile");
assertEquals(CommonFSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString());
CommonFSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir"));
assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), testFile.toString());
Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog");
assertEquals(CommonFSUtils.removeWALRootPath(logFile, conf), "test/testlog");
}
@Test(expected=NullPointerException.class)
public void streamCapabilitiesDoesNotAllowNullStream() {
CommonFSUtils.hasCapability(null, "hopefully any string");
}
private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
static {
boolean tmp = false;
try {
Class.forName("org.apache.hadoop.fs.StreamCapabilities");
tmp = true;
LOG.debug("Test thought StreamCapabilities class was present.");
} catch (ClassNotFoundException exception) {
LOG.debug("Test didn't think StreamCapabilities class was present.");
} finally {
STREAM_CAPABILITIES_IS_PRESENT = tmp;
}
}
@Test
public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException {
FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null);
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
CommonFSUtils.hasCapability(stream, "hsync"));
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
CommonFSUtils.hasCapability(stream, "hflush"));
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " +
"implement."));
}
}

View File

@@ -46,18 +46,20 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 /**
  * WAL implementation of the ProcedureStore.
@@ -67,6 +69,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 public class WALProcedureStore extends ProcedureStoreBase {
 private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
 public static final String LOG_PREFIX = "pv2-";
+/** Used to construct the name of the log directory for master procedures */
+public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
 public interface LeaseRecovery {
 void recoverFileLease(FileSystem fs, Path path) throws IOException;
@@ -185,18 +190,42 @@ public class WALProcedureStore extends ProcedureStoreBase {
 }
 }
-public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
-final LeaseRecovery leaseRecovery) {
-this(conf, fs, walDir, null, leaseRecovery);
+public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery)
+throws IOException {
+this(conf,
+new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
+new Path(CommonFSUtils.getRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME), leaseRecovery);
 }
-public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
-final Path walArchiveDir, final LeaseRecovery leaseRecovery) {
-this.fs = fs;
+@VisibleForTesting
+public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
+final LeaseRecovery leaseRecovery) throws IOException {
 this.conf = conf;
+this.leaseRecovery = leaseRecovery;
 this.walDir = walDir;
 this.walArchiveDir = walArchiveDir;
-this.leaseRecovery = leaseRecovery;
+this.fs = walDir.getFileSystem(conf);
+// Create the log directory for the procedure store
+if (!fs.exists(walDir)) {
+if (!fs.mkdirs(walDir)) {
+throw new IOException("Unable to mkdir " + walDir);
+}
+}
+// Now that it exists, set the log policy
+CommonFSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
+HConstants.DEFAULT_WAL_STORAGE_POLICY);
+// Create archive dir up front. Rename won't work w/o it up on HDFS.
+if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
+if (this.fs.mkdirs(this.walArchiveDir)) {
+if (LOG.isDebugEnabled()) {
+LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir);
+}
+} else {
+LOG.warn("Failed create of " + this.walArchiveDir);
+}
+}
 }
 @Override
@@ -247,16 +276,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
 }
 };
 syncThread.start();
-// Create archive dir up front. Rename won't work w/o it up on HDFS.
-if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
-if (this.fs.mkdirs(this.walArchiveDir)) {
-if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " +
-this.walArchiveDir);
-} else {
-LOG.warn("Failed create of " + this.walArchiveDir);
-}
-}
 }
 @Override
@@ -1005,6 +1024,17 @@ public class WALProcedureStore extends ProcedureStoreBase {
 LOG.warn("failed to create log file with id=" + logId, re);
 return false;
 }
+// After we create the stream but before we attempt to use it at all
+// ensure that we can provide the level of data safety we're configured
+// to provide.
+final String durability = useHsync ? "hsync" : "hflush";
+if (!(CommonFSUtils.hasCapability(newStream, durability))) {
+throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
+" for proper operation during component failures, but the underlying filesystem does " +
+"not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
+"' to set the desired level of robustness and ensure the config value of '" +
+CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it.");
+}
 try {
 ProcedureWALFormat.writeHeader(newStream, header);
 startPos = newStream.getPos();

View File

@@ -51,14 +51,14 @@ public class ProcedureTestingUtility {
 private ProcedureTestingUtility() {
 }
-public static ProcedureStore createStore(final Configuration conf, final FileSystem fs,
-final Path baseDir) throws IOException {
-return createWalStore(conf, fs, baseDir);
+public static ProcedureStore createStore(final Configuration conf, final Path dir)
+throws IOException {
+return createWalStore(conf, dir);
 }
-public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
-final Path walDir) throws IOException {
-return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() {
+public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
+throws IOException {
+return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
 @Override
 public void recoverFileLease(FileSystem fs, Path path) throws IOException {
 // no-op

View File

@@ -60,7 +60,7 @@ public class TestChildProcedures {
 logDir = new Path(testDir, "proc-logs");
 procEnv = new TestProcEnv();
-procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
 procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
 procExecutor.testing = new ProcedureExecutor.Testing();
 procStore.start(PROCEDURE_EXECUTOR_SLOTS);

View File

@@ -61,7 +61,7 @@ public class TestProcedureEvents {
 logDir = new Path(testDir, "proc-logs");
 procEnv = new TestProcEnv();
-procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
 procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
 procStore.start(1);
 procExecutor.start(1, true);

View File

@@ -63,7 +63,7 @@ public class TestProcedureExecution {
 assertTrue(testDir.depth() > 1);
 logDir = new Path(testDir, "proc-logs");
-procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
 procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
 procStore.start(PROCEDURE_EXECUTOR_SLOTS);
 procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);

View File

@@ -66,7 +66,7 @@ public class TestProcedureMetrics {
 logDir = new Path(testDir, "proc-logs");
 procEnv = new TestProcEnv();
-procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
 procExecutor = new ProcedureExecutor<TestProcEnv>(htu.getConfiguration(), procEnv, procStore);
 procExecutor.testing = new ProcedureExecutor.Testing();
 procStore.start(PROCEDURE_EXECUTOR_SLOTS);

View File

@@ -67,7 +67,7 @@ public class TestProcedureNonce {
 logDir = new Path(testDir, "proc-logs");
 procEnv = new TestProcEnv();
-procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
 procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
 procExecutor.testing = new ProcedureExecutor.Testing();
 procStore.start(PROCEDURE_EXECUTOR_SLOTS);

View File

@@ -69,7 +69,7 @@ public class TestProcedureRecovery {
 logDir = new Path(testDir, "proc-logs");
 procEnv = new TestProcEnv();
-procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
 procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
 procExecutor.testing = new ProcedureExecutor.Testing();
 procStore.start(PROCEDURE_EXECUTOR_SLOTS);

View File

@@ -68,7 +68,7 @@ public class TestProcedureReplayOrder {
 logDir = new Path(testDir, "proc-logs");
 procEnv = new TestProcedureEnv();
-procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
 procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
 procStore.start(NUM_THREADS);
 procExecutor.start(1, true);

View File

@@ -76,7 +76,7 @@ public class TestStateMachineProcedure {
 fs = testDir.getFileSystem(htu.getConfiguration());
 logDir = new Path(testDir, "proc-logs");
-procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
 procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
 procStore.start(PROCEDURE_EXECUTOR_SLOTS);
 procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);

View File

@@ -65,7 +65,7 @@ public class TestYieldProcedures {
 assertTrue(testDir.depth() > 1);
 logDir = new Path(testDir, "proc-logs");
-procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
 procRunnables = new TestScheduler();
 procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
 procStore, procRunnables);

View File

@@ -126,7 +126,7 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
 Path logDir = new Path(testDir, "proc-logs");
 System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
 fs.delete(logDir, true);
-store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
+store = ProcedureTestingUtility.createWalStore(conf, logDir);
 store.start(1);
 store.recoverLease();
 store.load(new LoadCounter());

View File

@@ -93,9 +93,9 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
 System.out.println("Logs directory : " + logDir.toString());
 fs.delete(logDir, true);
 if ("nosync".equals(syncType)) {
-store = new NoSyncWalProcedureStore(conf, fs, logDir);
+store = new NoSyncWalProcedureStore(conf, logDir);
 } else {
-store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
+store = ProcedureTestingUtility.createWalStore(conf, logDir);
 }
 store.start(numThreads);
 store.recoverLease();
@@ -244,9 +244,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
 }
 private class NoSyncWalProcedureStore extends WALProcedureStore {
-public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs,
-final Path logDir) {
-super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
+public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
+super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
 @Override
 public void recoverFileLease(FileSystem fs, Path path) throws IOException {
 // no-op

View File

@@ -75,7 +75,7 @@ public class TestStressWALProcedureStore {
 assertTrue(testDir.depth() > 1);
 logDir = new Path(testDir, "proc-logs");
-procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
 procStore.start(PROCEDURE_STORE_SLOTS);
 procStore.recoverLease();

View File

@@ -86,7 +86,7 @@ public class TestWALProcedureStore {
 setupConfig(htu.getConfiguration());
 logDir = new Path(testDir, "proc-logs");
-procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
 procStore.start(PROCEDURE_STORE_SLOTS);
 procStore.recoverLease();
 procStore.load(new LoadCounter());
@@ -729,7 +729,7 @@ public class TestWALProcedureStore {
 assertEquals(procs.length + 1, status.length);
 // simulate another active master removing the wals
-procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir,
+procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
 new WALProcedureStore.LeaseRecovery() {
 private int count = 0;

View File

@@ -65,9 +65,6 @@ import edu.umd.cs.findbugs.annotations.Nullable;
 public class HFileSystem extends FilterFileSystem {
 public static final Log LOG = LogFactory.getLog(HFileSystem.class);
-/** Parameter name for HBase WAL directory */
-public static final String HBASE_WAL_DIR = "hbase.wal.dir";
 private final FileSystem noChecksumFs; // read hfile data from storage
 private final boolean useHBaseChecksum;
 private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;


@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -56,7 +57,8 @@ public final class AsyncFSOutputHelper {
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
       boolean createParent, short replication, long blockSize, EventLoop eventLoop,
-      Class<? extends Channel> channelClass) throws IOException {
+      Class<? extends Channel> channelClass)
+      throws IOException, CommonFSUtils.StreamLacksCapabilityException {
     if (fs instanceof DistributedFileSystem) {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
         overwrite, createParent, replication, blockSize, eventLoop, channelClass);
@@ -69,6 +71,13 @@ public final class AsyncFSOutputHelper {
     } else {
       fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
     }
+    // After we create the stream but before we attempt to use it at all
+    // ensure that we can provide the level of data safety we're configured
+    // to provide.
+    if (!(CommonFSUtils.hasCapability(fsOut, "hflush") &&
+        CommonFSUtils.hasCapability(fsOut, "hsync"))) {
+      throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
+    }
     final ExecutorService flushExecutor =
         Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
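
Note: CommonFSUtils.hasCapability used in the guard above is not defined in this hunk. Purely as an illustration (the method and class names below are hypothetical, not the patch's actual code), a probe that asks the stream via Hadoop's optional StreamCapabilities interface and degrades gracefully on older releases could look like this:

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.fs.FSDataOutputStream;

// Hypothetical sketch only. On Hadoop 2.9+/3.0+ FSDataOutputStream exposes
// hasCapability(String); older releases lack the method, so we fall back to
// assuming the stream behaves as it always did rather than failing startup.
static boolean hasCapabilitySketch(FSDataOutputStream stream, String capability) {
  try {
    Method probe = stream.getClass().getMethod("hasCapability", String.class);
    return (Boolean) probe.invoke(stream, capability);
  } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
    // No way to ask the stream; treat the capability as present.
    return true;
  }
}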


@@ -53,7 +53,6 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.ClusterStatus.Option;
@@ -1201,23 +1200,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   private void startProcedureExecutor() throws IOException {
     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
     final Path rootDir = FSUtils.getRootDir(conf);
-    final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
-        MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
-    final Path walArchiveDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    final FileSystem walFs = walDir.getFileSystem(conf);
-    // Create the log directory for the procedure store
-    if (!walFs.exists(walDir)) {
-      if (!walFs.mkdirs(walDir)) {
-        throw new IOException("Unable to mkdir " + walDir);
-      }
-    }
-    // Now that it exists, set the log policy
-    FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
-        HConstants.DEFAULT_WAL_STORAGE_POLICY);
-    procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, walArchiveDir,
+    procedureStore = new WALProcedureStore(conf,
         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
     MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
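
For reference, the directory bootstrap that disappears from HMaster here is roughly what the store now has to do for itself before opening any logs. A condensed sketch, built directly from the removed lines (the method name and its exact placement inside WALProcedureStore are assumptions):

// Condensed sketch of the WAL-directory setup the store now owns; prepareWalDir is hypothetical.
private static void prepareWalDir(Configuration conf, FileSystem walFs, Path walDir)
    throws IOException {
  // Create the log directory for the procedure store if it is missing.
  if (!walFs.exists(walDir) && !walFs.mkdirs(walDir)) {
    throw new IOException("Unable to mkdir " + walDir);
  }
  // Now that it exists, apply the configured WAL storage policy to it.
  CommonFSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
      HConstants.DEFAULT_WAL_STORAGE_POLICY);
}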


@@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
@@ -144,10 +144,10 @@ public class MasterFileSystem {
     };
     final String[] protectedSubLogDirs = new String[] {
         HConstants.HREGION_LOGDIR_NAME,
         HConstants.HREGION_OLDLOGDIR_NAME,
         HConstants.CORRUPT_DIR_NAME,
-        MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR
+        WALProcedureStore.MASTER_PROCEDURE_LOGDIR
     };
     // check if the root directory exists
     checkRootDir(this.rootdir, conf, this.fs);


@@ -24,9 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 public final class MasterProcedureConstants {
   private MasterProcedureConstants() {}
-  /** Used to construct the name of the log directory for master procedures */
-  public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
   /** Number of threads used by the procedure executor */
   public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
   public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16;


@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.DrainBarrier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -356,7 +356,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     }
     // Now that it exists, set the storage policy for the entire directory of wal files related to
     // this FSHLog instance
-    FSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY,
+    CommonFSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY,
       HConstants.DEFAULT_WAL_STORAGE_POLICY);
     this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
     this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
@@ -381,7 +381,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     };
     if (failIfWALExists) {
-      final FileStatus[] walFiles = FSUtils.listStatus(fs, walDir, ourFiles);
+      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
       if (null != walFiles && 0 != walFiles.length) {
         throw new IOException("Target WAL already exists within directory " + walDir);
       }
@@ -398,7 +398,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
     // (it costs a little x'ing bocks)
     final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
-        FSUtils.getDefaultBlockSize(this.fs, this.walDir));
+        CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir));
     this.logrollsize = (long) (blocksize
       * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
@@ -652,7 +652,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       }
     }
     LOG.info("Archiving " + p + " to " + newPath);
-    if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
+    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
       throw new IOException("Unable to rename " + p + " to " + newPath);
     }
     // Tell our listeners that a log has been archived.
@@ -685,12 +685,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     try {
       long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
       int oldNumEntries = this.numEntries.getAndSet(0);
-      final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
+      final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath));
       if (oldPath != null) {
         this.walFile2Props.put(oldPath,
           new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
         this.totalLogSize.addAndGet(oldFileLen);
-        LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
+        LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
           + ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
       } else {
         LOG.info("New WAL " + newPathString);
@@ -767,6 +767,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         cleanOldLogs();
         regionsToFlush = findRegionsToForceFlush();
       }
+    } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
+      // If the underlying FileSystem can't do what we ask, treat as IO failure so
+      // we'll abort.
+      throw new IOException("Underlying FileSystem can't meet stream requirements. See RS log " +
+          "for details.", exception);
     } finally {
       closeBarrier.endOp();
       assert scope == NullScope.INSTANCE || !scope.isDetached();
@@ -794,7 +799,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * @return may be null if there are no files.
    */
   protected FileStatus[] getFiles() throws IOException {
-    return FSUtils.listStatus(fs, walDir, ourFiles);
+    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
   }
   @Override
@@ -833,7 +838,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         }
       }
-      if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
+      if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
         throw new IOException("Unable to rename " + file.getPath() + " to " + p);
       }
       // Tell our listeners that a log was archived.
@@ -843,7 +848,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
           }
         }
       }
-      LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.walArchiveDir));
+      LOG.debug("Moved " + files.length + " WAL file(s) to " +
+          CommonFSUtils.getPath(this.walArchiveDir));
     }
     LOG.info("Closed WAL: " + toString());
   }
@@ -1022,7 +1028,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
-  protected abstract W createWriterInstance(Path path) throws IOException;
+  protected abstract W createWriterInstance(Path path) throws IOException,
+      CommonFSUtils.StreamLacksCapabilityException;
   /**
    * @return old wal file size
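
To make the roll threshold in the hunk at line 398 concrete: the WAL rolls a little before a block boundary. Illustrative arithmetic only, assuming the common 128 MiB HDFS block size and the default multiplier:

// Illustration, not code from the patch: assumes a 128 MiB block and the default 0.95 multiplier.
long blocksize = 128L * 1024 * 1024;                 // 134,217,728 bytes
float multiplier = 0.95f;                            // hbase.regionserver.logroll.multiplier
long logrollsize = (long) (blocksize * multiplier);  // about 127.5 million bytes, roughly 121.6 MiB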


@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.util.EncryptionTest;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -153,7 +154,7 @@ public abstract class AbstractProtobufLogWriter {
   }
   public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
-      throws IOException {
+      throws IOException, StreamLacksCapabilityException {
     this.conf = conf;
     boolean doCompress = initializeCompressionContext(conf, path);
     this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
@@ -237,7 +238,7 @@ public abstract class AbstractProtobufLogWriter {
   }
   protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException;
+      short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
   /**
    * return the file length after written.


@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -153,9 +154,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException {
+      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
       blockSize, eventLoop, channelClass);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }


@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -86,9 +88,13 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   @SuppressWarnings("deprecation")
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException {
+      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
     this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
       null);
+    // TODO Be sure to add a check for hsync if this branch includes HBASE-19024
+    if (!(CommonFSUtils.hasCapability(output, "hflush"))) {
+      throw new StreamLacksCapabilityException("hflush");
+    }
   }
   @Override
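
The TODO in the hunk above points at HBASE-19024. Purely as a hypothetical follow-up, not part of this patch, the guard would simply mirror the dual check already used in AsyncFSOutputHelper:

// Hypothetical extension only: also demand hsync, matching the AsyncFSOutputHelper check.
if (!(CommonFSUtils.hasCapability(output, "hflush")
    && CommonFSUtils.hasCapability(output, "hsync"))) {
  throw new StreamLacksCapabilityException("hflush and hsync");
}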


@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.util;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterators;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.primitives.Ints;
 import edu.umd.cs.findbugs.annotations.CheckForNull;
@@ -36,8 +35,6 @@ import java.io.InterruptedIOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -60,17 +57,14 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.ClusterId;
@@ -101,174 +95,24 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
 /**
  * Utility methods for interacting with the underlying file system.
  */
 @InterfaceAudience.Private
-public abstract class FSUtils {
+public abstract class FSUtils extends CommonFSUtils {
   private static final Log LOG = LogFactory.getLog(FSUtils.class);
-  /** Full access permissions (starting point for a umask) */
-  public static final String FULL_RWX_PERMISSIONS = "777";
   private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
   private static final int DEFAULT_THREAD_POOLSIZE = 2;
   /** Set to true on Windows platforms */
-  @VisibleForTesting
+  // currently only used in testing. TODO refactor into a test class
   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
   protected FSUtils() {
     super();
   }
/**
* Sets storage policy for given path according to config setting.
* If the passed path is a directory, we'll set the storage policy for all files
* created in the future in said directory. Note that this change in storage
* policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
* If we're running on a version of HDFS that doesn't support the given storage policy
* (or storage policies at all), then we'll issue a log message and continue.
*
* See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
*
* @param fs We only do anything if an instance of DistributedFileSystem
* @param conf used to look up storage policy with given key; not modified.
* @param path the Path whose storage policy is to be set
* @param policyKey Key to use pulling a policy from Configuration:
* e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy).
* @param defaultPolicy usually should be the policy NONE to delegate to HDFS
*/
public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
final Path path, final String policyKey, final String defaultPolicy) {
String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
if (storagePolicy.equals(defaultPolicy)) {
if (LOG.isTraceEnabled()) {
LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
}
return;
}
setStoragePolicy(fs, path, storagePolicy);
}
private static final Map<FileSystem, Boolean> warningMap =
new ConcurrentHashMap<FileSystem, Boolean>();
/**
* Sets storage policy for given path.
* If the passed path is a directory, we'll set the storage policy for all files
* created in the future in said directory. Note that this change in storage
* policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
* If we're running on a version of HDFS that doesn't support the given storage policy
* (or storage policies at all), then we'll issue a log message and continue.
*
* See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
*
* @param fs We only do anything if an instance of DistributedFileSystem
* @param path the Path whose storage policy is to be set
* @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
* org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
* 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
*/
public static void setStoragePolicy(final FileSystem fs, final Path path,
final String storagePolicy) {
if (storagePolicy == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("We were passed a null storagePolicy, exiting early.");
}
return;
}
final String trimmedStoragePolicy = storagePolicy.trim();
if (trimmedStoragePolicy.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("We were passed an empty storagePolicy, exiting early.");
}
return;
}
boolean distributed = false;
try {
distributed = isDistributedFileSystem(fs);
} catch (IOException ioe) {
if (!warningMap.containsKey(fs)) {
warningMap.put(fs, true);
LOG.warn("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
+ "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
+ " on path=" + path);
} else if (LOG.isDebugEnabled()) {
LOG.debug("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
+ "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
+ " on path=" + path);
}
return;
}
if (distributed) {
invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
}
}
/*
* All args have been checked and are good. Run the setStoragePolicy invocation.
*/
private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
final String storagePolicy) {
Method m = null;
try {
m = fs.getClass().getDeclaredMethod("setStoragePolicy",
new Class<?>[] { Path.class, String.class });
m.setAccessible(true);
} catch (NoSuchMethodException e) {
final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584 not available";
if (!warningMap.containsKey(fs)) {
warningMap.put(fs, true);
LOG.warn(msg, e);
} else if (LOG.isDebugEnabled()) {
LOG.debug(msg, e);
}
m = null;
} catch (SecurityException e) {
final String msg = "No access to setStoragePolicy on FileSystem; HDFS-6584 not available";
if (!warningMap.containsKey(fs)) {
warningMap.put(fs, true);
LOG.warn(msg, e);
} else if (LOG.isDebugEnabled()) {
LOG.debug(msg, e);
}
m = null; // could happen on setAccessible()
}
if (m != null) {
try {
m.invoke(fs, path, storagePolicy);
if (LOG.isDebugEnabled()) {
LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
}
} catch (Exception e) {
// This swallows FNFE, should we be throwing it? seems more likely to indicate dev
// misuse than a runtime problem with HDFS.
if (!warningMap.containsKey(fs)) {
warningMap.put(fs, true);
LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
} else if (LOG.isDebugEnabled()) {
LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
}
// check for lack of HDFS-7228
if (e instanceof InvocationTargetException) {
final Throwable exception = e.getCause();
if (exception instanceof RemoteException &&
HadoopIllegalArgumentException.class.getName().equals(
((RemoteException)exception).getClassName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
"isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
"trying to use SSD related policies then you're likely missing HDFS-7228. For " +
"more information see the 'ArchivalStorage' docs for your Hadoop release.");
}
}
}
}
}
}
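
Elsewhere in this commit the storage-policy helpers above are called through CommonFSUtils, so callers keep the same pattern. A usage sketch, with an assumed ONE_SSD policy and a WAL directory that is assumed to exist already:

// Usage sketch (values are assumptions): route new WAL files to SSD-backed storage.
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.WAL_STORAGE_POLICY, "ONE_SSD");  // hbase.wal.storage.policy
FileSystem walFs = walDir.getFileSystem(conf);       // walDir assumed to exist
CommonFSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
    HConstants.DEFAULT_WAL_STORAGE_POLICY);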
 /**
  * @return True is <code>fs</code> is instance of DistributedFileSystem
  * @throws IOException
@@ -283,32 +127,6 @@ public abstract class FSUtils {
     return fileSystem instanceof DistributedFileSystem;
   }
/**
* Compare of path component. Does not consider schema; i.e. if schemas
* different but <code>path</code> starts with <code>rootPath</code>,
* then the function returns true
* @param rootPath
* @param path
* @return True if <code>path</code> starts with <code>rootPath</code>
*/
public static boolean isStartingWithPath(final Path rootPath, final String path) {
String uriRootPath = rootPath.toUri().getPath();
String tailUriPath = (new Path(path)).toUri().getPath();
return tailUriPath.startsWith(uriRootPath);
}
/**
* Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
* '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches,
* the two will equate.
* @param pathToSearch Path we will be trying to match.
* @param pathTail
* @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
*/
public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
return isMatchingTail(pathToSearch, new Path(pathTail));
}
 /**
  * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
  * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
@@ -352,18 +170,6 @@ public abstract class FSUtils {
     return fsUtils;
   }
/**
* Delete if exists.
* @param fs filesystem object
* @param dir directory to delete
* @return True if deleted <code>dir</code>
* @throws IOException e
*/
public static boolean deleteDirectory(final FileSystem fs, final Path dir)
throws IOException {
return fs.exists(dir) && fs.delete(dir, true);
}
 /**
  * Delete the region directory if exists.
  * @param conf
@@ -379,89 +185,7 @@ public abstract class FSUtils {
       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
   }
 /**
* Return the number of bytes that large input files should be optimally
* be split into to minimize i/o time.
*
* use reflection to search for getDefaultBlockSize(Path f)
* if the method doesn't exist, fall back to using getDefaultBlockSize()
*
* @param fs filesystem object
* @return the default block size for the path's filesystem
* @throws IOException e
*/
public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
Method m = null;
Class<? extends FileSystem> cls = fs.getClass();
try {
m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
} catch (NoSuchMethodException e) {
LOG.info("FileSystem doesn't support getDefaultBlockSize");
} catch (SecurityException e) {
LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
m = null; // could happen on setAccessible()
}
if (m == null) {
return fs.getDefaultBlockSize(path);
} else {
try {
Object ret = m.invoke(fs, path);
return ((Long)ret).longValue();
} catch (Exception e) {
throw new IOException(e);
}
}
}
/*
* Get the default replication.
*
* use reflection to search for getDefaultReplication(Path f)
* if the method doesn't exist, fall back to using getDefaultReplication()
*
* @param fs filesystem object
* @param f path of file
* @return default replication for the path's filesystem
* @throws IOException e
*/
public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
Method m = null;
Class<? extends FileSystem> cls = fs.getClass();
try {
m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
} catch (NoSuchMethodException e) {
LOG.info("FileSystem doesn't support getDefaultReplication");
} catch (SecurityException e) {
LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
m = null; // could happen on setAccessible()
}
if (m == null) {
return fs.getDefaultReplication(path);
} else {
try {
Object ret = m.invoke(fs, path);
return ((Number)ret).shortValue();
} catch (Exception e) {
throw new IOException(e);
}
}
}
/**
* Returns the default buffer size to use during writes.
*
* The size of the buffer should probably be a multiple of hardware
* page size (4096 on Intel x86), and it determines how much data is
* buffered during read and write operations.
*
* @param fs filesystem object
* @return default buffer size to use during writes
*/
public static int getDefaultBufferSize(final FileSystem fs) {
return fs.getConf().getInt("io.file.buffer.size", 4096);
}
 /**
  * Create the specified file on the filesystem. By default, this will:
  * <ol>
  * <li>overwrite the file if it exists</li>
@@ -514,71 +238,6 @@ public abstract class FSUtils {
     return create(fs, path, perm, true);
   }
/**
* Create the specified file on the filesystem. By default, this will:
* <ol>
* <li>apply the umask in the configuration (if it is enabled)</li>
* <li>use the fs configured buffer size (or 4096 if not set)</li>
* <li>use the default replication</li>
* <li>use the default block size</li>
* <li>not track progress</li>
* </ol>
*
* @param fs {@link FileSystem} on which to write the file
* @param path {@link Path} to the file to write
* @param perm
* @param overwrite Whether or not the created file should be overwritten.
* @return output stream to the created file
* @throws IOException if the file cannot be created
*/
public static FSDataOutputStream create(FileSystem fs, Path path,
FsPermission perm, boolean overwrite) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
}
return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
}
/**
* Get the file permissions specified in the configuration, if they are
* enabled.
*
* @param fs filesystem that the file will be created on.
* @param conf configuration to read for determining if permissions are
* enabled and which to use
* @param permssionConfKey property key in the configuration to use when
* finding the permission
* @return the permission to use when creating a new file on the fs. If
* special permissions are not specified in the configuration, then
* the default permissions on the the fs will be returned.
*/
public static FsPermission getFilePermissions(final FileSystem fs,
final Configuration conf, final String permssionConfKey) {
boolean enablePermissions = conf.getBoolean(
HConstants.ENABLE_DATA_FILE_UMASK, false);
if (enablePermissions) {
try {
FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
// make sure that we have a mask, if not, go default.
String mask = conf.get(permssionConfKey);
if (mask == null)
return FsPermission.getFileDefault();
// appy the umask
FsPermission umask = new FsPermission(mask);
return perm.applyUMask(umask);
} catch (IllegalArgumentException e) {
LOG.warn(
"Incorrect umask attempted to be created: "
+ conf.get(permssionConfKey)
+ ", using default file permissions.", e);
return FsPermission.getFileDefault();
}
}
return FsPermission.getFileDefault();
}
 /**
  * Checks to see if the specified file system is available
  *
@@ -1022,46 +681,6 @@ public abstract class FSUtils {
     }
   }
/**
* Verifies root directory path is a valid URI with a scheme
*
* @param root root directory path
* @return Passed <code>root</code> argument.
* @throws IOException if not a valid URI with a scheme
*/
public static Path validateRootPath(Path root) throws IOException {
try {
URI rootURI = new URI(root.toString());
String scheme = rootURI.getScheme();
if (scheme == null) {
throw new IOException("Root directory does not have a scheme");
}
return root;
} catch (URISyntaxException e) {
IOException io = new IOException("Root directory path is not a valid " +
"URI -- check your " + HBASE_DIR + " configuration");
io.initCause(e);
throw io;
}
}
/**
* Checks for the presence of the WAL log root path (using the provided conf object) in the given path. If
* it exists, this method removes it and returns the String representation of remaining relative path.
* @param path
* @param conf
* @return String representation of the remaining relative path
* @throws IOException
*/
public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
Path root = getWALRootDir(conf);
String pathStr = path.toString();
// check that the path is absolute... it has the root path in it.
if (!pathStr.startsWith(root.toString())) return pathStr;
// if not, return as it is.
return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
}
 /**
  * If DFS, check safe mode and if so, wait until we clear it.
  * @param conf configuration
@@ -1085,81 +704,6 @@ public abstract class FSUtils {
     }
   }
/**
* Return the 'path' component of a Path. In Hadoop, Path is an URI. This
* method returns the 'path' component of a Path's URI: e.g. If a Path is
* <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
* this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
* This method is useful if you want to print out a Path without qualifying
* Filesystem instance.
* @param p Filesystem Path whose 'path' component we are to return.
* @return Path portion of the Filesystem
*/
public static String getPath(Path p) {
return p.toUri().getPath();
}
/**
* @param c configuration
* @return {@link Path} to hbase root directory: i.e. {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR} from
* configuration as a qualified Path.
* @throws IOException e
*/
public static Path getRootDir(final Configuration c) throws IOException {
Path p = new Path(c.get(HBASE_DIR));
FileSystem fs = p.getFileSystem(c);
return p.makeQualified(fs);
}
public static void setRootDir(final Configuration c, final Path root) throws IOException {
c.set(HBASE_DIR, root.toString());
}
public static void setFsDefault(final Configuration c, final Path root) throws IOException {
c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
}
public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
Path p = getRootDir(c);
return p.getFileSystem(c);
}
/**
* @param c configuration
* @return {@link Path} to hbase log root directory: i.e. {@value org.apache.hadoop.hbase.fs.HFileSystem#HBASE_WAL_DIR} from
* configuration as a qualified Path. Defaults to {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR}
* @throws IOException e
*/
public static Path getWALRootDir(final Configuration c) throws IOException {
Path p = new Path(c.get(HFileSystem.HBASE_WAL_DIR, c.get(HBASE_DIR)));
if (!isValidWALRootDir(p, c)) {
return FSUtils.getRootDir(c);
}
FileSystem fs = p.getFileSystem(c);
return p.makeQualified(fs);
}
@VisibleForTesting
public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
c.set(HFileSystem.HBASE_WAL_DIR, root.toString());
}
public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
Path p = getWALRootDir(c);
return p.getFileSystem(c);
}
private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
Path rootDir = FSUtils.getRootDir(c);
if (walDir != rootDir) {
if (walDir.toString().startsWith(rootDir.toString() + "/")) {
throw new IllegalStateException("Illegal WAL directory specified. " +
"WAL directories are not permitted to be under the root directory if set.");
}
}
return true;
}
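
The validation above only rejects a WAL directory that is nested under the HBase root directory. An illustration with hypothetical URIs:

// Hypothetical URIs, illustrating the rule enforced by isValidWALRootDir.
conf.set(HConstants.HBASE_DIR, "hdfs://nn1/hbase");
conf.set(CommonFSUtils.HBASE_WAL_DIR, "hdfs://nn2/hbase-wal");    // fine: separate tree or filesystem
// conf.set(CommonFSUtils.HBASE_WAL_DIR, "hdfs://nn1/hbase/wal"); // rejected: under the root dir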
 /**
  * Checks if meta region exists
  *
@@ -1297,44 +841,6 @@ public abstract class FSUtils {
     return frags;
   }
/**
* Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
* path rootdir
*
* @param rootdir qualified path of HBase root directory
* @param tableName name of table
* @return {@link org.apache.hadoop.fs.Path} for table
*/
public static Path getTableDir(Path rootdir, final TableName tableName) {
return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
tableName.getQualifierAsString());
}
/**
* Returns the {@link org.apache.hadoop.hbase.TableName} object representing
* the table directory under
* path rootdir
*
* @param tablePath path of table
* @return {@link org.apache.hadoop.fs.Path} for table
*/
public static TableName getTableName(Path tablePath) {
return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
}
/**
* Returns the {@link org.apache.hadoop.fs.Path} object representing
* the namespace directory under path rootdir
*
* @param rootdir qualified path of HBase root directory
* @param namespace namespace name
* @return {@link org.apache.hadoop.fs.Path} for table
*/
public static Path getNamespaceDir(Path rootdir, final String namespace) {
return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
new Path(namespace)));
}
 /**
  * A {@link PathFilter} that returns only regular files.
  */
@@ -1431,17 +937,6 @@ public abstract class FSUtils {
     }
   }
/**
* @param conf
* @return True if this filesystem whose scheme is 'hdfs'.
* @throws IOException
*/
public static boolean isHDFS(final Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
String scheme = fs.getUri().getScheme();
return scheme.equalsIgnoreCase("hdfs");
}
 /**
  * Recover file lease. Used when a file might be suspect
  * to be had been left open by another process.
@@ -1483,15 +978,6 @@ public abstract class FSUtils {
     return tabledirs;
   }
/**
* Checks if the given path is the one with 'recovered.edits' dir.
* @param path
* @return True if we recovered edits
*/
public static boolean isRecoveredEdits(Path path) {
return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
}
 /**
  * Filter for all dirs that don't start with '.'
  */
@@ -1668,18 +1154,6 @@ public abstract class FSUtils {
     }
   }
/**
* @param conf
* @return Returns the filesystem of the hbase rootdir.
* @throws IOException
*/
public static FileSystem getCurrentFileSystem(Configuration conf)
throws IOException {
return getRootDir(conf).getFileSystem(conf);
}
 /**
  * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
  * table StoreFile names to the full Path.
@@ -1977,101 +1451,6 @@ public abstract class FSUtils {
     }
   }
/**
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal
* This accommodates differences between hadoop versions, where hadoop 1
* does not throw a FileNotFoundException, and return an empty FileStatus[]
* while Hadoop 2 will throw FileNotFoundException.
*
* Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem,
* Path, FileStatusFilter)} instead.
*
* @param fs file system
* @param dir directory
* @param filter path filter
* @return null if dir is empty or doesn't exist, otherwise FileStatus array
*/
public static FileStatus [] listStatus(final FileSystem fs,
final Path dir, final PathFilter filter) throws IOException {
FileStatus [] status = null;
try {
status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
} catch (FileNotFoundException fnfe) {
// if directory doesn't exist, return null
if (LOG.isTraceEnabled()) {
LOG.trace(dir + " doesn't exist");
}
}
if (status == null || status.length < 1) return null;
return status;
}
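
Because a missing or empty directory comes back as null rather than an exception or an empty array, callers follow the null-check idiom seen earlier in the AbstractFSWAL hunk. A small usage sketch (walDir and ourFiles are assumed to be defined by the caller):

// Usage sketch: handle the null return before touching the array.
FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
if (walFiles != null && walFiles.length > 0) {
  throw new IOException("Target WAL already exists within directory " + walDir);
}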
/**
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal
* This would accommodates differences between hadoop versions
*
* @param fs file system
* @param dir directory
* @return null if dir is empty or doesn't exist, otherwise FileStatus array
*/
public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
return listStatus(fs, dir, null);
}
/**
* Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call
*
* @param fs file system
* @param dir directory
* @return LocatedFileStatus list
*/
public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
final Path dir) throws IOException {
List<LocatedFileStatus> status = null;
try {
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
.listFiles(dir, false);
while (locatedFileStatusRemoteIterator.hasNext()) {
if (status == null) {
status = Lists.newArrayList();
}
status.add(locatedFileStatusRemoteIterator.next());
}
} catch (FileNotFoundException fnfe) {
// if directory doesn't exist, return null
if (LOG.isTraceEnabled()) {
LOG.trace(dir + " doesn't exist");
}
}
return status;
}
/**
* Calls fs.delete() and returns the value returned by the fs.delete()
*
* @param fs
* @param path
* @param recursive
* @return the value returned by the fs.delete()
* @throws IOException
*/
public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
throws IOException {
return fs.delete(path, recursive);
}
/**
* Calls fs.exists(). Checks if the specified path exists
*
* @param fs
* @param path
* @return the value returned by fs.exists()
* @throws IOException
*/
public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
return fs.exists(path);
}
 /**
  * Throw an exception if an action is not permitted by a user on a file.
  *
@@ -2108,46 +1487,6 @@ public abstract class FSUtils {
     return false;
   }
/**
* Log the current state of the filesystem from a certain root directory
* @param fs filesystem to investigate
* @param root root file/directory to start logging from
* @param LOG log to output information
* @throws IOException if an unexpected exception occurs
*/
public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
throws IOException {
LOG.debug("Current file system:");
logFSTree(LOG, fs, root, "|-");
}
/**
* Recursive helper to log the state of the FS
*
* @see #logFileSystemState(FileSystem, Path, Log)
*/
private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
throws IOException {
FileStatus[] files = FSUtils.listStatus(fs, root, null);
if (files == null) return;
for (FileStatus file : files) {
if (file.isDirectory()) {
LOG.debug(prefix + file.getPath().getName() + "/");
logFSTree(LOG, fs, file.getPath(), prefix + "---");
} else {
LOG.debug(prefix + file.getPath().getName());
}
}
}
public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
throws IOException {
// set the modify time for TimeToLive Cleaner
fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
return fs.rename(src, dest);
}
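
The touch-then-rename order above matters: the TimeToLive-based cleaner reads the modification time of the archived file, so the age is measured from the moment of archiving. A usage sketch with hypothetical paths:

// Usage sketch (paths are hypothetical): archive a rolled WAL so the cleaner's TTL clock starts now.
Path rolledWal = new Path(walDir, "example-wal.1234567890");
Path archived = new Path(walArchiveDir, rolledWal.getName());
if (!CommonFSUtils.renameAndSetModifyTime(fs, rolledWal, archived)) {
  throw new IOException("Unable to rename " + rolledWal + " to " + archived);
}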
 /**
  * This function is to scan the root path of the file system to get the
  * degree of locality for each region on each of the servers having at least
@@ -2397,4 +1736,5 @@ public abstract class FSUtils {
       return null;
     }
   }
 }


@@ -38,7 +38,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 /**
@@ -52,7 +52,13 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
   // Only public so classes back in regionserver.wal can access
   public interface AsyncWriter extends WALProvider.AsyncWriter {
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
+    /**
+     * @throws IOException if something goes wrong initializing an output stream
+     * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
+     *         meet the needs of the given Writer implementation.
+     */
+    void init(FileSystem fs, Path path, Configuration c, boolean overwritable)
+        throws IOException, CommonFSUtils.StreamLacksCapabilityException;
   }
   private EventLoopGroup eventLoopGroup;
@@ -60,7 +66,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
   private Class<? extends Channel> channelClass;
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
-    return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
+    return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
       getWALDirectoryName(factory.factoryId),
       getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
       META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
@@ -96,7 +102,15 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
       writer.init(fs, path, conf, overwritable);
       return writer;
     } catch (Exception e) {
-      LOG.debug("Error instantiating log writer.", e);
+      if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
+        LOG.error("The RegionServer async write ahead log provider " +
+            "relies on the ability to call " + e.getMessage() + " for proper operation during " +
+            "component failures, but the current FileSystem does not support doing so. Please " +
+            "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " +
+            "it points to a FileSystem mount that has suitable capabilities for output streams.");
+      } else {
+        LOG.debug("Error instantiating log writer.", e);
+      }
       Throwables.propagateIfPossible(e, IOException.class);
       throw new IOException("cannot get log writer", e);
     }


@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceStability;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;

 /**
  * A WAL provider that use {@link FSHLog}.
@@ -44,7 +44,13 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
   // Only public so classes back in regionserver.wal can access
   public interface Writer extends WALProvider.Writer {
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
+    /**
+     * @throws IOException if something goes wrong initializing an output stream
+     * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
+     *         meet the needs of the given Writer implementation.
+     */
+    void init(FileSystem fs, Path path, Configuration c, boolean overwritable)
+        throws IOException, CommonFSUtils.StreamLacksCapabilityException;
   }

 /**
@@ -61,7 +67,15 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
       writer.init(fs, path, conf, overwritable);
       return writer;
     } catch (Exception e) {
-      LOG.debug("Error instantiating log writer.", e);
+      if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
+        LOG.error("The RegionServer write ahead log provider for FileSystem implementations " +
+            "relies on the ability to call " + e.getMessage() + " for proper operation during " +
+            "component failures, but the current FileSystem does not support doing so. Please " +
+            "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " +
+            "it points to a FileSystem mount that has suitable capabilities for output streams.");
+      } else {
+        LOG.debug("Error instantiating log writer.", e);
+      }
       if (writer != null) {
         try{
           writer.close();
@@ -75,7 +89,7 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
   @Override
   protected FSHLog createWAL() throws IOException {
-    return new FSHLog(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
+    return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
       getWALDirectoryName(factory.factoryId),
       getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
       META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
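
The new init contract shown above pushes the capability check into concrete Writer implementations. A minimal sketch of how an implementation could honor it follows; the class is hypothetical, and the exact CommonFSUtils.hasCapability signature and the String constructor on StreamLacksCapabilityException are assumptions based on this patch rather than a copy of ProtobufLogWriter.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;

/**
 * Illustrative writer initialization: open the output stream, then verify it can
 * actually hflush/hsync before handing it to the WAL machinery.
 */
public class CapabilityCheckingWriterInit {

  private FSDataOutputStream output;

  public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
      throws IOException, StreamLacksCapabilityException {
    this.output = fs.create(path, overwritable);
    // WAL durability depends on these two calls working; fail fast if they don't.
    if (!CommonFSUtils.hasCapability(output, "hflush")) {
      throw new StreamLacksCapabilityException("hflush");
    }
    if (!CommonFSUtils.hasCapability(output, "hsync")) {
      throw new StreamLacksCapabilityException("hsync");
    }
  }

  public void close() throws IOException {
    if (output != null) {
      output.close();
    }
  }
}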


@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -55,7 +56,8 @@ public class TestLocalAsyncOutput {
   }

   @Test
-  public void test() throws IOException, InterruptedException, ExecutionException {
+  public void test() throws IOException, InterruptedException, ExecutionException,
+      FSUtils.StreamLacksCapabilityException {
     Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,


@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -184,10 +185,8 @@ public class MockMasterServices extends MockNoopMasterServices {
       throws IOException {
     final Configuration conf = getConfiguration();
     final Path logDir = new Path(fileSystemManager.getRootDir(),
-        MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
-    //procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
-    //    new MasterProcedureEnv.WALStoreLeaseRecovery(this));
+        WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
     this.procedureStore = new NoopProcedureStore();
     this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));


@@ -115,8 +115,8 @@ public class TestMasterProcedureWalLease {
     Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
     Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
     final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
-        firstMaster.getMasterFileSystem().getFileSystem(),
         ((WALProcedureStore)masterStore).getWALDir(),
+        null,
         new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
     // Abort Latch for the test store
     final CountDownLatch backupStore3Abort = new CountDownLatch(1);
@@ -195,8 +195,8 @@ public class TestMasterProcedureWalLease {
     Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
     Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
     final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
-        firstMaster.getMasterFileSystem().getFileSystem(),
         ((WALProcedureStore)procStore).getWALDir(),
+        null,
         new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
     // start a second store which should fence the first one out


@@ -76,7 +76,7 @@ public class TestWALProcedureStoreOnHDFS {
     MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
     Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
-    store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
+    store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
     store.registerListener(stopProcedureListener);
     store.start(8);
     store.recoverLease();
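
Both procedure-store test changes above reflect the same refactor: WALProcedureStore now handles its own FileSystem interactions, so callers stop passing one in. A sketch of the new call shape follows, assuming the constructor is (Configuration, Path walDir, Path walArchiveDir, LeaseRecovery) with a nullable archive directory as the tests suggest; the helper class is hypothetical.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;

public final class ProcedureStoreBootstrap {
  private ProcedureStoreBootstrap() {
  }

  /** Builds and opens a store rooted at logDir; note there is no FileSystem argument any more. */
  public static WALProcedureStore open(Configuration conf, Path logDir,
      WALProcedureStore.LeaseRecovery leaseRecovery) throws IOException {
    WALProcedureStore store = new WALProcedureStore(conf, logDir, null, leaseRecovery);
    store.start(1);        // number of worker slots; 1 is enough for a smoke test
    store.recoverLease();  // must hold the lease before the store accepts writes
    return store;
  }
}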


@@ -58,9 +58,9 @@ import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -139,8 +139,8 @@ public abstract class AbstractTestFSWAL {
     // test to see whether the coprocessor is loaded or not.
     AbstractFSWAL<?> wal = null;
     try {
-      wal = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
-        CONF, null, true, null, null);
+      wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
+        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
       WALCoprocessorHost host = wal.getCoprocessorHost();
       Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
       assertNotNull(c);
@@ -187,8 +187,8 @@ public abstract class AbstractTestFSWAL {
     AbstractFSWAL<?> wal1 = null;
     AbstractFSWAL<?> walMeta = null;
     try {
-      wal1 = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
-        CONF, null, true, null, null);
+      wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
+        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
       LOG.debug("Log obtained is: " + wal1);
       Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
       Path p1 = wal1.computeFilename(11);
@@ -197,9 +197,9 @@ public abstract class AbstractTestFSWAL {
       assertTrue(comp.compare(p1, p1) == 0);
       // comparing with different filenum.
       assertTrue(comp.compare(p1, p2) < 0);
-      walMeta =
-        newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
-          CONF, null, true, null, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
+      walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
+        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null,
+        AbstractFSWALProvider.META_WAL_PROVIDER_ID);
       Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;

       Path p1WithMeta = walMeta.computeFilename(11);
@@ -245,7 +245,7 @@ public abstract class AbstractTestFSWAL {
     LOG.debug("testFindMemStoresEligibleForFlush");
     Configuration conf1 = HBaseConfiguration.create(CONF);
     conf1.setInt("hbase.regionserver.maxlogs", 1);
-    AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(conf1), DIR.toString(),
+    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
       HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
     HTableDescriptor t1 =
       new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
@@ -332,9 +332,10 @@ public abstract class AbstractTestFSWAL {
   }

   @Test(expected = IOException.class)
-  public void testFailedToCreateWALIfParentRenamed() throws IOException {
+  public void testFailedToCreateWALIfParentRenamed() throws IOException,
+      CommonFSUtils.StreamLacksCapabilityException {
     final String name = "testFailedToCreateWALIfParentRenamed";
-    AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(CONF), name,
+    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name,
       HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
     long filenum = System.currentTimeMillis();
     Path path = wal.computeFilename(filenum);
@@ -373,17 +374,17 @@ public abstract class AbstractTestFSWAL {
       scopes.put(fam, 0);
     }
     // subclass and doctor a method.
-    AbstractFSWAL<?> wal = newSlowWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF,
-      null, true, null, null, new Runnable() {
+    AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
+      testName, CONF, null, true, null, null, new Runnable() {
         @Override
         public void run() {
          if (goslow.get()) {
            Threads.sleep(100);
            LOG.debug("Sleeping before appending 100ms");
          }
        }
      });
     HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
       TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
     EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
@@ -434,8 +435,8 @@ public abstract class AbstractTestFSWAL {
   @Test
   public void testSyncNoAppend() throws IOException {
     String testName = currentTest.getMethodName();
-    AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF,
-      null, true, null, null);
+    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
+      CONF, null, true, null, null);
     try {
       wal.sync();
     } finally {
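
These tests now resolve the WAL root through CommonFSUtils instead of FSUtils. For readers unfamiliar with the split between the data root and the WAL root, here is a small local-filesystem sketch of the relationship; it assumes the setRootDir/setWALRootDir/getRootDir/getWALRootDir accessors live on CommonFSUtils after this refactor, and the demo class itself is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.CommonFSUtils;

public final class WalRootDirDemo {
  private WalRootDirDemo() {
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    CommonFSUtils.setRootDir(conf, new Path("file:///hbase/root"));

    // With no hbase.wal.dir set, WALs live under the ordinary root dir.
    System.out.println(CommonFSUtils.getWALRootDir(conf)); // same as the data root

    // Point hbase.wal.dir at a separate mount (e.g. one whose streams support
    // hflush/hsync) and the WAL root diverges from the data root.
    CommonFSUtils.setWALRootDir(conf, new Path("file:///hbase/logroot"));
    System.out.println(CommonFSUtils.getRootDir(conf));    // data root, unchanged
    System.out.println(CommonFSUtils.getWALRootDir(conf)); // now the separate WAL root
  }
}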


@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -1029,7 +1030,8 @@ public abstract class AbstractTestWALReplay {
   /**
    * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
    */
-  private void testNameConflictWhenSplit(boolean largeFirst) throws IOException {
+  private void testNameConflictWhenSplit(boolean largeFirst) throws IOException,
+      StreamLacksCapabilityException {
     final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
@@ -1071,12 +1073,12 @@ public abstract class AbstractTestWALReplay {
   }

   @Test
-  public void testNameConflictWhenSplit0() throws IOException {
+  public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
     testNameConflictWhenSplit(true);
   }

   @Test
-  public void testNameConflictWhenSplit1() throws IOException {
+  public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
     testNameConflictWhenSplit(false);
   }
@@ -1231,7 +1233,8 @@ public abstract class AbstractTestWALReplay {
     return htd;
   }

-  private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException {
+  private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException,
+      StreamLacksCapabilityException {
     fs.mkdirs(file.getParent());
     ProtobufLogWriter writer = new ProtobufLogWriter();
     writer.init(fs, file, conf, true);


@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.util.Random;
 import java.util.UUID;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -41,7 +43,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -57,6 +58,7 @@
  */
 @Category({MiscTests.class, MediumTests.class})
 public class TestFSUtils {
+  private static final Log LOG = LogFactory.getLog(TestFSUtils.class);

   private HBaseTestingUtility htu;
   private FileSystem fs;
@@ -69,53 +71,6 @@ public class TestFSUtils {
     conf = htu.getConfiguration();
   }

-  /**
-   * Test path compare and prefix checking.
-   * @throws IOException
-   */
-  @Test
-  public void testMatchingTail() throws IOException {
-    Path rootdir = htu.getDataTestDir();
-    assertTrue(rootdir.depth() > 1);
-    Path partPath = new Path("a", "b");
-    Path fullPath = new Path(rootdir, partPath);
-    Path fullyQualifiedPath = fs.makeQualified(fullPath);
-    assertFalse(FSUtils.isMatchingTail(fullPath, partPath));
-    assertFalse(FSUtils.isMatchingTail(fullPath, partPath.toString()));
-    assertTrue(FSUtils.isStartingWithPath(rootdir, fullPath.toString()));
-    assertTrue(FSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString()));
-    assertFalse(FSUtils.isStartingWithPath(rootdir, partPath.toString()));
-    assertFalse(FSUtils.isMatchingTail(fullyQualifiedPath, partPath));
-    assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fullPath));
-    assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString()));
-    assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath)));
-    assertTrue(FSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString()));
-    assertFalse(FSUtils.isMatchingTail(fullPath, new Path("x")));
-    assertFalse(FSUtils.isMatchingTail(new Path("x"), fullPath));
-  }
-
-  @Test
-  public void testVersion() throws DeserializationException, IOException {
-    final Path rootdir = htu.getDataTestDir();
-    assertNull(FSUtils.getVersion(fs, rootdir));
-    // Write out old format version file. See if we can read it in and convert.
-    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
-    FSDataOutputStream s = fs.create(versionFile);
-    final String version = HConstants.FILE_SYSTEM_VERSION;
-    s.writeUTF(version);
-    s.close();
-    assertTrue(fs.exists(versionFile));
-    FileStatus [] status = fs.listStatus(versionFile);
-    assertNotNull(status);
-    assertTrue(status.length > 0);
-    String newVersion = FSUtils.getVersion(fs, rootdir);
-    assertEquals(version.length(), newVersion.length());
-    assertEquals(version, newVersion);
-    // File will have been converted. Exercise the pb format
-    assertEquals(version, FSUtils.getVersion(fs, rootdir));
-    FSUtils.checkVersion(fs, rootdir, true);
-  }
-
   @Test public void testIsHDFS() throws Exception {
     assertFalse(FSUtils.isHDFS(conf));
     MiniDFSCluster cluster = null;
@@ -238,8 +193,33 @@
     }
   }

+  @Test
+  public void testVersion() throws DeserializationException, IOException {
+    final Path rootdir = htu.getDataTestDir();
+    final FileSystem fs = rootdir.getFileSystem(conf);
+    assertNull(FSUtils.getVersion(fs, rootdir));
+    // Write out old format version file. See if we can read it in and convert.
+    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
+    FSDataOutputStream s = fs.create(versionFile);
+    final String version = HConstants.FILE_SYSTEM_VERSION;
+    s.writeUTF(version);
+    s.close();
+    assertTrue(fs.exists(versionFile));
+    FileStatus [] status = fs.listStatus(versionFile);
+    assertNotNull(status);
+    assertTrue(status.length > 0);
+    String newVersion = FSUtils.getVersion(fs, rootdir);
+    assertEquals(version.length(), newVersion.length());
+    assertEquals(version, newVersion);
+    // File will have been converted. Exercise the pb format
+    assertEquals(version, FSUtils.getVersion(fs, rootdir));
+    FSUtils.checkVersion(fs, rootdir, true);
+  }
+
   @Test
   public void testPermMask() throws Exception {
+    final Path rootdir = htu.getDataTestDir();
+    final FileSystem fs = rootdir.getFileSystem(conf);
     // default fs permission
     FsPermission defaultFsPerm = FSUtils.getFilePermissions(fs, conf,
       HConstants.DATA_FILE_UMASK_KEY);
@@ -277,6 +257,8 @@
   @Test
   public void testDeleteAndExists() throws Exception {
+    final Path rootdir = htu.getDataTestDir();
+    final FileSystem fs = rootdir.getFileSystem(conf);
     conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
     FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
     // then that the correct file is created
@@ -302,6 +284,7 @@
     }
   }
+
   @Test
   public void testRenameAndSetModifyTime() throws Exception {
     MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
@@ -338,6 +321,24 @@
     }
   }

+  @Test
+  public void testSetStoragePolicyDefault() throws Exception {
+    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
+  }
+
+  /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
+  @Test
+  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
+    verifyFileInDirWithStoragePolicy("ALL_SSD");
+  }
+
+  /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
+  @Test
+  public void testSetStoragePolicyInvalid() throws Exception {
+    verifyFileInDirWithStoragePolicy("1772");
+  }
+
+  // Here instead of TestCommonFSUtils because we need a minicluster
   private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
     conf.set(HConstants.WAL_STORAGE_POLICY, policy);
@@ -362,63 +363,6 @@
     }
   }

-  @Test
-  public void testSetStoragePolicyDefault() throws Exception {
-    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
-  }
-
-  /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
-  @Test
-  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
-    verifyFileInDirWithStoragePolicy("ALL_SSD");
-  }
-
-  /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
-  @Test
-  public void testSetStoragePolicyInvalid() throws Exception {
-    verifyFileInDirWithStoragePolicy("1772");
-  }
-
-  @Test
-  public void testSetWALRootDir() throws Exception {
-    Path p = new Path("file:///hbase/root");
-    FSUtils.setWALRootDir(conf, p);
-    assertEquals(p.toString(), conf.get(HFileSystem.HBASE_WAL_DIR));
-  }
-
-  @Test
-  public void testGetWALRootDir() throws IOException {
-    Path root = new Path("file:///hbase/root");
-    Path walRoot = new Path("file:///hbase/logroot");
-    FSUtils.setRootDir(conf, root);
-    assertEquals(FSUtils.getRootDir(conf), root);
-    assertEquals(FSUtils.getWALRootDir(conf), root);
-    FSUtils.setWALRootDir(conf, walRoot);
-    assertEquals(FSUtils.getWALRootDir(conf), walRoot);
-  }
-
-  @Test(expected=IllegalStateException.class)
-  public void testGetWALRootDirIllegalWALDir() throws IOException {
-    Path root = new Path("file:///hbase/root");
-    Path invalidWALDir = new Path("file:///hbase/root/logroot");
-    FSUtils.setRootDir(conf, root);
-    FSUtils.setWALRootDir(conf, invalidWALDir);
-    FSUtils.getWALRootDir(conf);
-  }
-
-  @Test
-  public void testRemoveWALRootPath() throws Exception {
-    FSUtils.setRootDir(conf, new Path("file:///user/hbase"));
-    Path testFile = new Path(FSUtils.getRootDir(conf), "test/testfile");
-    Path tmpFile = new Path("file:///test/testfile");
-    assertEquals(FSUtils.removeWALRootPath(testFile, conf), "test/testfile");
-    assertEquals(FSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString());
-    FSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir"));
-    assertEquals(FSUtils.removeWALRootPath(testFile, conf), testFile.toString());
-    Path logFile = new Path(FSUtils.getWALRootDir(conf), "test/testlog");
-    assertEquals(FSUtils.removeWALRootPath(logFile, conf), "test/testlog");
-  }
-
   /**
    * Ugly test that ensures we can get at the hedged read counters in dfsclient.
    * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
@@ -565,4 +509,37 @@
     assertTrue(fileSys.delete(name, true));
     assertTrue(!fileSys.exists(name));
   }
+
+  private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
+  static {
+    boolean tmp = false;
+    try {
+      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
+      tmp = true;
+      LOG.debug("Test thought StreamCapabilities class was present.");
+    } catch (ClassNotFoundException exception) {
+      LOG.debug("Test didn't think StreamCapabilities class was present.");
+    } finally {
+      STREAM_CAPABILITIES_IS_PRESENT = tmp;
+    }
+  }
+
+  // Here instead of TestCommonFSUtils because we need a minicluster
+  @Test
+  public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
+    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
+    try (FileSystem filesystem = cluster.getFileSystem()) {
+      FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
+      assertTrue(FSUtils.hasCapability(stream, "hsync"));
+      assertTrue(FSUtils.hasCapability(stream, "hflush"));
+      assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " +
+          "StreamCapabilities class is not defined.",
+          STREAM_CAPABILITIES_IS_PRESENT,
+          FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add."));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
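
The static block and test above only make sense against the reflection-based capability lookup this patch adds. The sketch below shows one way such a lookup can be written so it degrades gracefully on Hadoop releases without org.apache.hadoop.fs.StreamCapabilities; the class and method names are illustrative, not the CommonFSUtils implementation.

import java.lang.reflect.Method;

import org.apache.hadoop.fs.FSDataOutputStream;

/**
 * Illustrative reflection-based capability probe: if the running Hadoop exposes
 * StreamCapabilities#hasCapability(String), ask the stream; otherwise assume the
 * stream can do whatever is asked of it, matching pre-StreamCapabilities behavior.
 */
public final class StreamCapabilityProbe {
  private static final Method HAS_CAPABILITY_METHOD;

  static {
    Method method = null;
    try {
      Class<?> iface = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
      method = iface.getMethod("hasCapability", String.class);
    } catch (ClassNotFoundException | NoSuchMethodException e) {
      // Older Hadoop: there is no StreamCapabilities interface to query.
    }
    HAS_CAPABILITY_METHOD = method;
  }

  private StreamCapabilityProbe() {
  }

  public static boolean hasCapability(FSDataOutputStream stream, String capability) {
    if (HAS_CAPABILITY_METHOD == null
        || !HAS_CAPABILITY_METHOD.getDeclaringClass().isInstance(stream)) {
      // Can't tell; err on the side of "yes" so existing deployments keep working.
      return true;
    }
    try {
      return (Boolean) HAS_CAPABILITY_METHOD.invoke(stream, capability);
    } catch (ReflectiveOperationException e) {
      return true;
    }
  }
}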


@@ -38,7 +38,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;

 /**
@@ -100,7 +100,7 @@ public class IOTestProvider implements WALProvider {
       providerId = DEFAULT_PROVIDER_ID;
     }
     final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
-    log = new IOTestWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
+    log = new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
       AbstractFSWALProvider.getWALDirectoryName(factory.factoryId),
       HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix,
       META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
@@ -184,7 +184,12 @@ public class IOTestProvider implements WALProvider {
     if (!initialized || doFileRolls) {
       LOG.info("creating new writer instance.");
       final ProtobufLogWriter writer = new IOTestWriter();
-      writer.init(fs, path, conf, false);
+      try {
+        writer.init(fs, path, conf, false);
+      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
+        throw new IOException("Can't create writer instance because underlying FileSystem " +
+            "doesn't support needed stream capabilities.", exception);
+      }
       if (!initialized) {
         LOG.info("storing initial writer instance in case file rolling isn't allowed.");
         noRollsWriter = writer;
@@ -207,7 +212,8 @@ public class IOTestProvider implements WALProvider {
     private boolean doSyncs;

     @Override
-    public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException {
+    public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
+        throws IOException, CommonFSUtils.StreamLacksCapabilityException {
       Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
       if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
         doAppends = doSyncs = true;