HBASE-18784 if available, query underlying outputstream capabilities where we need hflush/hsync.
* pull things that don't rely on HDFS in hbase-server/FSUtils into hbase-common/CommonFSUtils * refactor setStoragePolicy so that it can move into hbase-common/CommonFSUtils, as a side effect update it for Hadoop 2.8,3.0+ * refactor WALProcedureStore so that it handles its own FS interactions * add a reflection-based lookup of stream capabilities * call said lookup in places where we make WALs to make sure hflush/hsync is available. * javadoc / checkstyle cleanup on changes as flagged by yetus Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
0ff9dabe6c
commit
e79a007dd9
|
@ -0,0 +1,890 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Utility methods for interacting with the underlying file system.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class CommonFSUtils {
|
||||
private static final Log LOG = LogFactory.getLog(CommonFSUtils.class);
|
||||
|
||||
/** Parameter name for HBase WAL directory */
|
||||
public static final String HBASE_WAL_DIR = "hbase.wal.dir";
|
||||
|
||||
/** Full access permissions (starting point for a umask) */
|
||||
public static final String FULL_RWX_PERMISSIONS = "777";
|
||||
|
||||
protected CommonFSUtils() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare of path component. Does not consider schema; i.e. if schemas
|
||||
* different but <code>path</code> starts with <code>rootPath</code>,
|
||||
* then the function returns true
|
||||
* @param rootPath value to check for
|
||||
* @param path subject to check
|
||||
* @return True if <code>path</code> starts with <code>rootPath</code>
|
||||
*/
|
||||
public static boolean isStartingWithPath(final Path rootPath, final String path) {
|
||||
String uriRootPath = rootPath.toUri().getPath();
|
||||
String tailUriPath = (new Path(path)).toUri().getPath();
|
||||
return tailUriPath.startsWith(uriRootPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
|
||||
* '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches,
|
||||
* the two will equate.
|
||||
* @param pathToSearch Path we will be trying to match against.
|
||||
* @param pathTail what to match
|
||||
* @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
|
||||
*/
|
||||
public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
|
||||
return isMatchingTail(pathToSearch, new Path(pathTail));
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
|
||||
* '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
|
||||
* schema; i.e. if schemas different but path or subpath matches, the two will equate.
|
||||
* @param pathToSearch Path we will be trying to match agains against
|
||||
* @param pathTail what to match
|
||||
* @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
|
||||
*/
|
||||
public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
|
||||
if (pathToSearch.depth() != pathTail.depth()) {
|
||||
return false;
|
||||
}
|
||||
Path tailPath = pathTail;
|
||||
String tailName;
|
||||
Path toSearch = pathToSearch;
|
||||
String toSearchName;
|
||||
boolean result = false;
|
||||
do {
|
||||
tailName = tailPath.getName();
|
||||
if (tailName == null || tailName.length() <= 0) {
|
||||
result = true;
|
||||
break;
|
||||
}
|
||||
toSearchName = toSearch.getName();
|
||||
if (toSearchName == null || toSearchName.length() <= 0) {
|
||||
break;
|
||||
}
|
||||
// Move up a parent on each path for next go around. Path doesn't let us go off the end.
|
||||
tailPath = tailPath.getParent();
|
||||
toSearch = toSearch.getParent();
|
||||
} while(tailName.equals(toSearchName));
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete if exists.
|
||||
* @param fs filesystem object
|
||||
* @param dir directory to delete
|
||||
* @return True if deleted <code>dir</code>
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static boolean deleteDirectory(final FileSystem fs, final Path dir)
|
||||
throws IOException {
|
||||
return fs.exists(dir) && fs.delete(dir, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of bytes that large input files should be optimally
|
||||
* be split into to minimize i/o time.
|
||||
*
|
||||
* use reflection to search for getDefaultBlockSize(Path f)
|
||||
* if the method doesn't exist, fall back to using getDefaultBlockSize()
|
||||
*
|
||||
* @param fs filesystem object
|
||||
* @return the default block size for the path's filesystem
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
|
||||
Method m = null;
|
||||
Class<? extends FileSystem> cls = fs.getClass();
|
||||
try {
|
||||
m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.info("FileSystem doesn't support getDefaultBlockSize");
|
||||
} catch (SecurityException e) {
|
||||
LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
|
||||
m = null; // could happen on setAccessible()
|
||||
}
|
||||
if (m == null) {
|
||||
return fs.getDefaultBlockSize(path);
|
||||
} else {
|
||||
try {
|
||||
Object ret = m.invoke(fs, path);
|
||||
return ((Long)ret).longValue();
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the default replication.
|
||||
*
|
||||
* use reflection to search for getDefaultReplication(Path f)
|
||||
* if the method doesn't exist, fall back to using getDefaultReplication()
|
||||
*
|
||||
* @param fs filesystem object
|
||||
* @param f path of file
|
||||
* @return default replication for the path's filesystem
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static short getDefaultReplication(final FileSystem fs, final Path path)
|
||||
throws IOException {
|
||||
Method m = null;
|
||||
Class<? extends FileSystem> cls = fs.getClass();
|
||||
try {
|
||||
m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.info("FileSystem doesn't support getDefaultReplication");
|
||||
} catch (SecurityException e) {
|
||||
LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
|
||||
m = null; // could happen on setAccessible()
|
||||
}
|
||||
if (m == null) {
|
||||
return fs.getDefaultReplication(path);
|
||||
} else {
|
||||
try {
|
||||
Object ret = m.invoke(fs, path);
|
||||
return ((Number)ret).shortValue();
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default buffer size to use during writes.
|
||||
*
|
||||
* The size of the buffer should probably be a multiple of hardware
|
||||
* page size (4096 on Intel x86), and it determines how much data is
|
||||
* buffered during read and write operations.
|
||||
*
|
||||
* @param fs filesystem object
|
||||
* @return default buffer size to use during writes
|
||||
*/
|
||||
public static int getDefaultBufferSize(final FileSystem fs) {
|
||||
return fs.getConf().getInt("io.file.buffer.size", 4096);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the specified file on the filesystem. By default, this will:
|
||||
* <ol>
|
||||
* <li>apply the umask in the configuration (if it is enabled)</li>
|
||||
* <li>use the fs configured buffer size (or 4096 if not set)</li>
|
||||
* <li>use the default replication</li>
|
||||
* <li>use the default block size</li>
|
||||
* <li>not track progress</li>
|
||||
* </ol>
|
||||
*
|
||||
* @param fs {@link FileSystem} on which to write the file
|
||||
* @param path {@link Path} to the file to write
|
||||
* @param perm intial permissions
|
||||
* @param overwrite Whether or not the created file should be overwritten.
|
||||
* @return output stream to the created file
|
||||
* @throws IOException if the file cannot be created
|
||||
*/
|
||||
public static FSDataOutputStream create(FileSystem fs, Path path,
|
||||
FsPermission perm, boolean overwrite) throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
|
||||
}
|
||||
return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
|
||||
getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the file permissions specified in the configuration, if they are
|
||||
* enabled.
|
||||
*
|
||||
* @param fs filesystem that the file will be created on.
|
||||
* @param conf configuration to read for determining if permissions are
|
||||
* enabled and which to use
|
||||
* @param permssionConfKey property key in the configuration to use when
|
||||
* finding the permission
|
||||
* @return the permission to use when creating a new file on the fs. If
|
||||
* special permissions are not specified in the configuration, then
|
||||
* the default permissions on the the fs will be returned.
|
||||
*/
|
||||
public static FsPermission getFilePermissions(final FileSystem fs,
|
||||
final Configuration conf, final String permssionConfKey) {
|
||||
boolean enablePermissions = conf.getBoolean(
|
||||
HConstants.ENABLE_DATA_FILE_UMASK, false);
|
||||
|
||||
if (enablePermissions) {
|
||||
try {
|
||||
FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
|
||||
// make sure that we have a mask, if not, go default.
|
||||
String mask = conf.get(permssionConfKey);
|
||||
if (mask == null) {
|
||||
return FsPermission.getFileDefault();
|
||||
}
|
||||
// appy the umask
|
||||
FsPermission umask = new FsPermission(mask);
|
||||
return perm.applyUMask(umask);
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.warn(
|
||||
"Incorrect umask attempted to be created: "
|
||||
+ conf.get(permssionConfKey)
|
||||
+ ", using default file permissions.", e);
|
||||
return FsPermission.getFileDefault();
|
||||
}
|
||||
}
|
||||
return FsPermission.getFileDefault();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies root directory path is a valid URI with a scheme
|
||||
*
|
||||
* @param root root directory path
|
||||
* @return Passed <code>root</code> argument.
|
||||
* @throws IOException if not a valid URI with a scheme
|
||||
*/
|
||||
public static Path validateRootPath(Path root) throws IOException {
|
||||
try {
|
||||
URI rootURI = new URI(root.toString());
|
||||
String scheme = rootURI.getScheme();
|
||||
if (scheme == null) {
|
||||
throw new IOException("Root directory does not have a scheme");
|
||||
}
|
||||
return root;
|
||||
} catch (URISyntaxException e) {
|
||||
IOException io = new IOException("Root directory path is not a valid " +
|
||||
"URI -- check your " + HConstants.HBASE_DIR + " configuration");
|
||||
io.initCause(e);
|
||||
throw io;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks for the presence of the WAL log root path (using the provided conf object) in the given
|
||||
* path. If it exists, this method removes it and returns the String representation of remaining
|
||||
* relative path.
|
||||
* @param path must not be null
|
||||
* @param conf must not be null
|
||||
* @return String representation of the remaining relative path
|
||||
* @throws IOException from underlying filesystem
|
||||
*/
|
||||
public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
|
||||
Path root = getWALRootDir(conf);
|
||||
String pathStr = path.toString();
|
||||
// check that the path is absolute... it has the root path in it.
|
||||
if (!pathStr.startsWith(root.toString())) {
|
||||
return pathStr;
|
||||
}
|
||||
// if not, return as it is.
|
||||
return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the 'path' component of a Path. In Hadoop, Path is an URI. This
|
||||
* method returns the 'path' component of a Path's URI: e.g. If a Path is
|
||||
* <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
|
||||
* this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
|
||||
* This method is useful if you want to print out a Path without qualifying
|
||||
* Filesystem instance.
|
||||
* @param p Filesystem Path whose 'path' component we are to return.
|
||||
* @return Path portion of the Filesystem
|
||||
*/
|
||||
public static String getPath(Path p) {
|
||||
return p.toUri().getPath();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param c configuration
|
||||
* @return {@link Path} to hbase root directory from
|
||||
* configuration as a qualified Path.
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static Path getRootDir(final Configuration c) throws IOException {
|
||||
Path p = new Path(c.get(HConstants.HBASE_DIR));
|
||||
FileSystem fs = p.getFileSystem(c);
|
||||
return p.makeQualified(fs);
|
||||
}
|
||||
|
||||
public static void setRootDir(final Configuration c, final Path root) throws IOException {
|
||||
c.set(HConstants.HBASE_DIR, root.toString());
|
||||
}
|
||||
|
||||
public static void setFsDefault(final Configuration c, final Path root) throws IOException {
|
||||
c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
|
||||
}
|
||||
|
||||
public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
|
||||
Path p = getRootDir(c);
|
||||
return p.getFileSystem(c);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param c configuration
|
||||
* @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from
|
||||
* configuration as a qualified Path. Defaults to HBase root dir.
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static Path getWALRootDir(final Configuration c) throws IOException {
|
||||
Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
|
||||
if (!isValidWALRootDir(p, c)) {
|
||||
return getRootDir(c);
|
||||
}
|
||||
FileSystem fs = p.getFileSystem(c);
|
||||
return p.makeQualified(fs);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
|
||||
c.set(HBASE_WAL_DIR, root.toString());
|
||||
}
|
||||
|
||||
public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
|
||||
Path p = getWALRootDir(c);
|
||||
return p.getFileSystem(c);
|
||||
}
|
||||
|
||||
private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
|
||||
Path rootDir = getRootDir(c);
|
||||
if (walDir != rootDir) {
|
||||
if (walDir.toString().startsWith(rootDir.toString() + "/")) {
|
||||
throw new IllegalStateException("Illegal WAL directory specified. " +
|
||||
"WAL directories are not permitted to be under the root directory if set.");
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
|
||||
* path rootdir
|
||||
*
|
||||
* @param rootdir qualified path of HBase root directory
|
||||
* @param tableName name of table
|
||||
* @return {@link org.apache.hadoop.fs.Path} for table
|
||||
*/
|
||||
public static Path getTableDir(Path rootdir, final TableName tableName) {
|
||||
return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
|
||||
tableName.getQualifierAsString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link org.apache.hadoop.hbase.TableName} object representing
|
||||
* the table directory under
|
||||
* path rootdir
|
||||
*
|
||||
* @param tablePath path of table
|
||||
* @return {@link org.apache.hadoop.fs.Path} for table
|
||||
*/
|
||||
public static TableName getTableName(Path tablePath) {
|
||||
return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link org.apache.hadoop.fs.Path} object representing
|
||||
* the namespace directory under path rootdir
|
||||
*
|
||||
* @param rootdir qualified path of HBase root directory
|
||||
* @param namespace namespace name
|
||||
* @return {@link org.apache.hadoop.fs.Path} for table
|
||||
*/
|
||||
public static Path getNamespaceDir(Path rootdir, final String namespace) {
|
||||
return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
|
||||
new Path(namespace)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets storage policy for given path according to config setting.
|
||||
* If the passed path is a directory, we'll set the storage policy for all files
|
||||
* created in the future in said directory. Note that this change in storage
|
||||
* policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
|
||||
* If we're running on a FileSystem implementation that doesn't support the given storage policy
|
||||
* (or storage policies at all), then we'll issue a log message and continue.
|
||||
*
|
||||
* See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
|
||||
*
|
||||
* @param fs We only do anything it implements a setStoragePolicy method
|
||||
* @param conf used to look up storage policy with given key; not modified.
|
||||
* @param path the Path whose storage policy is to be set
|
||||
* @param policyKey Key to use pulling a policy from Configuration:
|
||||
* e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy).
|
||||
* @param defaultPolicy if the configured policy is equal to this policy name, we will skip
|
||||
* telling the FileSystem to set a storage policy.
|
||||
*/
|
||||
public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
|
||||
final Path path, final String policyKey, final String defaultPolicy) {
|
||||
String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
|
||||
if (storagePolicy.equals(defaultPolicy)) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
setStoragePolicy(fs, path, storagePolicy);
|
||||
}
|
||||
|
||||
// this mapping means that under a federated FileSystem implementation, we'll
|
||||
// only log the first failure from any of the underlying FileSystems at WARN and all others
|
||||
// will be at DEBUG.
|
||||
private static final Map<FileSystem, Boolean> warningMap =
|
||||
new ConcurrentHashMap<FileSystem, Boolean>();
|
||||
|
||||
/**
|
||||
* Sets storage policy for given path.
|
||||
* If the passed path is a directory, we'll set the storage policy for all files
|
||||
* created in the future in said directory. Note that this change in storage
|
||||
* policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
|
||||
* If we're running on a version of FileSystem that doesn't support the given storage policy
|
||||
* (or storage policies at all), then we'll issue a log message and continue.
|
||||
*
|
||||
* See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
|
||||
*
|
||||
* @param fs We only do anything it implements a setStoragePolicy method
|
||||
* @param path the Path whose storage policy is to be set
|
||||
* @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
|
||||
* org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
|
||||
* 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
|
||||
*/
|
||||
public static void setStoragePolicy(final FileSystem fs, final Path path,
|
||||
final String storagePolicy) {
|
||||
if (storagePolicy == null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("We were passed a null storagePolicy, exiting early.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
final String trimmedStoragePolicy = storagePolicy.trim();
|
||||
if (trimmedStoragePolicy.isEmpty()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("We were passed an empty storagePolicy, exiting early.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
|
||||
}
|
||||
|
||||
/*
|
||||
* All args have been checked and are good. Run the setStoragePolicy invocation.
|
||||
*/
|
||||
private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
|
||||
final String storagePolicy) {
|
||||
Method m = null;
|
||||
try {
|
||||
m = fs.getClass().getDeclaredMethod("setStoragePolicy",
|
||||
new Class<?>[] { Path.class, String.class });
|
||||
m.setAccessible(true);
|
||||
} catch (NoSuchMethodException e) {
|
||||
final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584, HDFS-9345 " +
|
||||
"not available. This is normal and expected on earlier Hadoop versions.";
|
||||
if (!warningMap.containsKey(fs)) {
|
||||
warningMap.put(fs, true);
|
||||
LOG.warn(msg, e);
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(msg, e);
|
||||
}
|
||||
m = null;
|
||||
} catch (SecurityException e) {
|
||||
final String msg = "No access to setStoragePolicy on FileSystem from the SecurityManager; " +
|
||||
"HDFS-6584, HDFS-9345 not available. This is unusual and probably warrants an email " +
|
||||
"to the user@hbase mailing list. Please be sure to include a link to your configs, and " +
|
||||
"logs that include this message and period of time before it. Logs around service " +
|
||||
"start up will probably be useful as well.";
|
||||
if (!warningMap.containsKey(fs)) {
|
||||
warningMap.put(fs, true);
|
||||
LOG.warn(msg, e);
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(msg, e);
|
||||
}
|
||||
m = null; // could happen on setAccessible() or getDeclaredMethod()
|
||||
}
|
||||
if (m != null) {
|
||||
try {
|
||||
m.invoke(fs, path, storagePolicy);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// This swallows FNFE, should we be throwing it? seems more likely to indicate dev
|
||||
// misuse than a runtime problem with HDFS.
|
||||
if (!warningMap.containsKey(fs)) {
|
||||
warningMap.put(fs, true);
|
||||
LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
|
||||
"DEBUG log level might have more details.", e);
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
|
||||
}
|
||||
// check for lack of HDFS-7228
|
||||
if (e instanceof InvocationTargetException) {
|
||||
final Throwable exception = e.getCause();
|
||||
if (exception instanceof RemoteException &&
|
||||
HadoopIllegalArgumentException.class.getName().equals(
|
||||
((RemoteException)exception).getClassName())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
|
||||
"isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
|
||||
"trying to use SSD related policies then you're likely missing HDFS-7228. For " +
|
||||
"more information see the 'ArchivalStorage' docs for your Hadoop release.");
|
||||
}
|
||||
// Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
|
||||
// that throws UnsupportedOperationException
|
||||
} else if (exception instanceof UnsupportedOperationException) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The underlying FileSystem implementation doesn't support " +
|
||||
"setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
|
||||
"appears to be present in your version of Hadoop. For more information check " +
|
||||
"the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
|
||||
"specification docs from HADOOP-11981, and/or related documentation from the " +
|
||||
"provider of the underlying FileSystem (its name should appear in the " +
|
||||
"stacktrace that accompanies this message). Note in particular that Hadoop's " +
|
||||
"local filesystem implementation doesn't support storage policies.", exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf must not be null
|
||||
* @return True if this filesystem whose scheme is 'hdfs'.
|
||||
* @throws IOException from underlying FileSystem
|
||||
*/
|
||||
public static boolean isHDFS(final Configuration conf) throws IOException {
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
String scheme = fs.getUri().getScheme();
|
||||
return scheme.equalsIgnoreCase("hdfs");
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given path is the one with 'recovered.edits' dir.
|
||||
* @param path must not be null
|
||||
* @return True if we recovered edits
|
||||
*/
|
||||
public static boolean isRecoveredEdits(Path path) {
|
||||
return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf must not be null
|
||||
* @return Returns the filesystem of the hbase rootdir.
|
||||
* @throws IOException from underlying FileSystem
|
||||
*/
|
||||
public static FileSystem getCurrentFileSystem(Configuration conf)
|
||||
throws IOException {
|
||||
return getRootDir(conf).getFileSystem(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal
|
||||
* This accommodates differences between hadoop versions, where hadoop 1
|
||||
* does not throw a FileNotFoundException, and return an empty FileStatus[]
|
||||
* while Hadoop 2 will throw FileNotFoundException.
|
||||
*
|
||||
* Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
|
||||
* Path, FileStatusFilter) instead.
|
||||
*
|
||||
* @param fs file system
|
||||
* @param dir directory
|
||||
* @param filter path filter
|
||||
* @return null if dir is empty or doesn't exist, otherwise FileStatus array
|
||||
*/
|
||||
public static FileStatus [] listStatus(final FileSystem fs,
|
||||
final Path dir, final PathFilter filter) throws IOException {
|
||||
FileStatus [] status = null;
|
||||
try {
|
||||
status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// if directory doesn't exist, return null
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(dir + " doesn't exist");
|
||||
}
|
||||
}
|
||||
if (status == null || status.length < 1) {
|
||||
return null;
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal
|
||||
* This would accommodates differences between hadoop versions
|
||||
*
|
||||
* @param fs file system
|
||||
* @param dir directory
|
||||
* @return null if dir is empty or doesn't exist, otherwise FileStatus array
|
||||
*/
|
||||
public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
|
||||
return listStatus(fs, dir, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call
|
||||
*
|
||||
* @param fs file system
|
||||
* @param dir directory
|
||||
* @return LocatedFileStatus list
|
||||
*/
|
||||
public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
|
||||
final Path dir) throws IOException {
|
||||
List<LocatedFileStatus> status = null;
|
||||
try {
|
||||
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
|
||||
.listFiles(dir, false);
|
||||
while (locatedFileStatusRemoteIterator.hasNext()) {
|
||||
if (status == null) {
|
||||
status = Lists.newArrayList();
|
||||
}
|
||||
status.add(locatedFileStatusRemoteIterator.next());
|
||||
}
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// if directory doesn't exist, return null
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(dir + " doesn't exist");
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.delete() and returns the value returned by the fs.delete()
|
||||
*
|
||||
* @param fs must not be null
|
||||
* @param path must not be null
|
||||
* @param recursive delete tree rooted at path
|
||||
* @return the value returned by the fs.delete()
|
||||
* @throws IOException from underlying FileSystem
|
||||
*/
|
||||
public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
|
||||
throws IOException {
|
||||
return fs.delete(path, recursive);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.exists(). Checks if the specified path exists
|
||||
*
|
||||
* @param fs must not be null
|
||||
* @param path must not be null
|
||||
* @return the value returned by fs.exists()
|
||||
* @throws IOException from underlying FileSystem
|
||||
*/
|
||||
public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
|
||||
return fs.exists(path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Log the current state of the filesystem from a certain root directory
|
||||
* @param fs filesystem to investigate
|
||||
* @param root root file/directory to start logging from
|
||||
* @param LOG log to output information
|
||||
* @throws IOException if an unexpected exception occurs
|
||||
*/
|
||||
public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
|
||||
throws IOException {
|
||||
LOG.debug("Current file system:");
|
||||
logFSTree(LOG, fs, root, "|-");
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursive helper to log the state of the FS
|
||||
*
|
||||
* @see #logFileSystemState(FileSystem, Path, Log)
|
||||
*/
|
||||
private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
|
||||
throws IOException {
|
||||
FileStatus[] files = listStatus(fs, root, null);
|
||||
if (files == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (FileStatus file : files) {
|
||||
if (file.isDirectory()) {
|
||||
LOG.debug(prefix + file.getPath().getName() + "/");
|
||||
logFSTree(LOG, fs, file.getPath(), prefix + "---");
|
||||
} else {
|
||||
LOG.debug(prefix + file.getPath().getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
|
||||
throws IOException {
|
||||
// set the modify time for TimeToLive Cleaner
|
||||
fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
|
||||
return fs.rename(src, dest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Do our short circuit read setup.
|
||||
* Checks buffer size to use and whether to do checksumming in hbase or hdfs.
|
||||
* @param conf must not be null
|
||||
*/
|
||||
public static void setupShortCircuitRead(final Configuration conf) {
|
||||
// Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
|
||||
boolean shortCircuitSkipChecksum =
|
||||
conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
|
||||
boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
|
||||
if (shortCircuitSkipChecksum) {
|
||||
LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
|
||||
"be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
|
||||
"it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
|
||||
assert !shortCircuitSkipChecksum; //this will fail if assertions are on
|
||||
}
|
||||
checkShortCircuitReadBufferSize(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if short circuit read buffer size is set and if not, set it to hbase value.
|
||||
* @param conf must not be null
|
||||
*/
|
||||
public static void checkShortCircuitReadBufferSize(final Configuration conf) {
|
||||
final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
|
||||
final int notSet = -1;
|
||||
// DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
|
||||
final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
|
||||
int size = conf.getInt(dfsKey, notSet);
|
||||
// If a size is set, return -- we will use it.
|
||||
if (size != notSet) {
|
||||
return;
|
||||
}
|
||||
// But short circuit buffer size is normally not set. Put in place the hbase wanted size.
|
||||
int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
|
||||
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
|
||||
}
|
||||
|
||||
// Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
|
||||
// not until we attempt to reference it.
|
||||
private static class StreamCapabilities {
|
||||
public static final boolean PRESENT;
|
||||
public static final Class<?> CLASS;
|
||||
public static final Method METHOD;
|
||||
static {
|
||||
boolean tmp = false;
|
||||
Class<?> clazz = null;
|
||||
Method method = null;
|
||||
try {
|
||||
clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
|
||||
method = clazz.getMethod("hasCapability", String.class);
|
||||
tmp = true;
|
||||
} catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) {
|
||||
LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
|
||||
"HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
|
||||
"support hflush/hsync. If you are running on top of HDFS this probably just " +
|
||||
"means you have an older version and this can be ignored. If you are running on " +
|
||||
"top of an alternate FileSystem implementation you should manually verify that " +
|
||||
"hflush and hsync are implemented; otherwise you risk data loss and hard to " +
|
||||
"diagnose errors when our assumptions are violated.");
|
||||
LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
|
||||
exception);
|
||||
} finally {
|
||||
PRESENT = tmp;
|
||||
CLASS = clazz;
|
||||
METHOD = method;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If our FileSystem version includes the StreamCapabilities class, check if
|
||||
* the given stream has a particular capability.
|
||||
* @param stream capabilities are per-stream instance, so check this one specifically. must not be
|
||||
* null
|
||||
* @param capability what to look for, per Hadoop Common's FileSystem docs
|
||||
* @return true if there are no StreamCapabilities. false if there are, but this stream doesn't
|
||||
* implement it. return result of asking the stream otherwise.
|
||||
*/
|
||||
public static boolean hasCapability(FSDataOutputStream stream, String capability) {
|
||||
// be consistent whether or not StreamCapabilities is present
|
||||
if (stream == null) {
|
||||
throw new NullPointerException("stream parameter must not be null.");
|
||||
}
|
||||
// If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
|
||||
// otherwise old versions of Hadoop will break.
|
||||
boolean result = true;
|
||||
if (StreamCapabilities.PRESENT) {
|
||||
// if StreamCapabilities is present, but the stream doesn't implement it
|
||||
// or we run into a problem invoking the method,
|
||||
// we treat that as equivalent to not declaring anything
|
||||
result = false;
|
||||
if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
|
||||
try {
|
||||
result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
|
||||
} catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException
|
||||
exception) {
|
||||
LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
|
||||
"our understanding of how it's supposed to work. Please file a JIRA and include " +
|
||||
"the following stack trace. In the mean time we're interpreting this behavior " +
|
||||
"difference as a lack of capability support, which will probably cause a failure.",
|
||||
exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper exception for those cases where the place where we need to check a stream capability
|
||||
* is not where we have the needed context to explain the impact and mitigation for a lack.
|
||||
*/
|
||||
public static class StreamLacksCapabilityException extends Exception {
|
||||
public StreamLacksCapabilityException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
public StreamLacksCapabilityException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test {@link CommonFSUtils}.
|
||||
*/
|
||||
@Category({MiscTests.class, MediumTests.class})
|
||||
public class TestCommonFSUtils {
|
||||
private static final Log LOG = LogFactory.getLog(TestCommonFSUtils.class);
|
||||
|
||||
private HBaseCommonTestingUtility htu;
|
||||
private Configuration conf;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
htu = new HBaseCommonTestingUtility();
|
||||
conf = htu.getConfiguration();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test path compare and prefix checking.
|
||||
*/
|
||||
@Test
|
||||
public void testMatchingTail() throws IOException {
|
||||
Path rootdir = htu.getDataTestDir();
|
||||
final FileSystem fs = rootdir.getFileSystem(conf);
|
||||
assertTrue(rootdir.depth() > 1);
|
||||
Path partPath = new Path("a", "b");
|
||||
Path fullPath = new Path(rootdir, partPath);
|
||||
Path fullyQualifiedPath = fs.makeQualified(fullPath);
|
||||
assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath));
|
||||
assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath.toString()));
|
||||
assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullPath.toString()));
|
||||
assertTrue(CommonFSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString()));
|
||||
assertFalse(CommonFSUtils.isStartingWithPath(rootdir, partPath.toString()));
|
||||
assertFalse(CommonFSUtils.isMatchingTail(fullyQualifiedPath, partPath));
|
||||
assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath));
|
||||
assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString()));
|
||||
assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath)));
|
||||
assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString()));
|
||||
assertFalse(CommonFSUtils.isMatchingTail(fullPath, new Path("x")));
|
||||
assertFalse(CommonFSUtils.isMatchingTail(new Path("x"), fullPath));
|
||||
}
|
||||
|
||||
private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
|
||||
throws Exception {
|
||||
FSDataOutputStream out = fs.create(file);
|
||||
byte [] data = new byte[dataSize];
|
||||
out.write(data, 0, dataSize);
|
||||
out.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetWALRootDir() throws Exception {
|
||||
Path p = new Path("file:///hbase/root");
|
||||
CommonFSUtils.setWALRootDir(conf, p);
|
||||
assertEquals(p.toString(), conf.get(CommonFSUtils.HBASE_WAL_DIR));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetWALRootDir() throws IOException {
|
||||
Path root = new Path("file:///hbase/root");
|
||||
Path walRoot = new Path("file:///hbase/logroot");
|
||||
CommonFSUtils.setRootDir(conf, root);
|
||||
assertEquals(CommonFSUtils.getRootDir(conf), root);
|
||||
assertEquals(CommonFSUtils.getWALRootDir(conf), root);
|
||||
CommonFSUtils.setWALRootDir(conf, walRoot);
|
||||
assertEquals(CommonFSUtils.getWALRootDir(conf), walRoot);
|
||||
}
|
||||
|
||||
@Test(expected=IllegalStateException.class)
|
||||
public void testGetWALRootDirIllegalWALDir() throws IOException {
|
||||
Path root = new Path("file:///hbase/root");
|
||||
Path invalidWALDir = new Path("file:///hbase/root/logroot");
|
||||
CommonFSUtils.setRootDir(conf, root);
|
||||
CommonFSUtils.setWALRootDir(conf, invalidWALDir);
|
||||
CommonFSUtils.getWALRootDir(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveWALRootPath() throws Exception {
|
||||
CommonFSUtils.setRootDir(conf, new Path("file:///user/hbase"));
|
||||
Path testFile = new Path(CommonFSUtils.getRootDir(conf), "test/testfile");
|
||||
Path tmpFile = new Path("file:///test/testfile");
|
||||
assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), "test/testfile");
|
||||
assertEquals(CommonFSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString());
|
||||
CommonFSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir"));
|
||||
assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), testFile.toString());
|
||||
Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog");
|
||||
assertEquals(CommonFSUtils.removeWALRootPath(logFile, conf), "test/testlog");
|
||||
}
|
||||
|
||||
@Test(expected=NullPointerException.class)
|
||||
public void streamCapabilitiesDoesNotAllowNullStream() {
|
||||
CommonFSUtils.hasCapability(null, "hopefully any string");
|
||||
}
|
||||
|
||||
private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
|
||||
static {
|
||||
boolean tmp = false;
|
||||
try {
|
||||
Class.forName("org.apache.hadoop.fs.StreamCapabilities");
|
||||
tmp = true;
|
||||
LOG.debug("Test thought StreamCapabilities class was present.");
|
||||
} catch (ClassNotFoundException exception) {
|
||||
LOG.debug("Test didn't think StreamCapabilities class was present.");
|
||||
} finally {
|
||||
STREAM_CAPABILITIES_IS_PRESENT = tmp;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException {
|
||||
FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null);
|
||||
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
|
||||
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
|
||||
CommonFSUtils.hasCapability(stream, "hsync"));
|
||||
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
|
||||
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
|
||||
CommonFSUtils.hasCapability(stream, "hflush"));
|
||||
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
|
||||
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
|
||||
CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " +
|
||||
"implement."));
|
||||
}
|
||||
}
|
|
@ -46,18 +46,20 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
||||
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
|
||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* WAL implementation of the ProcedureStore.
|
||||
|
@ -67,6 +69,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
|
|||
public class WALProcedureStore extends ProcedureStoreBase {
|
||||
private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
|
||||
public static final String LOG_PREFIX = "pv2-";
|
||||
/** Used to construct the name of the log directory for master procedures */
|
||||
public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
|
||||
|
||||
|
||||
public interface LeaseRecovery {
|
||||
void recoverFileLease(FileSystem fs, Path path) throws IOException;
|
||||
|
@ -185,18 +190,42 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
}
|
||||
}
|
||||
|
||||
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
|
||||
final LeaseRecovery leaseRecovery) {
|
||||
this(conf, fs, walDir, null, leaseRecovery);
|
||||
public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery)
|
||||
throws IOException {
|
||||
this(conf,
|
||||
new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
|
||||
new Path(CommonFSUtils.getRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME), leaseRecovery);
|
||||
}
|
||||
|
||||
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
|
||||
final Path walArchiveDir, final LeaseRecovery leaseRecovery) {
|
||||
this.fs = fs;
|
||||
@VisibleForTesting
|
||||
public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
|
||||
final LeaseRecovery leaseRecovery) throws IOException {
|
||||
this.conf = conf;
|
||||
this.leaseRecovery = leaseRecovery;
|
||||
this.walDir = walDir;
|
||||
this.walArchiveDir = walArchiveDir;
|
||||
this.leaseRecovery = leaseRecovery;
|
||||
this.fs = walDir.getFileSystem(conf);
|
||||
|
||||
// Create the log directory for the procedure store
|
||||
if (!fs.exists(walDir)) {
|
||||
if (!fs.mkdirs(walDir)) {
|
||||
throw new IOException("Unable to mkdir " + walDir);
|
||||
}
|
||||
}
|
||||
// Now that it exists, set the log policy
|
||||
CommonFSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
|
||||
HConstants.DEFAULT_WAL_STORAGE_POLICY);
|
||||
|
||||
// Create archive dir up front. Rename won't work w/o it up on HDFS.
|
||||
if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
|
||||
if (this.fs.mkdirs(this.walArchiveDir)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Failed create of " + this.walArchiveDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -247,16 +276,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
}
|
||||
};
|
||||
syncThread.start();
|
||||
|
||||
// Create archive dir up front. Rename won't work w/o it up on HDFS.
|
||||
if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
|
||||
if (this.fs.mkdirs(this.walArchiveDir)) {
|
||||
if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " +
|
||||
this.walArchiveDir);
|
||||
} else {
|
||||
LOG.warn("Failed create of " + this.walArchiveDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1005,6 +1024,17 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
LOG.warn("failed to create log file with id=" + logId, re);
|
||||
return false;
|
||||
}
|
||||
// After we create the stream but before we attempt to use it at all
|
||||
// ensure that we can provide the level of data safety we're configured
|
||||
// to provide.
|
||||
final String durability = useHsync ? "hsync" : "hflush";
|
||||
if (!(CommonFSUtils.hasCapability(newStream, durability))) {
|
||||
throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
|
||||
" for proper operation during component failures, but the underlying filesystem does " +
|
||||
"not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
|
||||
"' to set the desired level of robustness and ensure the config value of '" +
|
||||
CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it.");
|
||||
}
|
||||
try {
|
||||
ProcedureWALFormat.writeHeader(newStream, header);
|
||||
startPos = newStream.getPos();
|
||||
|
|
|
@ -51,14 +51,14 @@ public class ProcedureTestingUtility {
|
|||
private ProcedureTestingUtility() {
|
||||
}
|
||||
|
||||
public static ProcedureStore createStore(final Configuration conf, final FileSystem fs,
|
||||
final Path baseDir) throws IOException {
|
||||
return createWalStore(conf, fs, baseDir);
|
||||
public static ProcedureStore createStore(final Configuration conf, final Path dir)
|
||||
throws IOException {
|
||||
return createWalStore(conf, dir);
|
||||
}
|
||||
|
||||
public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
|
||||
final Path walDir) throws IOException {
|
||||
return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() {
|
||||
public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
|
||||
throws IOException {
|
||||
return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
|
||||
@Override
|
||||
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
|
||||
// no-op
|
||||
|
|
|
@ -60,7 +60,7 @@ public class TestChildProcedures {
|
|||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procEnv = new TestProcEnv();
|
||||
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
|
||||
procExecutor.testing = new ProcedureExecutor.Testing();
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
|
|
|
@ -61,7 +61,7 @@ public class TestProcedureEvents {
|
|||
logDir = new Path(testDir, "proc-logs");
|
||||
|
||||
procEnv = new TestProcEnv();
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
|
||||
procStore.start(1);
|
||||
procExecutor.start(1, true);
|
||||
|
|
|
@ -63,7 +63,7 @@ public class TestProcedureExecution {
|
|||
assertTrue(testDir.depth() > 1);
|
||||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
|
||||
|
|
|
@ -66,7 +66,7 @@ public class TestProcedureMetrics {
|
|||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procEnv = new TestProcEnv();
|
||||
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
|
||||
procExecutor = new ProcedureExecutor<TestProcEnv>(htu.getConfiguration(), procEnv, procStore);
|
||||
procExecutor.testing = new ProcedureExecutor.Testing();
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
|
|
|
@ -67,7 +67,7 @@ public class TestProcedureNonce {
|
|||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procEnv = new TestProcEnv();
|
||||
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
|
||||
procExecutor.testing = new ProcedureExecutor.Testing();
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
|
|
|
@ -69,7 +69,7 @@ public class TestProcedureRecovery {
|
|||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procEnv = new TestProcEnv();
|
||||
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
|
||||
procExecutor.testing = new ProcedureExecutor.Testing();
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
|
|
|
@ -68,7 +68,7 @@ public class TestProcedureReplayOrder {
|
|||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procEnv = new TestProcedureEnv();
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
|
||||
procStore.start(NUM_THREADS);
|
||||
procExecutor.start(1, true);
|
||||
|
|
|
@ -76,7 +76,7 @@ public class TestStateMachineProcedure {
|
|||
fs = testDir.getFileSystem(htu.getConfiguration());
|
||||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
|
||||
|
|
|
@ -65,7 +65,7 @@ public class TestYieldProcedures {
|
|||
assertTrue(testDir.depth() > 1);
|
||||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
||||
procRunnables = new TestScheduler();
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
|
||||
procStore, procRunnables);
|
||||
|
|
|
@ -126,7 +126,7 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
|
|||
Path logDir = new Path(testDir, "proc-logs");
|
||||
System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
|
||||
fs.delete(logDir, true);
|
||||
store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
|
||||
store = ProcedureTestingUtility.createWalStore(conf, logDir);
|
||||
store.start(1);
|
||||
store.recoverLease();
|
||||
store.load(new LoadCounter());
|
||||
|
|
|
@ -93,9 +93,9 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
|
|||
System.out.println("Logs directory : " + logDir.toString());
|
||||
fs.delete(logDir, true);
|
||||
if ("nosync".equals(syncType)) {
|
||||
store = new NoSyncWalProcedureStore(conf, fs, logDir);
|
||||
store = new NoSyncWalProcedureStore(conf, logDir);
|
||||
} else {
|
||||
store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
|
||||
store = ProcedureTestingUtility.createWalStore(conf, logDir);
|
||||
}
|
||||
store.start(numThreads);
|
||||
store.recoverLease();
|
||||
|
@ -244,9 +244,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
|
|||
}
|
||||
|
||||
private class NoSyncWalProcedureStore extends WALProcedureStore {
|
||||
public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs,
|
||||
final Path logDir) {
|
||||
super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
|
||||
public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
|
||||
super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
|
||||
@Override
|
||||
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
|
||||
// no-op
|
||||
|
|
|
@ -75,7 +75,7 @@ public class TestStressWALProcedureStore {
|
|||
assertTrue(testDir.depth() > 1);
|
||||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
||||
procStore.start(PROCEDURE_STORE_SLOTS);
|
||||
procStore.recoverLease();
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ public class TestWALProcedureStore {
|
|||
|
||||
setupConfig(htu.getConfiguration());
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
|
||||
procStore.start(PROCEDURE_STORE_SLOTS);
|
||||
procStore.recoverLease();
|
||||
procStore.load(new LoadCounter());
|
||||
|
@ -729,7 +729,7 @@ public class TestWALProcedureStore {
|
|||
assertEquals(procs.length + 1, status.length);
|
||||
|
||||
// simulate another active master removing the wals
|
||||
procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir,
|
||||
procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
|
||||
new WALProcedureStore.LeaseRecovery() {
|
||||
private int count = 0;
|
||||
|
||||
|
|
|
@ -65,9 +65,6 @@ import edu.umd.cs.findbugs.annotations.Nullable;
|
|||
public class HFileSystem extends FilterFileSystem {
|
||||
public static final Log LOG = LogFactory.getLog(HFileSystem.class);
|
||||
|
||||
/** Parameter name for HBase WAL directory */
|
||||
public static final String HBASE_WAL_DIR = "hbase.wal.dir";
|
||||
|
||||
private final FileSystem noChecksumFs; // read hfile data from storage
|
||||
private final boolean useHBaseChecksum;
|
||||
private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
|
||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
|
||||
|
@ -56,7 +57,8 @@ public final class AsyncFSOutputHelper {
|
|||
*/
|
||||
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
|
||||
boolean createParent, short replication, long blockSize, EventLoop eventLoop,
|
||||
Class<? extends Channel> channelClass) throws IOException {
|
||||
Class<? extends Channel> channelClass)
|
||||
throws IOException, CommonFSUtils.StreamLacksCapabilityException {
|
||||
if (fs instanceof DistributedFileSystem) {
|
||||
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
|
||||
overwrite, createParent, replication, blockSize, eventLoop, channelClass);
|
||||
|
@ -69,6 +71,13 @@ public final class AsyncFSOutputHelper {
|
|||
} else {
|
||||
fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
|
||||
}
|
||||
// After we create the stream but before we attempt to use it at all
|
||||
// ensure that we can provide the level of data safety we're configured
|
||||
// to provide.
|
||||
if (!(CommonFSUtils.hasCapability(fsOut, "hflush") &&
|
||||
CommonFSUtils.hasCapability(fsOut, "hsync"))) {
|
||||
throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
|
||||
}
|
||||
final ExecutorService flushExecutor =
|
||||
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
|
||||
.setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
|
||||
|
|
|
@ -53,7 +53,6 @@ import java.util.regex.Pattern;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
import org.apache.hadoop.hbase.ClusterStatus.Option;
|
||||
|
@ -1201,23 +1200,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
private void startProcedureExecutor() throws IOException {
|
||||
final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
|
||||
final Path rootDir = FSUtils.getRootDir(conf);
|
||||
final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
|
||||
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
|
||||
final Path walArchiveDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
|
||||
final FileSystem walFs = walDir.getFileSystem(conf);
|
||||
|
||||
// Create the log directory for the procedure store
|
||||
if (!walFs.exists(walDir)) {
|
||||
if (!walFs.mkdirs(walDir)) {
|
||||
throw new IOException("Unable to mkdir " + walDir);
|
||||
}
|
||||
}
|
||||
// Now that it exists, set the log policy
|
||||
FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
|
||||
HConstants.DEFAULT_WAL_STORAGE_POLICY);
|
||||
|
||||
procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, walArchiveDir,
|
||||
procedureStore = new WALProcedureStore(conf,
|
||||
new MasterProcedureEnv.WALStoreLeaseRecovery(this));
|
||||
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
|
||||
MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
|
||||
|
|
|
@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
|
@ -147,7 +147,7 @@ public class MasterFileSystem {
|
|||
HConstants.HREGION_LOGDIR_NAME,
|
||||
HConstants.HREGION_OLDLOGDIR_NAME,
|
||||
HConstants.CORRUPT_DIR_NAME,
|
||||
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR
|
||||
WALProcedureStore.MASTER_PROCEDURE_LOGDIR
|
||||
};
|
||||
// check if the root directory exists
|
||||
checkRootDir(this.rootdir, conf, this.fs);
|
||||
|
|
|
@ -24,9 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
public final class MasterProcedureConstants {
|
||||
private MasterProcedureConstants() {}
|
||||
|
||||
/** Used to construct the name of the log directory for master procedures */
|
||||
public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
|
||||
|
||||
/** Number of threads used by the procedure executor */
|
||||
public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
|
||||
public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16;
|
||||
|
|
|
@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
|
|||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CollectionUtils;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.DrainBarrier;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
|
@ -356,7 +356,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
// Now that it exists, set the storage policy for the entire directory of wal files related to
|
||||
// this FSHLog instance
|
||||
FSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY,
|
||||
CommonFSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY,
|
||||
HConstants.DEFAULT_WAL_STORAGE_POLICY);
|
||||
this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
|
||||
this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
|
||||
|
@ -381,7 +381,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
};
|
||||
|
||||
if (failIfWALExists) {
|
||||
final FileStatus[] walFiles = FSUtils.listStatus(fs, walDir, ourFiles);
|
||||
final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
|
||||
if (null != walFiles && 0 != walFiles.length) {
|
||||
throw new IOException("Target WAL already exists within directory " + walDir);
|
||||
}
|
||||
|
@ -398,7 +398,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
// Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
|
||||
// (it costs a little x'ing bocks)
|
||||
final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
|
||||
FSUtils.getDefaultBlockSize(this.fs, this.walDir));
|
||||
CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir));
|
||||
this.logrollsize = (long) (blocksize
|
||||
* conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
|
||||
|
||||
|
@ -652,7 +652,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
}
|
||||
LOG.info("Archiving " + p + " to " + newPath);
|
||||
if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
|
||||
if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
|
||||
throw new IOException("Unable to rename " + p + " to " + newPath);
|
||||
}
|
||||
// Tell our listeners that a log has been archived.
|
||||
|
@ -685,12 +685,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
try {
|
||||
long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
|
||||
int oldNumEntries = this.numEntries.getAndSet(0);
|
||||
final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
|
||||
final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath));
|
||||
if (oldPath != null) {
|
||||
this.walFile2Props.put(oldPath,
|
||||
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
|
||||
this.totalLogSize.addAndGet(oldFileLen);
|
||||
LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
|
||||
LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
|
||||
+ ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
|
||||
} else {
|
||||
LOG.info("New WAL " + newPathString);
|
||||
|
@ -767,6 +767,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
cleanOldLogs();
|
||||
regionsToFlush = findRegionsToForceFlush();
|
||||
}
|
||||
} catch (CommonFSUtils.StreamLacksCapabilityException exception) {
|
||||
// If the underlying FileSystem can't do what we ask, treat as IO failure so
|
||||
// we'll abort.
|
||||
throw new IOException("Underlying FileSystem can't meet stream requirements. See RS log " +
|
||||
"for details.", exception);
|
||||
} finally {
|
||||
closeBarrier.endOp();
|
||||
assert scope == NullScope.INSTANCE || !scope.isDetached();
|
||||
|
@ -794,7 +799,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
* @return may be null if there are no files.
|
||||
*/
|
||||
protected FileStatus[] getFiles() throws IOException {
|
||||
return FSUtils.listStatus(fs, walDir, ourFiles);
|
||||
return CommonFSUtils.listStatus(fs, walDir, ourFiles);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -833,7 +838,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
}
|
||||
|
||||
if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
|
||||
if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
|
||||
throw new IOException("Unable to rename " + file.getPath() + " to " + p);
|
||||
}
|
||||
// Tell our listeners that a log was archived.
|
||||
|
@ -843,7 +848,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
}
|
||||
}
|
||||
LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.walArchiveDir));
|
||||
LOG.debug("Moved " + files.length + " WAL file(s) to " +
|
||||
CommonFSUtils.getPath(this.walArchiveDir));
|
||||
}
|
||||
LOG.info("Closed WAL: " + toString());
|
||||
}
|
||||
|
@ -1022,7 +1028,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
|
||||
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
|
||||
|
||||
protected abstract W createWriterInstance(Path path) throws IOException;
|
||||
protected abstract W createWriterInstance(Path path) throws IOException,
|
||||
CommonFSUtils.StreamLacksCapabilityException;
|
||||
|
||||
/**
|
||||
* @return old wal file size
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.security.User;
|
|||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||
import org.apache.hadoop.hbase.util.EncryptionTest;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
||||
|
@ -153,7 +154,7 @@ public abstract class AbstractProtobufLogWriter {
|
|||
}
|
||||
|
||||
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
|
||||
throws IOException {
|
||||
throws IOException, StreamLacksCapabilityException {
|
||||
this.conf = conf;
|
||||
boolean doCompress = initializeCompressionContext(conf, path);
|
||||
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
|
||||
|
@ -237,7 +238,7 @@ public abstract class AbstractProtobufLogWriter {
|
|||
}
|
||||
|
||||
protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
||||
short replication, long blockSize) throws IOException;
|
||||
short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
|
||||
|
||||
/**
|
||||
* return the file length after written.
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
|
|||
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
||||
|
@ -153,7 +154,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
|
||||
@Override
|
||||
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
||||
short replication, long blockSize) throws IOException {
|
||||
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
|
||||
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
|
||||
blockSize, eventLoop, channelClass);
|
||||
this.asyncOutputWrapper = new OutputStreamWrapper(output);
|
||||
|
|
|
@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||
import org.apache.hadoop.hbase.wal.FSHLogProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
||||
|
@ -86,9 +88,13 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
||||
short replication, long blockSize) throws IOException {
|
||||
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
|
||||
this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
|
||||
null);
|
||||
// TODO Be sure to add a check for hsync if this branch includes HBASE-19024
|
||||
if (!(CommonFSUtils.hasCapability(output, "hflush"))) {
|
||||
throw new StreamLacksCapabilityException("hflush");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.util;
|
|||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterators;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.primitives.Ints;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.CheckForNull;
|
||||
|
@ -36,8 +35,6 @@ import java.io.InterruptedIOException;
|
|||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -60,17 +57,14 @@ import java.util.regex.Pattern;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.ClusterId;
|
||||
|
@ -101,174 +95,24 @@ import org.apache.hadoop.util.Progressable;
|
|||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
|
||||
|
||||
/**
|
||||
* Utility methods for interacting with the underlying file system.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class FSUtils {
|
||||
public abstract class FSUtils extends CommonFSUtils {
|
||||
private static final Log LOG = LogFactory.getLog(FSUtils.class);
|
||||
|
||||
/** Full access permissions (starting point for a umask) */
|
||||
public static final String FULL_RWX_PERMISSIONS = "777";
|
||||
private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
|
||||
private static final int DEFAULT_THREAD_POOLSIZE = 2;
|
||||
|
||||
/** Set to true on Windows platforms */
|
||||
@VisibleForTesting // currently only used in testing. TODO refactor into a test class
|
||||
public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
|
||||
|
||||
protected FSUtils() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets storage policy for given path according to config setting.
|
||||
* If the passed path is a directory, we'll set the storage policy for all files
|
||||
* created in the future in said directory. Note that this change in storage
|
||||
* policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
|
||||
* If we're running on a version of HDFS that doesn't support the given storage policy
|
||||
* (or storage policies at all), then we'll issue a log message and continue.
|
||||
*
|
||||
* See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
|
||||
*
|
||||
* @param fs We only do anything if an instance of DistributedFileSystem
|
||||
* @param conf used to look up storage policy with given key; not modified.
|
||||
* @param path the Path whose storage policy is to be set
|
||||
* @param policyKey Key to use pulling a policy from Configuration:
|
||||
* e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy).
|
||||
* @param defaultPolicy usually should be the policy NONE to delegate to HDFS
|
||||
*/
|
||||
public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
|
||||
final Path path, final String policyKey, final String defaultPolicy) {
|
||||
String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
|
||||
if (storagePolicy.equals(defaultPolicy)) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
setStoragePolicy(fs, path, storagePolicy);
|
||||
}
|
||||
|
||||
private static final Map<FileSystem, Boolean> warningMap =
|
||||
new ConcurrentHashMap<FileSystem, Boolean>();
|
||||
|
||||
/**
|
||||
* Sets storage policy for given path.
|
||||
* If the passed path is a directory, we'll set the storage policy for all files
|
||||
* created in the future in said directory. Note that this change in storage
|
||||
* policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
|
||||
* If we're running on a version of HDFS that doesn't support the given storage policy
|
||||
* (or storage policies at all), then we'll issue a log message and continue.
|
||||
*
|
||||
* See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
|
||||
*
|
||||
* @param fs We only do anything if an instance of DistributedFileSystem
|
||||
* @param path the Path whose storage policy is to be set
|
||||
* @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
|
||||
* org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
|
||||
* 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
|
||||
*/
|
||||
public static void setStoragePolicy(final FileSystem fs, final Path path,
|
||||
final String storagePolicy) {
|
||||
if (storagePolicy == null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("We were passed a null storagePolicy, exiting early.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
final String trimmedStoragePolicy = storagePolicy.trim();
|
||||
if (trimmedStoragePolicy.isEmpty()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("We were passed an empty storagePolicy, exiting early.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
boolean distributed = false;
|
||||
try {
|
||||
distributed = isDistributedFileSystem(fs);
|
||||
} catch (IOException ioe) {
|
||||
if (!warningMap.containsKey(fs)) {
|
||||
warningMap.put(fs, true);
|
||||
LOG.warn("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
|
||||
+ "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
|
||||
+ " on path=" + path);
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
|
||||
+ "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
|
||||
+ " on path=" + path);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (distributed) {
|
||||
invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* All args have been checked and are good. Run the setStoragePolicy invocation.
|
||||
*/
|
||||
private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
|
||||
final String storagePolicy) {
|
||||
Method m = null;
|
||||
try {
|
||||
m = fs.getClass().getDeclaredMethod("setStoragePolicy",
|
||||
new Class<?>[] { Path.class, String.class });
|
||||
m.setAccessible(true);
|
||||
} catch (NoSuchMethodException e) {
|
||||
final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584 not available";
|
||||
if (!warningMap.containsKey(fs)) {
|
||||
warningMap.put(fs, true);
|
||||
LOG.warn(msg, e);
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(msg, e);
|
||||
}
|
||||
m = null;
|
||||
} catch (SecurityException e) {
|
||||
final String msg = "No access to setStoragePolicy on FileSystem; HDFS-6584 not available";
|
||||
if (!warningMap.containsKey(fs)) {
|
||||
warningMap.put(fs, true);
|
||||
LOG.warn(msg, e);
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(msg, e);
|
||||
}
|
||||
m = null; // could happen on setAccessible()
|
||||
}
|
||||
if (m != null) {
|
||||
try {
|
||||
m.invoke(fs, path, storagePolicy);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// This swallows FNFE, should we be throwing it? seems more likely to indicate dev
|
||||
// misuse than a runtime problem with HDFS.
|
||||
if (!warningMap.containsKey(fs)) {
|
||||
warningMap.put(fs, true);
|
||||
LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
|
||||
}
|
||||
// check for lack of HDFS-7228
|
||||
if (e instanceof InvocationTargetException) {
|
||||
final Throwable exception = e.getCause();
|
||||
if (exception instanceof RemoteException &&
|
||||
HadoopIllegalArgumentException.class.getName().equals(
|
||||
((RemoteException)exception).getClassName())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
|
||||
"isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
|
||||
"trying to use SSD related policies then you're likely missing HDFS-7228. For " +
|
||||
"more information see the 'ArchivalStorage' docs for your Hadoop release.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True is <code>fs</code> is instance of DistributedFileSystem
|
||||
* @throws IOException
|
||||
|
@ -283,32 +127,6 @@ public abstract class FSUtils {
|
|||
return fileSystem instanceof DistributedFileSystem;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare of path component. Does not consider schema; i.e. if schemas
|
||||
* different but <code>path</code> starts with <code>rootPath</code>,
|
||||
* then the function returns true
|
||||
* @param rootPath
|
||||
* @param path
|
||||
* @return True if <code>path</code> starts with <code>rootPath</code>
|
||||
*/
|
||||
public static boolean isStartingWithPath(final Path rootPath, final String path) {
|
||||
String uriRootPath = rootPath.toUri().getPath();
|
||||
String tailUriPath = (new Path(path)).toUri().getPath();
|
||||
return tailUriPath.startsWith(uriRootPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
|
||||
* '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches,
|
||||
* the two will equate.
|
||||
* @param pathToSearch Path we will be trying to match.
|
||||
* @param pathTail
|
||||
* @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
|
||||
*/
|
||||
public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
|
||||
return isMatchingTail(pathToSearch, new Path(pathTail));
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
|
||||
* '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
|
||||
|
@ -352,18 +170,6 @@ public abstract class FSUtils {
|
|||
return fsUtils;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete if exists.
|
||||
* @param fs filesystem object
|
||||
* @param dir directory to delete
|
||||
* @return True if deleted <code>dir</code>
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static boolean deleteDirectory(final FileSystem fs, final Path dir)
|
||||
throws IOException {
|
||||
return fs.exists(dir) && fs.delete(dir, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the region directory if exists.
|
||||
* @param conf
|
||||
|
@ -379,88 +185,6 @@ public abstract class FSUtils {
|
|||
new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of bytes that large input files should be optimally
|
||||
* be split into to minimize i/o time.
|
||||
*
|
||||
* use reflection to search for getDefaultBlockSize(Path f)
|
||||
* if the method doesn't exist, fall back to using getDefaultBlockSize()
|
||||
*
|
||||
* @param fs filesystem object
|
||||
* @return the default block size for the path's filesystem
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
|
||||
Method m = null;
|
||||
Class<? extends FileSystem> cls = fs.getClass();
|
||||
try {
|
||||
m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.info("FileSystem doesn't support getDefaultBlockSize");
|
||||
} catch (SecurityException e) {
|
||||
LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
|
||||
m = null; // could happen on setAccessible()
|
||||
}
|
||||
if (m == null) {
|
||||
return fs.getDefaultBlockSize(path);
|
||||
} else {
|
||||
try {
|
||||
Object ret = m.invoke(fs, path);
|
||||
return ((Long)ret).longValue();
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the default replication.
|
||||
*
|
||||
* use reflection to search for getDefaultReplication(Path f)
|
||||
* if the method doesn't exist, fall back to using getDefaultReplication()
|
||||
*
|
||||
* @param fs filesystem object
|
||||
* @param f path of file
|
||||
* @return default replication for the path's filesystem
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
|
||||
Method m = null;
|
||||
Class<? extends FileSystem> cls = fs.getClass();
|
||||
try {
|
||||
m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.info("FileSystem doesn't support getDefaultReplication");
|
||||
} catch (SecurityException e) {
|
||||
LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
|
||||
m = null; // could happen on setAccessible()
|
||||
}
|
||||
if (m == null) {
|
||||
return fs.getDefaultReplication(path);
|
||||
} else {
|
||||
try {
|
||||
Object ret = m.invoke(fs, path);
|
||||
return ((Number)ret).shortValue();
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default buffer size to use during writes.
|
||||
*
|
||||
* The size of the buffer should probably be a multiple of hardware
|
||||
* page size (4096 on Intel x86), and it determines how much data is
|
||||
* buffered during read and write operations.
|
||||
*
|
||||
* @param fs filesystem object
|
||||
* @return default buffer size to use during writes
|
||||
*/
|
||||
public static int getDefaultBufferSize(final FileSystem fs) {
|
||||
return fs.getConf().getInt("io.file.buffer.size", 4096);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the specified file on the filesystem. By default, this will:
|
||||
* <ol>
|
||||
|
@ -514,71 +238,6 @@ public abstract class FSUtils {
|
|||
return create(fs, path, perm, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the specified file on the filesystem. By default, this will:
|
||||
* <ol>
|
||||
* <li>apply the umask in the configuration (if it is enabled)</li>
|
||||
* <li>use the fs configured buffer size (or 4096 if not set)</li>
|
||||
* <li>use the default replication</li>
|
||||
* <li>use the default block size</li>
|
||||
* <li>not track progress</li>
|
||||
* </ol>
|
||||
*
|
||||
* @param fs {@link FileSystem} on which to write the file
|
||||
* @param path {@link Path} to the file to write
|
||||
* @param perm
|
||||
* @param overwrite Whether or not the created file should be overwritten.
|
||||
* @return output stream to the created file
|
||||
* @throws IOException if the file cannot be created
|
||||
*/
|
||||
public static FSDataOutputStream create(FileSystem fs, Path path,
|
||||
FsPermission perm, boolean overwrite) throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
|
||||
}
|
||||
return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
|
||||
getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the file permissions specified in the configuration, if they are
|
||||
* enabled.
|
||||
*
|
||||
* @param fs filesystem that the file will be created on.
|
||||
* @param conf configuration to read for determining if permissions are
|
||||
* enabled and which to use
|
||||
* @param permssionConfKey property key in the configuration to use when
|
||||
* finding the permission
|
||||
* @return the permission to use when creating a new file on the fs. If
|
||||
* special permissions are not specified in the configuration, then
|
||||
* the default permissions on the the fs will be returned.
|
||||
*/
|
||||
public static FsPermission getFilePermissions(final FileSystem fs,
|
||||
final Configuration conf, final String permssionConfKey) {
|
||||
boolean enablePermissions = conf.getBoolean(
|
||||
HConstants.ENABLE_DATA_FILE_UMASK, false);
|
||||
|
||||
if (enablePermissions) {
|
||||
try {
|
||||
FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
|
||||
// make sure that we have a mask, if not, go default.
|
||||
String mask = conf.get(permssionConfKey);
|
||||
if (mask == null)
|
||||
return FsPermission.getFileDefault();
|
||||
// appy the umask
|
||||
FsPermission umask = new FsPermission(mask);
|
||||
return perm.applyUMask(umask);
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.warn(
|
||||
"Incorrect umask attempted to be created: "
|
||||
+ conf.get(permssionConfKey)
|
||||
+ ", using default file permissions.", e);
|
||||
return FsPermission.getFileDefault();
|
||||
}
|
||||
}
|
||||
return FsPermission.getFileDefault();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks to see if the specified file system is available
|
||||
*
|
||||
|
@ -1022,46 +681,6 @@ public abstract class FSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies root directory path is a valid URI with a scheme
|
||||
*
|
||||
* @param root root directory path
|
||||
* @return Passed <code>root</code> argument.
|
||||
* @throws IOException if not a valid URI with a scheme
|
||||
*/
|
||||
public static Path validateRootPath(Path root) throws IOException {
|
||||
try {
|
||||
URI rootURI = new URI(root.toString());
|
||||
String scheme = rootURI.getScheme();
|
||||
if (scheme == null) {
|
||||
throw new IOException("Root directory does not have a scheme");
|
||||
}
|
||||
return root;
|
||||
} catch (URISyntaxException e) {
|
||||
IOException io = new IOException("Root directory path is not a valid " +
|
||||
"URI -- check your " + HBASE_DIR + " configuration");
|
||||
io.initCause(e);
|
||||
throw io;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks for the presence of the WAL log root path (using the provided conf object) in the given path. If
|
||||
* it exists, this method removes it and returns the String representation of remaining relative path.
|
||||
* @param path
|
||||
* @param conf
|
||||
* @return String representation of the remaining relative path
|
||||
* @throws IOException
|
||||
*/
|
||||
public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
|
||||
Path root = getWALRootDir(conf);
|
||||
String pathStr = path.toString();
|
||||
// check that the path is absolute... it has the root path in it.
|
||||
if (!pathStr.startsWith(root.toString())) return pathStr;
|
||||
// if not, return as it is.
|
||||
return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
|
||||
}
|
||||
|
||||
/**
|
||||
* If DFS, check safe mode and if so, wait until we clear it.
|
||||
* @param conf configuration
|
||||
|
@ -1085,81 +704,6 @@ public abstract class FSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the 'path' component of a Path. In Hadoop, Path is an URI. This
|
||||
* method returns the 'path' component of a Path's URI: e.g. If a Path is
|
||||
* <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
|
||||
* this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
|
||||
* This method is useful if you want to print out a Path without qualifying
|
||||
* Filesystem instance.
|
||||
* @param p Filesystem Path whose 'path' component we are to return.
|
||||
* @return Path portion of the Filesystem
|
||||
*/
|
||||
public static String getPath(Path p) {
|
||||
return p.toUri().getPath();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param c configuration
|
||||
* @return {@link Path} to hbase root directory: i.e. {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR} from
|
||||
* configuration as a qualified Path.
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static Path getRootDir(final Configuration c) throws IOException {
|
||||
Path p = new Path(c.get(HBASE_DIR));
|
||||
FileSystem fs = p.getFileSystem(c);
|
||||
return p.makeQualified(fs);
|
||||
}
|
||||
|
||||
public static void setRootDir(final Configuration c, final Path root) throws IOException {
|
||||
c.set(HBASE_DIR, root.toString());
|
||||
}
|
||||
|
||||
public static void setFsDefault(final Configuration c, final Path root) throws IOException {
|
||||
c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+
|
||||
}
|
||||
|
||||
public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
|
||||
Path p = getRootDir(c);
|
||||
return p.getFileSystem(c);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param c configuration
|
||||
* @return {@link Path} to hbase log root directory: i.e. {@value org.apache.hadoop.hbase.fs.HFileSystem#HBASE_WAL_DIR} from
|
||||
* configuration as a qualified Path. Defaults to {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR}
|
||||
* @throws IOException e
|
||||
*/
|
||||
public static Path getWALRootDir(final Configuration c) throws IOException {
|
||||
Path p = new Path(c.get(HFileSystem.HBASE_WAL_DIR, c.get(HBASE_DIR)));
|
||||
if (!isValidWALRootDir(p, c)) {
|
||||
return FSUtils.getRootDir(c);
|
||||
}
|
||||
FileSystem fs = p.getFileSystem(c);
|
||||
return p.makeQualified(fs);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
|
||||
c.set(HFileSystem.HBASE_WAL_DIR, root.toString());
|
||||
}
|
||||
|
||||
public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
|
||||
Path p = getWALRootDir(c);
|
||||
return p.getFileSystem(c);
|
||||
}
|
||||
|
||||
private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
|
||||
Path rootDir = FSUtils.getRootDir(c);
|
||||
if (walDir != rootDir) {
|
||||
if (walDir.toString().startsWith(rootDir.toString() + "/")) {
|
||||
throw new IllegalStateException("Illegal WAL directory specified. " +
|
||||
"WAL directories are not permitted to be under the root directory if set.");
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if meta region exists
|
||||
*
|
||||
|
@ -1297,44 +841,6 @@ public abstract class FSUtils {
|
|||
return frags;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
|
||||
* path rootdir
|
||||
*
|
||||
* @param rootdir qualified path of HBase root directory
|
||||
* @param tableName name of table
|
||||
* @return {@link org.apache.hadoop.fs.Path} for table
|
||||
*/
|
||||
public static Path getTableDir(Path rootdir, final TableName tableName) {
|
||||
return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
|
||||
tableName.getQualifierAsString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link org.apache.hadoop.hbase.TableName} object representing
|
||||
* the table directory under
|
||||
* path rootdir
|
||||
*
|
||||
* @param tablePath path of table
|
||||
* @return {@link org.apache.hadoop.fs.Path} for table
|
||||
*/
|
||||
public static TableName getTableName(Path tablePath) {
|
||||
return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link org.apache.hadoop.fs.Path} object representing
|
||||
* the namespace directory under path rootdir
|
||||
*
|
||||
* @param rootdir qualified path of HBase root directory
|
||||
* @param namespace namespace name
|
||||
* @return {@link org.apache.hadoop.fs.Path} for table
|
||||
*/
|
||||
public static Path getNamespaceDir(Path rootdir, final String namespace) {
|
||||
return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
|
||||
new Path(namespace)));
|
||||
}
|
||||
|
||||
/**
|
||||
* A {@link PathFilter} that returns only regular files.
|
||||
*/
|
||||
|
@ -1431,17 +937,6 @@ public abstract class FSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf
|
||||
* @return True if this filesystem whose scheme is 'hdfs'.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static boolean isHDFS(final Configuration conf) throws IOException {
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
String scheme = fs.getUri().getScheme();
|
||||
return scheme.equalsIgnoreCase("hdfs");
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover file lease. Used when a file might be suspect
|
||||
* to be had been left open by another process.
|
||||
|
@ -1483,15 +978,6 @@ public abstract class FSUtils {
|
|||
return tabledirs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given path is the one with 'recovered.edits' dir.
|
||||
* @param path
|
||||
* @return True if we recovered edits
|
||||
*/
|
||||
public static boolean isRecoveredEdits(Path path) {
|
||||
return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter for all dirs that don't start with '.'
|
||||
*/
|
||||
|
@ -1668,18 +1154,6 @@ public abstract class FSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param conf
|
||||
* @return Returns the filesystem of the hbase rootdir.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static FileSystem getCurrentFileSystem(Configuration conf)
|
||||
throws IOException {
|
||||
return getRootDir(conf).getFileSystem(conf);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Runs through the HBase rootdir/tablename and creates a reverse lookup map for
|
||||
* table StoreFile names to the full Path.
|
||||
|
@ -1977,101 +1451,6 @@ public abstract class FSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal
|
||||
* This accommodates differences between hadoop versions, where hadoop 1
|
||||
* does not throw a FileNotFoundException, and return an empty FileStatus[]
|
||||
* while Hadoop 2 will throw FileNotFoundException.
|
||||
*
|
||||
* Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem,
|
||||
* Path, FileStatusFilter)} instead.
|
||||
*
|
||||
* @param fs file system
|
||||
* @param dir directory
|
||||
* @param filter path filter
|
||||
* @return null if dir is empty or doesn't exist, otherwise FileStatus array
|
||||
*/
|
||||
public static FileStatus [] listStatus(final FileSystem fs,
|
||||
final Path dir, final PathFilter filter) throws IOException {
|
||||
FileStatus [] status = null;
|
||||
try {
|
||||
status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// if directory doesn't exist, return null
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(dir + " doesn't exist");
|
||||
}
|
||||
}
|
||||
if (status == null || status.length < 1) return null;
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.listStatus() and treats FileNotFoundException as non-fatal
|
||||
* This would accommodates differences between hadoop versions
|
||||
*
|
||||
* @param fs file system
|
||||
* @param dir directory
|
||||
* @return null if dir is empty or doesn't exist, otherwise FileStatus array
|
||||
*/
|
||||
public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
|
||||
return listStatus(fs, dir, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call
|
||||
*
|
||||
* @param fs file system
|
||||
* @param dir directory
|
||||
* @return LocatedFileStatus list
|
||||
*/
|
||||
public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
|
||||
final Path dir) throws IOException {
|
||||
List<LocatedFileStatus> status = null;
|
||||
try {
|
||||
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
|
||||
.listFiles(dir, false);
|
||||
while (locatedFileStatusRemoteIterator.hasNext()) {
|
||||
if (status == null) {
|
||||
status = Lists.newArrayList();
|
||||
}
|
||||
status.add(locatedFileStatusRemoteIterator.next());
|
||||
}
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// if directory doesn't exist, return null
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(dir + " doesn't exist");
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.delete() and returns the value returned by the fs.delete()
|
||||
*
|
||||
* @param fs
|
||||
* @param path
|
||||
* @param recursive
|
||||
* @return the value returned by the fs.delete()
|
||||
* @throws IOException
|
||||
*/
|
||||
public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
|
||||
throws IOException {
|
||||
return fs.delete(path, recursive);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls fs.exists(). Checks if the specified path exists
|
||||
*
|
||||
* @param fs
|
||||
* @param path
|
||||
* @return the value returned by fs.exists()
|
||||
* @throws IOException
|
||||
*/
|
||||
public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
|
||||
return fs.exists(path);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throw an exception if an action is not permitted by a user on a file.
|
||||
*
|
||||
|
@ -2108,46 +1487,6 @@ public abstract class FSUtils {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Log the current state of the filesystem from a certain root directory
|
||||
* @param fs filesystem to investigate
|
||||
* @param root root file/directory to start logging from
|
||||
* @param LOG log to output information
|
||||
* @throws IOException if an unexpected exception occurs
|
||||
*/
|
||||
public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
|
||||
throws IOException {
|
||||
LOG.debug("Current file system:");
|
||||
logFSTree(LOG, fs, root, "|-");
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursive helper to log the state of the FS
|
||||
*
|
||||
* @see #logFileSystemState(FileSystem, Path, Log)
|
||||
*/
|
||||
private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
|
||||
throws IOException {
|
||||
FileStatus[] files = FSUtils.listStatus(fs, root, null);
|
||||
if (files == null) return;
|
||||
|
||||
for (FileStatus file : files) {
|
||||
if (file.isDirectory()) {
|
||||
LOG.debug(prefix + file.getPath().getName() + "/");
|
||||
logFSTree(LOG, fs, file.getPath(), prefix + "---");
|
||||
} else {
|
||||
LOG.debug(prefix + file.getPath().getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
|
||||
throws IOException {
|
||||
// set the modify time for TimeToLive Cleaner
|
||||
fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
|
||||
return fs.rename(src, dest);
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is to scan the root path of the file system to get the
|
||||
* degree of locality for each region on each of the servers having at least
|
||||
|
@ -2397,4 +1736,5 @@ public abstract class FSUtils {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
/**
|
||||
|
@ -52,7 +52,13 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
|||
|
||||
// Only public so classes back in regionserver.wal can access
|
||||
public interface AsyncWriter extends WALProvider.AsyncWriter {
|
||||
void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
|
||||
/**
|
||||
* @throws IOException if something goes wrong initializing an output stream
|
||||
* @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
|
||||
* meet the needs of the given Writer implementation.
|
||||
*/
|
||||
void init(FileSystem fs, Path path, Configuration c, boolean overwritable)
|
||||
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
|
||||
}
|
||||
|
||||
private EventLoopGroup eventLoopGroup;
|
||||
|
@ -60,7 +66,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
|||
private Class<? extends Channel> channelClass;
|
||||
@Override
|
||||
protected AsyncFSWAL createWAL() throws IOException {
|
||||
return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
|
||||
return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
|
||||
getWALDirectoryName(factory.factoryId),
|
||||
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
|
||||
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
|
||||
|
@ -96,7 +102,15 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
|||
writer.init(fs, path, conf, overwritable);
|
||||
return writer;
|
||||
} catch (Exception e) {
|
||||
if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
|
||||
LOG.error("The RegionServer async write ahead log provider " +
|
||||
"relies on the ability to call " + e.getMessage() + " for proper operation during " +
|
||||
"component failures, but the current FileSystem does not support doing so. Please " +
|
||||
"check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " +
|
||||
"it points to a FileSystem mount that has suitable capabilities for output streams.");
|
||||
} else {
|
||||
LOG.debug("Error instantiating log writer.", e);
|
||||
}
|
||||
Throwables.propagateIfPossible(e, IOException.class);
|
||||
throw new IOException("cannot get log writer", e);
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceStability;
|
|||
// imports for things that haven't moved from regionserver.wal yet.
|
||||
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
|
||||
/**
|
||||
* A WAL provider that use {@link FSHLog}.
|
||||
|
@ -44,7 +44,13 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
|
|||
|
||||
// Only public so classes back in regionserver.wal can access
|
||||
public interface Writer extends WALProvider.Writer {
|
||||
void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
|
||||
/**
|
||||
* @throws IOException if something goes wrong initializing an output stream
|
||||
* @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
|
||||
* meet the needs of the given Writer implementation.
|
||||
*/
|
||||
void init(FileSystem fs, Path path, Configuration c, boolean overwritable)
|
||||
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -61,7 +67,15 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
|
|||
writer.init(fs, path, conf, overwritable);
|
||||
return writer;
|
||||
} catch (Exception e) {
|
||||
if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
|
||||
LOG.error("The RegionServer write ahead log provider for FileSystem implementations " +
|
||||
"relies on the ability to call " + e.getMessage() + " for proper operation during " +
|
||||
"component failures, but the current FileSystem does not support doing so. Please " +
|
||||
"check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " +
|
||||
"it points to a FileSystem mount that has suitable capabilities for output streams.");
|
||||
} else {
|
||||
LOG.debug("Error instantiating log writer.", e);
|
||||
}
|
||||
if (writer != null) {
|
||||
try{
|
||||
writer.close();
|
||||
|
@ -75,7 +89,7 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
|
|||
|
||||
@Override
|
||||
protected FSHLog createWAL() throws IOException {
|
||||
return new FSHLog(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
|
||||
return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
|
||||
getWALDirectoryName(factory.factoryId),
|
||||
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
|
||||
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -55,7 +56,8 @@ public class TestLocalAsyncOutput {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void test() throws IOException, InterruptedException, ExecutionException {
|
||||
public void test() throws IOException, InterruptedException, ExecutionException,
|
||||
FSUtils.StreamLacksCapabilityException {
|
||||
Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
|
||||
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
|
||||
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.security.Superusers;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
@ -184,10 +185,8 @@ public class MockMasterServices extends MockNoopMasterServices {
|
|||
throws IOException {
|
||||
final Configuration conf = getConfiguration();
|
||||
final Path logDir = new Path(fileSystemManager.getRootDir(),
|
||||
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
|
||||
WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
|
||||
|
||||
//procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
|
||||
// new MasterProcedureEnv.WALStoreLeaseRecovery(this));
|
||||
this.procedureStore = new NoopProcedureStore();
|
||||
this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
|
||||
|
||||
|
|
|
@ -115,8 +115,8 @@ public class TestMasterProcedureWalLease {
|
|||
Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
|
||||
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
|
||||
final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
|
||||
firstMaster.getMasterFileSystem().getFileSystem(),
|
||||
((WALProcedureStore)masterStore).getWALDir(),
|
||||
null,
|
||||
new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
|
||||
// Abort Latch for the test store
|
||||
final CountDownLatch backupStore3Abort = new CountDownLatch(1);
|
||||
|
@ -195,8 +195,8 @@ public class TestMasterProcedureWalLease {
|
|||
Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
|
||||
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
|
||||
final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
|
||||
firstMaster.getMasterFileSystem().getFileSystem(),
|
||||
((WALProcedureStore)procStore).getWALDir(),
|
||||
null,
|
||||
new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
|
||||
|
||||
// start a second store which should fence the first one out
|
||||
|
|
|
@ -76,7 +76,7 @@ public class TestWALProcedureStoreOnHDFS {
|
|||
MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
|
||||
|
||||
Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
|
||||
store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
|
||||
store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
|
||||
store.registerListener(stopProcedureListener);
|
||||
store.start(8);
|
||||
store.recoverLease();
|
||||
|
|
|
@ -58,9 +58,9 @@ import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
|
|||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
|
@ -139,8 +139,8 @@ public abstract class AbstractTestFSWAL {
|
|||
// test to see whether the coprocessor is loaded or not.
|
||||
AbstractFSWAL<?> wal = null;
|
||||
try {
|
||||
wal = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
|
||||
CONF, null, true, null, null);
|
||||
wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
|
||||
WALCoprocessorHost host = wal.getCoprocessorHost();
|
||||
Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
|
||||
assertNotNull(c);
|
||||
|
@ -187,8 +187,8 @@ public abstract class AbstractTestFSWAL {
|
|||
AbstractFSWAL<?> wal1 = null;
|
||||
AbstractFSWAL<?> walMeta = null;
|
||||
try {
|
||||
wal1 = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
|
||||
CONF, null, true, null, null);
|
||||
wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
|
||||
LOG.debug("Log obtained is: " + wal1);
|
||||
Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
|
||||
Path p1 = wal1.computeFilename(11);
|
||||
|
@ -197,9 +197,9 @@ public abstract class AbstractTestFSWAL {
|
|||
assertTrue(comp.compare(p1, p1) == 0);
|
||||
// comparing with different filenum.
|
||||
assertTrue(comp.compare(p1, p2) < 0);
|
||||
walMeta =
|
||||
newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
|
||||
CONF, null, true, null, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
|
||||
walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null,
|
||||
AbstractFSWALProvider.META_WAL_PROVIDER_ID);
|
||||
Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
|
||||
|
||||
Path p1WithMeta = walMeta.computeFilename(11);
|
||||
|
@ -245,7 +245,7 @@ public abstract class AbstractTestFSWAL {
|
|||
LOG.debug("testFindMemStoresEligibleForFlush");
|
||||
Configuration conf1 = HBaseConfiguration.create(CONF);
|
||||
conf1.setInt("hbase.regionserver.maxlogs", 1);
|
||||
AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(conf1), DIR.toString(),
|
||||
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
|
||||
HTableDescriptor t1 =
|
||||
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
|
||||
|
@ -332,9 +332,10 @@ public abstract class AbstractTestFSWAL {
|
|||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
public void testFailedToCreateWALIfParentRenamed() throws IOException {
|
||||
public void testFailedToCreateWALIfParentRenamed() throws IOException,
|
||||
CommonFSUtils.StreamLacksCapabilityException {
|
||||
final String name = "testFailedToCreateWALIfParentRenamed";
|
||||
AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(CONF), name,
|
||||
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name,
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
|
||||
long filenum = System.currentTimeMillis();
|
||||
Path path = wal.computeFilename(filenum);
|
||||
|
@ -373,8 +374,8 @@ public abstract class AbstractTestFSWAL {
|
|||
scopes.put(fam, 0);
|
||||
}
|
||||
// subclass and doctor a method.
|
||||
AbstractFSWAL<?> wal = newSlowWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF,
|
||||
null, true, null, null, new Runnable() {
|
||||
AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
|
||||
testName, CONF, null, true, null, null, new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -434,8 +435,8 @@ public abstract class AbstractTestFSWAL {
|
|||
@Test
|
||||
public void testSyncNoAppend() throws IOException {
|
||||
String testName = currentTest.getMethodName();
|
||||
AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF,
|
||||
null, true, null, null);
|
||||
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
|
||||
CONF, null, true, null, null);
|
||||
try {
|
||||
wal.sync();
|
||||
} finally {
|
||||
|
|
|
@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
|||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
@ -1029,7 +1030,8 @@ public abstract class AbstractTestWALReplay {
|
|||
/**
|
||||
* testcase for https://issues.apache.org/jira/browse/HBASE-14949.
|
||||
*/
|
||||
private void testNameConflictWhenSplit(boolean largeFirst) throws IOException {
|
||||
private void testNameConflictWhenSplit(boolean largeFirst) throws IOException,
|
||||
StreamLacksCapabilityException {
|
||||
final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
|
||||
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
|
||||
|
@ -1071,12 +1073,12 @@ public abstract class AbstractTestWALReplay {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testNameConflictWhenSplit0() throws IOException {
|
||||
public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
|
||||
testNameConflictWhenSplit(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNameConflictWhenSplit1() throws IOException {
|
||||
public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
|
||||
testNameConflictWhenSplit(false);
|
||||
}
|
||||
|
||||
|
@ -1231,7 +1233,8 @@ public abstract class AbstractTestWALReplay {
|
|||
return htd;
|
||||
}
|
||||
|
||||
private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException {
|
||||
private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException,
|
||||
StreamLacksCapabilityException {
|
||||
fs.mkdirs(file.getParent());
|
||||
ProtobufLogWriter writer = new ProtobufLogWriter();
|
||||
writer.init(fs, file, conf, true);
|
||||
|
|
|
@ -30,6 +30,8 @@ import java.io.IOException;
|
|||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
|
@ -41,7 +43,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -57,6 +58,7 @@ import org.junit.experimental.categories.Category;
|
|||
*/
|
||||
@Category({MiscTests.class, MediumTests.class})
|
||||
public class TestFSUtils {
|
||||
private static final Log LOG = LogFactory.getLog(TestFSUtils.class);
|
||||
|
||||
private HBaseTestingUtility htu;
|
||||
private FileSystem fs;
|
||||
|
@ -69,53 +71,6 @@ public class TestFSUtils {
|
|||
conf = htu.getConfiguration();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test path compare and prefix checking.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testMatchingTail() throws IOException {
|
||||
Path rootdir = htu.getDataTestDir();
|
||||
assertTrue(rootdir.depth() > 1);
|
||||
Path partPath = new Path("a", "b");
|
||||
Path fullPath = new Path(rootdir, partPath);
|
||||
Path fullyQualifiedPath = fs.makeQualified(fullPath);
|
||||
assertFalse(FSUtils.isMatchingTail(fullPath, partPath));
|
||||
assertFalse(FSUtils.isMatchingTail(fullPath, partPath.toString()));
|
||||
assertTrue(FSUtils.isStartingWithPath(rootdir, fullPath.toString()));
|
||||
assertTrue(FSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString()));
|
||||
assertFalse(FSUtils.isStartingWithPath(rootdir, partPath.toString()));
|
||||
assertFalse(FSUtils.isMatchingTail(fullyQualifiedPath, partPath));
|
||||
assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fullPath));
|
||||
assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString()));
|
||||
assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath)));
|
||||
assertTrue(FSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString()));
|
||||
assertFalse(FSUtils.isMatchingTail(fullPath, new Path("x")));
|
||||
assertFalse(FSUtils.isMatchingTail(new Path("x"), fullPath));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVersion() throws DeserializationException, IOException {
|
||||
final Path rootdir = htu.getDataTestDir();
|
||||
assertNull(FSUtils.getVersion(fs, rootdir));
|
||||
// Write out old format version file. See if we can read it in and convert.
|
||||
Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
|
||||
FSDataOutputStream s = fs.create(versionFile);
|
||||
final String version = HConstants.FILE_SYSTEM_VERSION;
|
||||
s.writeUTF(version);
|
||||
s.close();
|
||||
assertTrue(fs.exists(versionFile));
|
||||
FileStatus [] status = fs.listStatus(versionFile);
|
||||
assertNotNull(status);
|
||||
assertTrue(status.length > 0);
|
||||
String newVersion = FSUtils.getVersion(fs, rootdir);
|
||||
assertEquals(version.length(), newVersion.length());
|
||||
assertEquals(version, newVersion);
|
||||
// File will have been converted. Exercise the pb format
|
||||
assertEquals(version, FSUtils.getVersion(fs, rootdir));
|
||||
FSUtils.checkVersion(fs, rootdir, true);
|
||||
}
|
||||
|
||||
@Test public void testIsHDFS() throws Exception {
|
||||
assertFalse(FSUtils.isHDFS(conf));
|
||||
MiniDFSCluster cluster = null;
|
||||
|
@ -238,8 +193,33 @@ public class TestFSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVersion() throws DeserializationException, IOException {
|
||||
final Path rootdir = htu.getDataTestDir();
|
||||
final FileSystem fs = rootdir.getFileSystem(conf);
|
||||
assertNull(FSUtils.getVersion(fs, rootdir));
|
||||
// Write out old format version file. See if we can read it in and convert.
|
||||
Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
|
||||
FSDataOutputStream s = fs.create(versionFile);
|
||||
final String version = HConstants.FILE_SYSTEM_VERSION;
|
||||
s.writeUTF(version);
|
||||
s.close();
|
||||
assertTrue(fs.exists(versionFile));
|
||||
FileStatus [] status = fs.listStatus(versionFile);
|
||||
assertNotNull(status);
|
||||
assertTrue(status.length > 0);
|
||||
String newVersion = FSUtils.getVersion(fs, rootdir);
|
||||
assertEquals(version.length(), newVersion.length());
|
||||
assertEquals(version, newVersion);
|
||||
// File will have been converted. Exercise the pb format
|
||||
assertEquals(version, FSUtils.getVersion(fs, rootdir));
|
||||
FSUtils.checkVersion(fs, rootdir, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPermMask() throws Exception {
|
||||
final Path rootdir = htu.getDataTestDir();
|
||||
final FileSystem fs = rootdir.getFileSystem(conf);
|
||||
// default fs permission
|
||||
FsPermission defaultFsPerm = FSUtils.getFilePermissions(fs, conf,
|
||||
HConstants.DATA_FILE_UMASK_KEY);
|
||||
|
@ -277,6 +257,8 @@ public class TestFSUtils {
|
|||
|
||||
@Test
|
||||
public void testDeleteAndExists() throws Exception {
|
||||
final Path rootdir = htu.getDataTestDir();
|
||||
final FileSystem fs = rootdir.getFileSystem(conf);
|
||||
conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
|
||||
FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
|
||||
// then that the correct file is created
|
||||
|
@ -302,6 +284,7 @@ public class TestFSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRenameAndSetModifyTime() throws Exception {
|
||||
MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
|
||||
|
@ -338,6 +321,24 @@ public class TestFSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetStoragePolicyDefault() throws Exception {
|
||||
verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
|
||||
}
|
||||
|
||||
/* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
|
||||
@Test
|
||||
public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
|
||||
verifyFileInDirWithStoragePolicy("ALL_SSD");
|
||||
}
|
||||
|
||||
/* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
|
||||
@Test
|
||||
public void testSetStoragePolicyInvalid() throws Exception {
|
||||
verifyFileInDirWithStoragePolicy("1772");
|
||||
}
|
||||
|
||||
// Here instead of TestCommonFSUtils because we need a minicluster
|
||||
private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
|
||||
conf.set(HConstants.WAL_STORAGE_POLICY, policy);
|
||||
|
||||
|
@ -362,63 +363,6 @@ public class TestFSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetStoragePolicyDefault() throws Exception {
|
||||
verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
|
||||
}
|
||||
|
||||
/* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
|
||||
@Test
|
||||
public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
|
||||
verifyFileInDirWithStoragePolicy("ALL_SSD");
|
||||
}
|
||||
|
||||
/* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
|
||||
@Test
|
||||
public void testSetStoragePolicyInvalid() throws Exception {
|
||||
verifyFileInDirWithStoragePolicy("1772");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetWALRootDir() throws Exception {
|
||||
Path p = new Path("file:///hbase/root");
|
||||
FSUtils.setWALRootDir(conf, p);
|
||||
assertEquals(p.toString(), conf.get(HFileSystem.HBASE_WAL_DIR));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetWALRootDir() throws IOException {
|
||||
Path root = new Path("file:///hbase/root");
|
||||
Path walRoot = new Path("file:///hbase/logroot");
|
||||
FSUtils.setRootDir(conf, root);
|
||||
assertEquals(FSUtils.getRootDir(conf), root);
|
||||
assertEquals(FSUtils.getWALRootDir(conf), root);
|
||||
FSUtils.setWALRootDir(conf, walRoot);
|
||||
assertEquals(FSUtils.getWALRootDir(conf), walRoot);
|
||||
}
|
||||
|
||||
@Test(expected=IllegalStateException.class)
|
||||
public void testGetWALRootDirIllegalWALDir() throws IOException {
|
||||
Path root = new Path("file:///hbase/root");
|
||||
Path invalidWALDir = new Path("file:///hbase/root/logroot");
|
||||
FSUtils.setRootDir(conf, root);
|
||||
FSUtils.setWALRootDir(conf, invalidWALDir);
|
||||
FSUtils.getWALRootDir(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveWALRootPath() throws Exception {
|
||||
FSUtils.setRootDir(conf, new Path("file:///user/hbase"));
|
||||
Path testFile = new Path(FSUtils.getRootDir(conf), "test/testfile");
|
||||
Path tmpFile = new Path("file:///test/testfile");
|
||||
assertEquals(FSUtils.removeWALRootPath(testFile, conf), "test/testfile");
|
||||
assertEquals(FSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString());
|
||||
FSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir"));
|
||||
assertEquals(FSUtils.removeWALRootPath(testFile, conf), testFile.toString());
|
||||
Path logFile = new Path(FSUtils.getWALRootDir(conf), "test/testlog");
|
||||
assertEquals(FSUtils.removeWALRootPath(logFile, conf), "test/testlog");
|
||||
}
|
||||
|
||||
/**
|
||||
* Ugly test that ensures we can get at the hedged read counters in dfsclient.
|
||||
* Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
|
||||
|
@ -565,4 +509,37 @@ public class TestFSUtils {
|
|||
assertTrue(fileSys.delete(name, true));
|
||||
assertTrue(!fileSys.exists(name));
|
||||
}
|
||||
|
||||
|
||||
private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
|
||||
static {
|
||||
boolean tmp = false;
|
||||
try {
|
||||
Class.forName("org.apache.hadoop.fs.StreamCapabilities");
|
||||
tmp = true;
|
||||
LOG.debug("Test thought StreamCapabilities class was present.");
|
||||
} catch (ClassNotFoundException exception) {
|
||||
LOG.debug("Test didn't think StreamCapabilities class was present.");
|
||||
} finally {
|
||||
STREAM_CAPABILITIES_IS_PRESENT = tmp;
|
||||
}
|
||||
}
|
||||
|
||||
// Here instead of TestCommonFSUtils because we need a minicluster
|
||||
@Test
|
||||
public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
|
||||
MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
|
||||
try (FileSystem filesystem = cluster.getFileSystem()) {
|
||||
FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
|
||||
assertTrue(FSUtils.hasCapability(stream, "hsync"));
|
||||
assertTrue(FSUtils.hasCapability(stream, "hflush"));
|
||||
assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " +
|
||||
"StreamCapabilities class is not defined.",
|
||||
STREAM_CAPABILITIES_IS_PRESENT,
|
||||
FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add."));
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
||||
/**
|
||||
|
@ -100,7 +100,7 @@ public class IOTestProvider implements WALProvider {
|
|||
providerId = DEFAULT_PROVIDER_ID;
|
||||
}
|
||||
final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
|
||||
log = new IOTestWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
|
||||
log = new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
|
||||
AbstractFSWALProvider.getWALDirectoryName(factory.factoryId),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix,
|
||||
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
|
||||
|
@ -184,7 +184,12 @@ public class IOTestProvider implements WALProvider {
|
|||
if (!initialized || doFileRolls) {
|
||||
LOG.info("creating new writer instance.");
|
||||
final ProtobufLogWriter writer = new IOTestWriter();
|
||||
try {
|
||||
writer.init(fs, path, conf, false);
|
||||
} catch (CommonFSUtils.StreamLacksCapabilityException exception) {
|
||||
throw new IOException("Can't create writer instance because underlying FileSystem " +
|
||||
"doesn't support needed stream capabilities.", exception);
|
||||
}
|
||||
if (!initialized) {
|
||||
LOG.info("storing initial writer instance in case file rolling isn't allowed.");
|
||||
noRollsWriter = writer;
|
||||
|
@ -207,7 +212,8 @@ public class IOTestProvider implements WALProvider {
|
|||
private boolean doSyncs;
|
||||
|
||||
@Override
|
||||
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException {
|
||||
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
|
||||
throws IOException, CommonFSUtils.StreamLacksCapabilityException {
|
||||
Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
|
||||
if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
|
||||
doAppends = doSyncs = true;
|
||||
|
|
Loading…
Reference in New Issue