From a9f0c5d4e2c85c1faae1b4b277e3c290c8b81d2a Mon Sep 17 00:00:00 2001 From: Sean Busbey Date: Thu, 12 Oct 2017 10:59:43 -0500 Subject: [PATCH] HBASE-18784 if available, query underlying outputstream capabilities where we need hflush/hsync. * pull things that don't rely on HDFS in hbase-server/FSUtils into hbase-common/CommonFSUtils * refactor setStoragePolicy so that it can move into hbase-common/CommonFSUtils, as a side effect update it for Hadoop 2.8,3.0+ * refactor WALProcedureStore so that it handles its own FS interactions * add a reflection-based lookup of stream capabilities * call said lookup in places where we make WALs to make sure hflush/hsync is available. * javadoc / checkstyle cleanup on changes as flagged by yetus Signed-off-by: Chia-Ping Tsai --- .../hadoop/hbase/util/CommonFSUtils.java | 890 ++++++++++++++++++ .../hadoop/hbase/util/TestCommonFSUtils.java | 164 ++++ .../store/wal/WALProcedureStore.java | 70 +- .../procedure2/ProcedureTestingUtility.java | 12 +- .../hbase/procedure2/TestChildProcedures.java | 2 +- .../hbase/procedure2/TestProcedureEvents.java | 2 +- .../procedure2/TestProcedureExecution.java | 2 +- .../procedure2/TestProcedureMetrics.java | 2 +- .../hbase/procedure2/TestProcedureNonce.java | 2 +- .../procedure2/TestProcedureRecovery.java | 2 +- .../procedure2/TestProcedureReplayOrder.java | 2 +- .../procedure2/TestStateMachineProcedure.java | 2 +- .../hbase/procedure2/TestYieldProcedures.java | 2 +- ...ocedureWALLoaderPerformanceEvaluation.java | 2 +- .../ProcedureWALPerformanceEvaluation.java | 9 +- .../wal/TestStressWALProcedureStore.java | 2 +- .../store/wal/TestWALProcedureStore.java | 4 +- .../apache/hadoop/hbase/fs/HFileSystem.java | 3 - .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 11 +- .../apache/hadoop/hbase/master/HMaster.java | 18 +- .../hadoop/hbase/master/MasterFileSystem.java | 10 +- .../procedure/MasterProcedureConstants.java | 3 - .../hbase/regionserver/wal/AbstractFSWAL.java | 29 +- .../wal/AbstractProtobufLogWriter.java | 5 +- .../wal/AsyncProtobufLogWriter.java | 5 +- .../regionserver/wal/ProtobufLogWriter.java | 8 +- .../org/apache/hadoop/hbase/util/FSUtils.java | 668 +------------ .../hadoop/hbase/wal/AsyncFSWALProvider.java | 22 +- .../hadoop/hbase/wal/FSHLogProvider.java | 22 +- .../io/asyncfs/TestLocalAsyncOutput.java | 4 +- .../master/assignment/MockMasterServices.java | 5 +- .../TestMasterProcedureWalLease.java | 4 +- .../TestWALProcedureStoreOnHDFS.java | 2 +- .../regionserver/wal/AbstractTestFSWAL.java | 45 +- .../wal/AbstractTestWALReplay.java | 11 +- .../apache/hadoop/hbase/util/TestFSUtils.java | 187 ++-- .../hadoop/hbase/wal/IOTestProvider.java | 14 +- 37 files changed, 1344 insertions(+), 903 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java new file mode 100644 index 00000000000..bdf148eff15 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -0,0 +1,890 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; + +import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Utility methods for interacting with the underlying file system. + */ +@InterfaceAudience.Private +public abstract class CommonFSUtils { + private static final Log LOG = LogFactory.getLog(CommonFSUtils.class); + + /** Parameter name for HBase WAL directory */ + public static final String HBASE_WAL_DIR = "hbase.wal.dir"; + + /** Full access permissions (starting point for a umask) */ + public static final String FULL_RWX_PERMISSIONS = "777"; + + protected CommonFSUtils() { + super(); + } + + /** + * Compare of path component. Does not consider schema; i.e. if schemas + * different but path starts with rootPath, + * then the function returns true + * @param rootPath value to check for + * @param path subject to check + * @return True if path starts with rootPath + */ + public static boolean isStartingWithPath(final Path rootPath, final String path) { + String uriRootPath = rootPath.toUri().getPath(); + String tailUriPath = (new Path(path)).toUri().getPath(); + return tailUriPath.startsWith(uriRootPath); + } + + /** + * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the + * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches, + * the two will equate. + * @param pathToSearch Path we will be trying to match against. + * @param pathTail what to match + * @return True if pathTail is tail on the path of pathToSearch + */ + public static boolean isMatchingTail(final Path pathToSearch, String pathTail) { + return isMatchingTail(pathToSearch, new Path(pathTail)); + } + + /** + * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the + * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider + * schema; i.e. if schemas different but path or subpath matches, the two will equate. + * @param pathToSearch Path we will be trying to match agains against + * @param pathTail what to match + * @return True if pathTail is tail on the path of pathToSearch + */ + public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) { + if (pathToSearch.depth() != pathTail.depth()) { + return false; + } + Path tailPath = pathTail; + String tailName; + Path toSearch = pathToSearch; + String toSearchName; + boolean result = false; + do { + tailName = tailPath.getName(); + if (tailName == null || tailName.length() <= 0) { + result = true; + break; + } + toSearchName = toSearch.getName(); + if (toSearchName == null || toSearchName.length() <= 0) { + break; + } + // Move up a parent on each path for next go around. Path doesn't let us go off the end. + tailPath = tailPath.getParent(); + toSearch = toSearch.getParent(); + } while(tailName.equals(toSearchName)); + return result; + } + + /** + * Delete if exists. + * @param fs filesystem object + * @param dir directory to delete + * @return True if deleted dir + * @throws IOException e + */ + public static boolean deleteDirectory(final FileSystem fs, final Path dir) + throws IOException { + return fs.exists(dir) && fs.delete(dir, true); + } + + /** + * Return the number of bytes that large input files should be optimally + * be split into to minimize i/o time. + * + * use reflection to search for getDefaultBlockSize(Path f) + * if the method doesn't exist, fall back to using getDefaultBlockSize() + * + * @param fs filesystem object + * @return the default block size for the path's filesystem + * @throws IOException e + */ + public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException { + Method m = null; + Class cls = fs.getClass(); + try { + m = cls.getMethod("getDefaultBlockSize", new Class[] { Path.class }); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem doesn't support getDefaultBlockSize"); + } catch (SecurityException e) { + LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e); + m = null; // could happen on setAccessible() + } + if (m == null) { + return fs.getDefaultBlockSize(path); + } else { + try { + Object ret = m.invoke(fs, path); + return ((Long)ret).longValue(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + /* + * Get the default replication. + * + * use reflection to search for getDefaultReplication(Path f) + * if the method doesn't exist, fall back to using getDefaultReplication() + * + * @param fs filesystem object + * @param f path of file + * @return default replication for the path's filesystem + * @throws IOException e + */ + public static short getDefaultReplication(final FileSystem fs, final Path path) + throws IOException { + Method m = null; + Class cls = fs.getClass(); + try { + m = cls.getMethod("getDefaultReplication", new Class[] { Path.class }); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem doesn't support getDefaultReplication"); + } catch (SecurityException e) { + LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e); + m = null; // could happen on setAccessible() + } + if (m == null) { + return fs.getDefaultReplication(path); + } else { + try { + Object ret = m.invoke(fs, path); + return ((Number)ret).shortValue(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + /** + * Returns the default buffer size to use during writes. + * + * The size of the buffer should probably be a multiple of hardware + * page size (4096 on Intel x86), and it determines how much data is + * buffered during read and write operations. + * + * @param fs filesystem object + * @return default buffer size to use during writes + */ + public static int getDefaultBufferSize(final FileSystem fs) { + return fs.getConf().getInt("io.file.buffer.size", 4096); + } + + /** + * Create the specified file on the filesystem. By default, this will: + *
    + *
  1. apply the umask in the configuration (if it is enabled)
  2. + *
  3. use the fs configured buffer size (or 4096 if not set)
  4. + *
  5. use the default replication
  6. + *
  7. use the default block size
  8. + *
  9. not track progress
  10. + *
+ * + * @param fs {@link FileSystem} on which to write the file + * @param path {@link Path} to the file to write + * @param perm intial permissions + * @param overwrite Whether or not the created file should be overwritten. + * @return output stream to the created file + * @throws IOException if the file cannot be created + */ + public static FSDataOutputStream create(FileSystem fs, Path path, + FsPermission perm, boolean overwrite) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite); + } + return fs.create(path, perm, overwrite, getDefaultBufferSize(fs), + getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null); + } + + /** + * Get the file permissions specified in the configuration, if they are + * enabled. + * + * @param fs filesystem that the file will be created on. + * @param conf configuration to read for determining if permissions are + * enabled and which to use + * @param permssionConfKey property key in the configuration to use when + * finding the permission + * @return the permission to use when creating a new file on the fs. If + * special permissions are not specified in the configuration, then + * the default permissions on the the fs will be returned. + */ + public static FsPermission getFilePermissions(final FileSystem fs, + final Configuration conf, final String permssionConfKey) { + boolean enablePermissions = conf.getBoolean( + HConstants.ENABLE_DATA_FILE_UMASK, false); + + if (enablePermissions) { + try { + FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS); + // make sure that we have a mask, if not, go default. + String mask = conf.get(permssionConfKey); + if (mask == null) { + return FsPermission.getFileDefault(); + } + // appy the umask + FsPermission umask = new FsPermission(mask); + return perm.applyUMask(umask); + } catch (IllegalArgumentException e) { + LOG.warn( + "Incorrect umask attempted to be created: " + + conf.get(permssionConfKey) + + ", using default file permissions.", e); + return FsPermission.getFileDefault(); + } + } + return FsPermission.getFileDefault(); + } + + /** + * Verifies root directory path is a valid URI with a scheme + * + * @param root root directory path + * @return Passed root argument. + * @throws IOException if not a valid URI with a scheme + */ + public static Path validateRootPath(Path root) throws IOException { + try { + URI rootURI = new URI(root.toString()); + String scheme = rootURI.getScheme(); + if (scheme == null) { + throw new IOException("Root directory does not have a scheme"); + } + return root; + } catch (URISyntaxException e) { + IOException io = new IOException("Root directory path is not a valid " + + "URI -- check your " + HConstants.HBASE_DIR + " configuration"); + io.initCause(e); + throw io; + } + } + + /** + * Checks for the presence of the WAL log root path (using the provided conf object) in the given + * path. If it exists, this method removes it and returns the String representation of remaining + * relative path. + * @param path must not be null + * @param conf must not be null + * @return String representation of the remaining relative path + * @throws IOException from underlying filesystem + */ + public static String removeWALRootPath(Path path, final Configuration conf) throws IOException { + Path root = getWALRootDir(conf); + String pathStr = path.toString(); + // check that the path is absolute... it has the root path in it. + if (!pathStr.startsWith(root.toString())) { + return pathStr; + } + // if not, return as it is. + return pathStr.substring(root.toString().length() + 1);// remove the "/" too. + } + + /** + * Return the 'path' component of a Path. In Hadoop, Path is an URI. This + * method returns the 'path' component of a Path's URI: e.g. If a Path is + * hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir, + * this method returns /hbase_trunk/TestTable/compaction.dir. + * This method is useful if you want to print out a Path without qualifying + * Filesystem instance. + * @param p Filesystem Path whose 'path' component we are to return. + * @return Path portion of the Filesystem + */ + public static String getPath(Path p) { + return p.toUri().getPath(); + } + + /** + * @param c configuration + * @return {@link Path} to hbase root directory from + * configuration as a qualified Path. + * @throws IOException e + */ + public static Path getRootDir(final Configuration c) throws IOException { + Path p = new Path(c.get(HConstants.HBASE_DIR)); + FileSystem fs = p.getFileSystem(c); + return p.makeQualified(fs); + } + + public static void setRootDir(final Configuration c, final Path root) throws IOException { + c.set(HConstants.HBASE_DIR, root.toString()); + } + + public static void setFsDefault(final Configuration c, final Path root) throws IOException { + c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+ + } + + public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException { + Path p = getRootDir(c); + return p.getFileSystem(c); + } + + /** + * @param c configuration + * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from + * configuration as a qualified Path. Defaults to HBase root dir. + * @throws IOException e + */ + public static Path getWALRootDir(final Configuration c) throws IOException { + Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR))); + if (!isValidWALRootDir(p, c)) { + return getRootDir(c); + } + FileSystem fs = p.getFileSystem(c); + return p.makeQualified(fs); + } + + @VisibleForTesting + public static void setWALRootDir(final Configuration c, final Path root) throws IOException { + c.set(HBASE_WAL_DIR, root.toString()); + } + + public static FileSystem getWALFileSystem(final Configuration c) throws IOException { + Path p = getWALRootDir(c); + return p.getFileSystem(c); + } + + private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException { + Path rootDir = getRootDir(c); + if (walDir != rootDir) { + if (walDir.toString().startsWith(rootDir.toString() + "/")) { + throw new IllegalStateException("Illegal WAL directory specified. " + + "WAL directories are not permitted to be under the root directory if set."); + } + } + return true; + } + + /** + * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under + * path rootdir + * + * @param rootdir qualified path of HBase root directory + * @param tableName name of table + * @return {@link org.apache.hadoop.fs.Path} for table + */ + public static Path getTableDir(Path rootdir, final TableName tableName) { + return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()), + tableName.getQualifierAsString()); + } + + /** + * Returns the {@link org.apache.hadoop.hbase.TableName} object representing + * the table directory under + * path rootdir + * + * @param tablePath path of table + * @return {@link org.apache.hadoop.fs.Path} for table + */ + public static TableName getTableName(Path tablePath) { + return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName()); + } + + /** + * Returns the {@link org.apache.hadoop.fs.Path} object representing + * the namespace directory under path rootdir + * + * @param rootdir qualified path of HBase root directory + * @param namespace namespace name + * @return {@link org.apache.hadoop.fs.Path} for table + */ + public static Path getNamespaceDir(Path rootdir, final String namespace) { + return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, + new Path(namespace))); + } + + /** + * Sets storage policy for given path according to config setting. + * If the passed path is a directory, we'll set the storage policy for all files + * created in the future in said directory. Note that this change in storage + * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. + * If we're running on a FileSystem implementation that doesn't support the given storage policy + * (or storage policies at all), then we'll issue a log message and continue. + * + * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html + * + * @param fs We only do anything it implements a setStoragePolicy method + * @param conf used to look up storage policy with given key; not modified. + * @param path the Path whose storage policy is to be set + * @param policyKey Key to use pulling a policy from Configuration: + * e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy). + * @param defaultPolicy if the configured policy is equal to this policy name, we will skip + * telling the FileSystem to set a storage policy. + */ + public static void setStoragePolicy(final FileSystem fs, final Configuration conf, + final Path path, final String policyKey, final String defaultPolicy) { + String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT); + if (storagePolicy.equals(defaultPolicy)) { + if (LOG.isTraceEnabled()) { + LOG.trace("default policy of " + defaultPolicy + " requested, exiting early."); + } + return; + } + setStoragePolicy(fs, path, storagePolicy); + } + + // this mapping means that under a federated FileSystem implementation, we'll + // only log the first failure from any of the underlying FileSystems at WARN and all others + // will be at DEBUG. + private static final Map warningMap = + new ConcurrentHashMap(); + + /** + * Sets storage policy for given path. + * If the passed path is a directory, we'll set the storage policy for all files + * created in the future in said directory. Note that this change in storage + * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. + * If we're running on a version of FileSystem that doesn't support the given storage policy + * (or storage policies at all), then we'll issue a log message and continue. + * + * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html + * + * @param fs We only do anything it implements a setStoragePolicy method + * @param path the Path whose storage policy is to be set + * @param storagePolicy Policy to set on path; see hadoop 2.6+ + * org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g + * 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'. + */ + public static void setStoragePolicy(final FileSystem fs, final Path path, + final String storagePolicy) { + if (storagePolicy == null) { + if (LOG.isTraceEnabled()) { + LOG.trace("We were passed a null storagePolicy, exiting early."); + } + return; + } + final String trimmedStoragePolicy = storagePolicy.trim(); + if (trimmedStoragePolicy.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("We were passed an empty storagePolicy, exiting early."); + } + return; + } + invokeSetStoragePolicy(fs, path, trimmedStoragePolicy); + } + + /* + * All args have been checked and are good. Run the setStoragePolicy invocation. + */ + private static void invokeSetStoragePolicy(final FileSystem fs, final Path path, + final String storagePolicy) { + Method m = null; + try { + m = fs.getClass().getDeclaredMethod("setStoragePolicy", + new Class[] { Path.class, String.class }); + m.setAccessible(true); + } catch (NoSuchMethodException e) { + final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584, HDFS-9345 " + + "not available. This is normal and expected on earlier Hadoop versions."; + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn(msg, e); + } else if (LOG.isDebugEnabled()) { + LOG.debug(msg, e); + } + m = null; + } catch (SecurityException e) { + final String msg = "No access to setStoragePolicy on FileSystem from the SecurityManager; " + + "HDFS-6584, HDFS-9345 not available. This is unusual and probably warrants an email " + + "to the user@hbase mailing list. Please be sure to include a link to your configs, and " + + "logs that include this message and period of time before it. Logs around service " + + "start up will probably be useful as well."; + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn(msg, e); + } else if (LOG.isDebugEnabled()) { + LOG.debug(msg, e); + } + m = null; // could happen on setAccessible() or getDeclaredMethod() + } + if (m != null) { + try { + m.invoke(fs, path, storagePolicy); + if (LOG.isDebugEnabled()) { + LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path); + } + } catch (Exception e) { + // This swallows FNFE, should we be throwing it? seems more likely to indicate dev + // misuse than a runtime problem with HDFS. + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " + + "DEBUG log level might have more details.", e); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); + } + // check for lack of HDFS-7228 + if (e instanceof InvocationTargetException) { + final Throwable exception = e.getCause(); + if (exception instanceof RemoteException && + HadoopIllegalArgumentException.class.getName().equals( + ((RemoteException)exception).getClassName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " + + "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " + + "trying to use SSD related policies then you're likely missing HDFS-7228. For " + + "more information see the 'ArchivalStorage' docs for your Hadoop release."); + } + // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation + // that throws UnsupportedOperationException + } else if (exception instanceof UnsupportedOperationException) { + if (LOG.isDebugEnabled()) { + LOG.debug("The underlying FileSystem implementation doesn't support " + + "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " + + "appears to be present in your version of Hadoop. For more information check " + + "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " + + "specification docs from HADOOP-11981, and/or related documentation from the " + + "provider of the underlying FileSystem (its name should appear in the " + + "stacktrace that accompanies this message). Note in particular that Hadoop's " + + "local filesystem implementation doesn't support storage policies.", exception); + } + } + } + } + } + } + + /** + * @param conf must not be null + * @return True if this filesystem whose scheme is 'hdfs'. + * @throws IOException from underlying FileSystem + */ + public static boolean isHDFS(final Configuration conf) throws IOException { + FileSystem fs = FileSystem.get(conf); + String scheme = fs.getUri().getScheme(); + return scheme.equalsIgnoreCase("hdfs"); + } + + /** + * Checks if the given path is the one with 'recovered.edits' dir. + * @param path must not be null + * @return True if we recovered edits + */ + public static boolean isRecoveredEdits(Path path) { + return path.toString().contains(HConstants.RECOVERED_EDITS_DIR); + } + + /** + * @param conf must not be null + * @return Returns the filesystem of the hbase rootdir. + * @throws IOException from underlying FileSystem + */ + public static FileSystem getCurrentFileSystem(Configuration conf) + throws IOException { + return getRootDir(conf).getFileSystem(conf); + } + + /** + * Calls fs.listStatus() and treats FileNotFoundException as non-fatal + * This accommodates differences between hadoop versions, where hadoop 1 + * does not throw a FileNotFoundException, and return an empty FileStatus[] + * while Hadoop 2 will throw FileNotFoundException. + * + * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem, + * Path, FileStatusFilter) instead. + * + * @param fs file system + * @param dir directory + * @param filter path filter + * @return null if dir is empty or doesn't exist, otherwise FileStatus array + */ + public static FileStatus [] listStatus(final FileSystem fs, + final Path dir, final PathFilter filter) throws IOException { + FileStatus [] status = null; + try { + status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter); + } catch (FileNotFoundException fnfe) { + // if directory doesn't exist, return null + if (LOG.isTraceEnabled()) { + LOG.trace(dir + " doesn't exist"); + } + } + if (status == null || status.length < 1) { + return null; + } + return status; + } + + /** + * Calls fs.listStatus() and treats FileNotFoundException as non-fatal + * This would accommodates differences between hadoop versions + * + * @param fs file system + * @param dir directory + * @return null if dir is empty or doesn't exist, otherwise FileStatus array + */ + public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException { + return listStatus(fs, dir, null); + } + + /** + * Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call + * + * @param fs file system + * @param dir directory + * @return LocatedFileStatus list + */ + public static List listLocatedStatus(final FileSystem fs, + final Path dir) throws IOException { + List status = null; + try { + RemoteIterator locatedFileStatusRemoteIterator = fs + .listFiles(dir, false); + while (locatedFileStatusRemoteIterator.hasNext()) { + if (status == null) { + status = Lists.newArrayList(); + } + status.add(locatedFileStatusRemoteIterator.next()); + } + } catch (FileNotFoundException fnfe) { + // if directory doesn't exist, return null + if (LOG.isTraceEnabled()) { + LOG.trace(dir + " doesn't exist"); + } + } + return status; + } + + /** + * Calls fs.delete() and returns the value returned by the fs.delete() + * + * @param fs must not be null + * @param path must not be null + * @param recursive delete tree rooted at path + * @return the value returned by the fs.delete() + * @throws IOException from underlying FileSystem + */ + public static boolean delete(final FileSystem fs, final Path path, final boolean recursive) + throws IOException { + return fs.delete(path, recursive); + } + + /** + * Calls fs.exists(). Checks if the specified path exists + * + * @param fs must not be null + * @param path must not be null + * @return the value returned by fs.exists() + * @throws IOException from underlying FileSystem + */ + public static boolean isExists(final FileSystem fs, final Path path) throws IOException { + return fs.exists(path); + } + + /** + * Log the current state of the filesystem from a certain root directory + * @param fs filesystem to investigate + * @param root root file/directory to start logging from + * @param LOG log to output information + * @throws IOException if an unexpected exception occurs + */ + public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG) + throws IOException { + LOG.debug("Current file system:"); + logFSTree(LOG, fs, root, "|-"); + } + + /** + * Recursive helper to log the state of the FS + * + * @see #logFileSystemState(FileSystem, Path, Log) + */ + private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix) + throws IOException { + FileStatus[] files = listStatus(fs, root, null); + if (files == null) { + return; + } + + for (FileStatus file : files) { + if (file.isDirectory()) { + LOG.debug(prefix + file.getPath().getName() + "/"); + logFSTree(LOG, fs, file.getPath(), prefix + "---"); + } else { + LOG.debug(prefix + file.getPath().getName()); + } + } + } + + public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest) + throws IOException { + // set the modify time for TimeToLive Cleaner + fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1); + return fs.rename(src, dest); + } + + /** + * Do our short circuit read setup. + * Checks buffer size to use and whether to do checksumming in hbase or hdfs. + * @param conf must not be null + */ + public static void setupShortCircuitRead(final Configuration conf) { + // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property. + boolean shortCircuitSkipChecksum = + conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false); + boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); + if (shortCircuitSkipChecksum) { + LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " + + "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " + + "it, see https://issues.apache.org/jira/browse/HBASE-6868." : "")); + assert !shortCircuitSkipChecksum; //this will fail if assertions are on + } + checkShortCircuitReadBufferSize(conf); + } + + /** + * Check if short circuit read buffer size is set and if not, set it to hbase value. + * @param conf must not be null + */ + public static void checkShortCircuitReadBufferSize(final Configuration conf) { + final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2; + final int notSet = -1; + // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2 + final String dfsKey = "dfs.client.read.shortcircuit.buffer.size"; + int size = conf.getInt(dfsKey, notSet); + // If a size is set, return -- we will use it. + if (size != notSet) { + return; + } + // But short circuit buffer size is normally not set. Put in place the hbase wanted size. + int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize); + conf.setIfUnset(dfsKey, Integer.toString(hbaseSize)); + } + + // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and + // not until we attempt to reference it. + private static class StreamCapabilities { + public static final boolean PRESENT; + public static final Class CLASS; + public static final Method METHOD; + static { + boolean tmp = false; + Class clazz = null; + Method method = null; + try { + clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities"); + method = clazz.getMethod("hasCapability", String.class); + tmp = true; + } catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) { + LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " + + "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " + + "support hflush/hsync. If you are running on top of HDFS this probably just " + + "means you have an older version and this can be ignored. If you are running on " + + "top of an alternate FileSystem implementation you should manually verify that " + + "hflush and hsync are implemented; otherwise you risk data loss and hard to " + + "diagnose errors when our assumptions are violated."); + LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.", + exception); + } finally { + PRESENT = tmp; + CLASS = clazz; + METHOD = method; + } + } + } + + /** + * If our FileSystem version includes the StreamCapabilities class, check if + * the given stream has a particular capability. + * @param stream capabilities are per-stream instance, so check this one specifically. must not be + * null + * @param capability what to look for, per Hadoop Common's FileSystem docs + * @return true if there are no StreamCapabilities. false if there are, but this stream doesn't + * implement it. return result of asking the stream otherwise. + */ + public static boolean hasCapability(FSDataOutputStream stream, String capability) { + // be consistent whether or not StreamCapabilities is present + if (stream == null) { + throw new NullPointerException("stream parameter must not be null."); + } + // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything + // otherwise old versions of Hadoop will break. + boolean result = true; + if (StreamCapabilities.PRESENT) { + // if StreamCapabilities is present, but the stream doesn't implement it + // or we run into a problem invoking the method, + // we treat that as equivalent to not declaring anything + result = false; + if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) { + try { + result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue(); + } catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException + exception) { + LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " + + "our understanding of how it's supposed to work. Please file a JIRA and include " + + "the following stack trace. In the mean time we're interpreting this behavior " + + "difference as a lack of capability support, which will probably cause a failure.", + exception); + } + } + } + return result; + } + + /** + * Helper exception for those cases where the place where we need to check a stream capability + * is not where we have the needed context to explain the impact and mitigation for a lack. + */ + public static class StreamLacksCapabilityException extends Exception { + public StreamLacksCapabilityException(String message, Throwable cause) { + super(message, cause); + } + public StreamLacksCapabilityException(String message) { + super(message); + } + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java new file mode 100644 index 00000000000..7ff579277ce --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java @@ -0,0 +1,164 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test {@link CommonFSUtils}. + */ +@Category({MiscTests.class, MediumTests.class}) +public class TestCommonFSUtils { + private static final Log LOG = LogFactory.getLog(TestCommonFSUtils.class); + + private HBaseCommonTestingUtility htu; + private Configuration conf; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + conf = htu.getConfiguration(); + } + + /** + * Test path compare and prefix checking. + */ + @Test + public void testMatchingTail() throws IOException { + Path rootdir = htu.getDataTestDir(); + final FileSystem fs = rootdir.getFileSystem(conf); + assertTrue(rootdir.depth() > 1); + Path partPath = new Path("a", "b"); + Path fullPath = new Path(rootdir, partPath); + Path fullyQualifiedPath = fs.makeQualified(fullPath); + assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath)); + assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath.toString())); + assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullPath.toString())); + assertTrue(CommonFSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString())); + assertFalse(CommonFSUtils.isStartingWithPath(rootdir, partPath.toString())); + assertFalse(CommonFSUtils.isMatchingTail(fullyQualifiedPath, partPath)); + assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath)); + assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString())); + assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath))); + assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString())); + assertFalse(CommonFSUtils.isMatchingTail(fullPath, new Path("x"))); + assertFalse(CommonFSUtils.isMatchingTail(new Path("x"), fullPath)); + } + + private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) + throws Exception { + FSDataOutputStream out = fs.create(file); + byte [] data = new byte[dataSize]; + out.write(data, 0, dataSize); + out.close(); + } + + @Test + public void testSetWALRootDir() throws Exception { + Path p = new Path("file:///hbase/root"); + CommonFSUtils.setWALRootDir(conf, p); + assertEquals(p.toString(), conf.get(CommonFSUtils.HBASE_WAL_DIR)); + } + + @Test + public void testGetWALRootDir() throws IOException { + Path root = new Path("file:///hbase/root"); + Path walRoot = new Path("file:///hbase/logroot"); + CommonFSUtils.setRootDir(conf, root); + assertEquals(CommonFSUtils.getRootDir(conf), root); + assertEquals(CommonFSUtils.getWALRootDir(conf), root); + CommonFSUtils.setWALRootDir(conf, walRoot); + assertEquals(CommonFSUtils.getWALRootDir(conf), walRoot); + } + + @Test(expected=IllegalStateException.class) + public void testGetWALRootDirIllegalWALDir() throws IOException { + Path root = new Path("file:///hbase/root"); + Path invalidWALDir = new Path("file:///hbase/root/logroot"); + CommonFSUtils.setRootDir(conf, root); + CommonFSUtils.setWALRootDir(conf, invalidWALDir); + CommonFSUtils.getWALRootDir(conf); + } + + @Test + public void testRemoveWALRootPath() throws Exception { + CommonFSUtils.setRootDir(conf, new Path("file:///user/hbase")); + Path testFile = new Path(CommonFSUtils.getRootDir(conf), "test/testfile"); + Path tmpFile = new Path("file:///test/testfile"); + assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), "test/testfile"); + assertEquals(CommonFSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString()); + CommonFSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir")); + assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), testFile.toString()); + Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog"); + assertEquals(CommonFSUtils.removeWALRootPath(logFile, conf), "test/testlog"); + } + + @Test(expected=NullPointerException.class) + public void streamCapabilitiesDoesNotAllowNullStream() { + CommonFSUtils.hasCapability(null, "hopefully any string"); + } + + private static final boolean STREAM_CAPABILITIES_IS_PRESENT; + static { + boolean tmp = false; + try { + Class.forName("org.apache.hadoop.fs.StreamCapabilities"); + tmp = true; + LOG.debug("Test thought StreamCapabilities class was present."); + } catch (ClassNotFoundException exception) { + LOG.debug("Test didn't think StreamCapabilities class was present."); + } finally { + STREAM_CAPABILITIES_IS_PRESENT = tmp; + } + } + + @Test + public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException { + FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null); + assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + + "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, + CommonFSUtils.hasCapability(stream, "hsync")); + assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + + "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, + CommonFSUtils.hasCapability(stream, "hflush")); + assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + + "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, + CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " + + "implement.")); + } +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 974bc13a6af..f49833c77b9 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -46,18 +46,20 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** * WAL implementation of the ProcedureStore. @@ -67,6 +69,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe public class WALProcedureStore extends ProcedureStoreBase { private static final Log LOG = LogFactory.getLog(WALProcedureStore.class); public static final String LOG_PREFIX = "pv2-"; + /** Used to construct the name of the log directory for master procedures */ + public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; + public interface LeaseRecovery { void recoverFileLease(FileSystem fs, Path path) throws IOException; @@ -185,18 +190,42 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, - final LeaseRecovery leaseRecovery) { - this(conf, fs, walDir, null, leaseRecovery); + public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery) + throws IOException { + this(conf, + new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR), + new Path(CommonFSUtils.getRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME), leaseRecovery); } - public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, - final Path walArchiveDir, final LeaseRecovery leaseRecovery) { - this.fs = fs; + @VisibleForTesting + public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir, + final LeaseRecovery leaseRecovery) throws IOException { this.conf = conf; + this.leaseRecovery = leaseRecovery; this.walDir = walDir; this.walArchiveDir = walArchiveDir; - this.leaseRecovery = leaseRecovery; + this.fs = walDir.getFileSystem(conf); + + // Create the log directory for the procedure store + if (!fs.exists(walDir)) { + if (!fs.mkdirs(walDir)) { + throw new IOException("Unable to mkdir " + walDir); + } + } + // Now that it exists, set the log policy + CommonFSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY, + HConstants.DEFAULT_WAL_STORAGE_POLICY); + + // Create archive dir up front. Rename won't work w/o it up on HDFS. + if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { + if (this.fs.mkdirs(this.walArchiveDir)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir); + } + } else { + LOG.warn("Failed create of " + this.walArchiveDir); + } + } } @Override @@ -247,16 +276,6 @@ public class WALProcedureStore extends ProcedureStoreBase { } }; syncThread.start(); - - // Create archive dir up front. Rename won't work w/o it up on HDFS. - if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { - if (this.fs.mkdirs(this.walArchiveDir)) { - if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " + - this.walArchiveDir); - } else { - LOG.warn("Failed create of " + this.walArchiveDir); - } - } } @Override @@ -1005,6 +1024,17 @@ public class WALProcedureStore extends ProcedureStoreBase { LOG.warn("failed to create log file with id=" + logId, re); return false; } + // After we create the stream but before we attempt to use it at all + // ensure that we can provide the level of data safety we're configured + // to provide. + final String durability = useHsync ? "hsync" : "hflush"; + if (!(CommonFSUtils.hasCapability(newStream, durability))) { + throw new IllegalStateException("The procedure WAL relies on the ability to " + durability + + " for proper operation during component failures, but the underlying filesystem does " + + "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY + + "' to set the desired level of robustness and ensure the config value of '" + + CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it."); + } try { ProcedureWALFormat.writeHeader(newStream, header); startPos = newStream.getPos(); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 99d3c282cd8..6e0c02eb2aa 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -51,14 +51,14 @@ public class ProcedureTestingUtility { private ProcedureTestingUtility() { } - public static ProcedureStore createStore(final Configuration conf, final FileSystem fs, - final Path baseDir) throws IOException { - return createWalStore(conf, fs, baseDir); + public static ProcedureStore createStore(final Configuration conf, final Path dir) + throws IOException { + return createWalStore(conf, dir); } - public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs, - final Path walDir) throws IOException { - return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() { + public static WALProcedureStore createWalStore(final Configuration conf, final Path dir) + throws IOException { + return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() { @Override public void recoverFileLease(FileSystem fs, Path path) throws IOException { // no-op diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java index 1a4dd866a4a..4c1611a6c50 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java @@ -60,7 +60,7 @@ public class TestChildProcedures { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java index ce9795f632f..bd310fd9292 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java @@ -61,7 +61,7 @@ public class TestProcedureEvents { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procStore.start(1); procExecutor.start(1, true); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java index 1a3f8981201..ed6d512df70 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java @@ -63,7 +63,7 @@ public class TestProcedureExecution { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java index 0a57efaa3e0..6246629ef90 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java @@ -66,7 +66,7 @@ public class TestProcedureMetrics { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java index ec2e54e421f..12a8012ef8a 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java @@ -67,7 +67,7 @@ public class TestProcedureNonce { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index b0f6cbc6137..06f8833a583 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -69,7 +69,7 @@ public class TestProcedureRecovery { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java index 23ca6ba553c..12b21847db7 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java @@ -68,7 +68,7 @@ public class TestProcedureReplayOrder { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcedureEnv(); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procStore.start(NUM_THREADS); procExecutor.start(1, true); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java index 8347dbf90b6..cbe50f2c2d3 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java @@ -76,7 +76,7 @@ public class TestStateMachineProcedure { fs = testDir.getFileSystem(htu.getConfiguration()); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java index 488216815c5..017992cfea6 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java @@ -65,7 +65,7 @@ public class TestYieldProcedures { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procRunnables = new TestScheduler(); procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java index 5554a6c5c1c..503850d3aa9 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java @@ -126,7 +126,7 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool { Path logDir = new Path(testDir, "proc-logs"); System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n"); fs.delete(logDir, true); - store = ProcedureTestingUtility.createWalStore(conf, fs, logDir); + store = ProcedureTestingUtility.createWalStore(conf, logDir); store.start(1); store.recoverLease(); store.load(new LoadCounter()); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java index 823972fd8d5..1a7fc800c09 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java @@ -93,9 +93,9 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { System.out.println("Logs directory : " + logDir.toString()); fs.delete(logDir, true); if ("nosync".equals(syncType)) { - store = new NoSyncWalProcedureStore(conf, fs, logDir); + store = new NoSyncWalProcedureStore(conf, logDir); } else { - store = ProcedureTestingUtility.createWalStore(conf, fs, logDir); + store = ProcedureTestingUtility.createWalStore(conf, logDir); } store.start(numThreads); store.recoverLease(); @@ -244,9 +244,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { } private class NoSyncWalProcedureStore extends WALProcedureStore { - public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs, - final Path logDir) { - super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() { + public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException { + super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() { @Override public void recoverFileLease(FileSystem fs, Path path) throws IOException { // no-op diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java index 610688f12bc..98ec1146e79 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java @@ -75,7 +75,7 @@ public class TestStressWALProcedureStore { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procStore.start(PROCEDURE_STORE_SLOTS); procStore.recoverLease(); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 44c8e127454..98b1b7c9d6a 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -86,7 +86,7 @@ public class TestWALProcedureStore { setupConfig(htu.getConfiguration()); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procStore.start(PROCEDURE_STORE_SLOTS); procStore.recoverLease(); procStore.load(new LoadCounter()); @@ -729,7 +729,7 @@ public class TestWALProcedureStore { assertEquals(procs.length + 1, status.length); // simulate another active master removing the wals - procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir, + procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null, new WALProcedureStore.LeaseRecovery() { private int count = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java index bb34af69255..f7eb02b4d19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java @@ -65,9 +65,6 @@ import edu.umd.cs.findbugs.annotations.Nullable; public class HFileSystem extends FilterFileSystem { public static final Log LOG = LogFactory.getLog(HFileSystem.class); - /** Parameter name for HBase WAL directory */ - public static final String HBASE_WAL_DIR = "hbase.wal.dir"; - private final FileSystem noChecksumFs; // read hfile data from storage private final boolean useHBaseChecksum; private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index 04bf01fca45..1f5462f921f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -56,7 +57,8 @@ public final class AsyncFSOutputHelper { */ public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoop eventLoop, - Class channelClass) throws IOException { + Class channelClass) + throws IOException, CommonFSUtils.StreamLacksCapabilityException { if (fs instanceof DistributedFileSystem) { return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, overwrite, createParent, replication, blockSize, eventLoop, channelClass); @@ -69,6 +71,13 @@ public final class AsyncFSOutputHelper { } else { fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null); } + // After we create the stream but before we attempt to use it at all + // ensure that we can provide the level of data safety we're configured + // to provide. + if (!(CommonFSUtils.hasCapability(fsOut, "hflush") && + CommonFSUtils.hasCapability(fsOut, "hsync"))) { + throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync"); + } final ExecutorService flushExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 3ba31dab0f9..7a778f2affd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -53,7 +53,6 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.ClusterStatus.Option; @@ -1201,23 +1200,8 @@ public class HMaster extends HRegionServer implements MasterServices { private void startProcedureExecutor() throws IOException { final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); final Path rootDir = FSUtils.getRootDir(conf); - final Path walDir = new Path(FSUtils.getWALRootDir(this.conf), - MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); - final Path walArchiveDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - final FileSystem walFs = walDir.getFileSystem(conf); - - // Create the log directory for the procedure store - if (!walFs.exists(walDir)) { - if (!walFs.mkdirs(walDir)) { - throw new IOException("Unable to mkdir " + walDir); - } - } - // Now that it exists, set the log policy - FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY, - HConstants.DEFAULT_WAL_STORAGE_POLICY); - - procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, walArchiveDir, + procedureStore = new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 3b268cb4f2e..27987f6bce3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSTableDescriptors; @@ -144,10 +144,10 @@ public class MasterFileSystem { }; final String[] protectedSubLogDirs = new String[] { - HConstants.HREGION_LOGDIR_NAME, - HConstants.HREGION_OLDLOGDIR_NAME, - HConstants.CORRUPT_DIR_NAME, - MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR + HConstants.HREGION_LOGDIR_NAME, + HConstants.HREGION_OLDLOGDIR_NAME, + HConstants.CORRUPT_DIR_NAME, + WALProcedureStore.MASTER_PROCEDURE_LOGDIR }; // check if the root directory exists checkRootDir(this.rootdir, conf, this.fs); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java index 16647d27f29..495fab6d977 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java @@ -24,9 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience; public final class MasterProcedureConstants { private MasterProcedureConstants() {} - /** Used to construct the name of the log directory for master procedures */ - public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; - /** Number of threads used by the procedure executor */ public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads"; public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 61c71005986..ad54cabec55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; @@ -356,7 +356,7 @@ public abstract class AbstractFSWAL implements WAL { } // Now that it exists, set the storage policy for the entire directory of wal files related to // this FSHLog instance - FSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY, + CommonFSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); @@ -381,7 +381,7 @@ public abstract class AbstractFSWAL implements WAL { }; if (failIfWALExists) { - final FileStatus[] walFiles = FSUtils.listStatus(fs, walDir, ourFiles); + final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles); if (null != walFiles && 0 != walFiles.length) { throw new IOException("Target WAL already exists within directory " + walDir); } @@ -398,7 +398,7 @@ public abstract class AbstractFSWAL implements WAL { // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks // (it costs a little x'ing bocks) final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", - FSUtils.getDefaultBlockSize(this.fs, this.walDir)); + CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir)); this.logrollsize = (long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); @@ -652,7 +652,7 @@ public abstract class AbstractFSWAL implements WAL { } } LOG.info("Archiving " + p + " to " + newPath); - if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { + if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { throw new IOException("Unable to rename " + p + " to " + newPath); } // Tell our listeners that a log has been archived. @@ -685,12 +685,12 @@ public abstract class AbstractFSWAL implements WAL { try { long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter); int oldNumEntries = this.numEntries.getAndSet(0); - final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath)); + final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath)); if (oldPath != null) { this.walFile2Props.put(oldPath, new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); this.totalLogSize.addAndGet(oldFileLen); - LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString); } else { LOG.info("New WAL " + newPathString); @@ -767,6 +767,11 @@ public abstract class AbstractFSWAL implements WAL { cleanOldLogs(); regionsToFlush = findRegionsToForceFlush(); } + } catch (CommonFSUtils.StreamLacksCapabilityException exception) { + // If the underlying FileSystem can't do what we ask, treat as IO failure so + // we'll abort. + throw new IOException("Underlying FileSystem can't meet stream requirements. See RS log " + + "for details.", exception); } finally { closeBarrier.endOp(); assert scope == NullScope.INSTANCE || !scope.isDetached(); @@ -794,7 +799,7 @@ public abstract class AbstractFSWAL implements WAL { * @return may be null if there are no files. */ protected FileStatus[] getFiles() throws IOException { - return FSUtils.listStatus(fs, walDir, ourFiles); + return CommonFSUtils.listStatus(fs, walDir, ourFiles); } @Override @@ -833,7 +838,7 @@ public abstract class AbstractFSWAL implements WAL { } } - if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { + if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { throw new IOException("Unable to rename " + file.getPath() + " to " + p); } // Tell our listeners that a log was archived. @@ -843,7 +848,8 @@ public abstract class AbstractFSWAL implements WAL { } } } - LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.walArchiveDir)); + LOG.debug("Moved " + files.length + " WAL file(s) to " + + CommonFSUtils.getPath(this.walArchiveDir)); } LOG.info("Closed WAL: " + toString()); } @@ -1022,7 +1028,8 @@ public abstract class AbstractFSWAL implements WAL { protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; - protected abstract W createWriterInstance(Path path) throws IOException; + protected abstract W createWriterInstance(Path path) throws IOException, + CommonFSUtils.StreamLacksCapabilityException; /** * @return old wal file size diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index 3747f4720a2..256ced64bdf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.FSUtils; @@ -153,7 +154,7 @@ public abstract class AbstractProtobufLogWriter { } public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) - throws IOException { + throws IOException, StreamLacksCapabilityException { this.conf = conf; boolean doCompress = initializeCompressionContext(conf, path); this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); @@ -237,7 +238,7 @@ public abstract class AbstractProtobufLogWriter { } protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException; + short replication, long blockSize) throws IOException, StreamLacksCapabilityException; /** * return the file length after written. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index d17dde2f2b2..f3c5bf2617e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -153,9 +154,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException { + short replication, long blockSize) throws IOException, StreamLacksCapabilityException { this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, - blockSize, eventLoop, channelClass); + blockSize, eventLoop, channelClass); this.asyncOutputWrapper = new OutputStreamWrapper(output); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 59f6713d7fb..d1e72f764c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.Cell; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -86,9 +88,13 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter @SuppressWarnings("deprecation") @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException { + short replication, long blockSize) throws IOException, StreamLacksCapabilityException { this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null); + // TODO Be sure to add a check for hsync if this branch includes HBASE-19024 + if (!(CommonFSUtils.hasCapability(output, "hflush"))) { + throw new StreamLacksCapabilityException("hflush"); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 5748e6d5297..81fcaf201f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.util; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterators; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.primitives.Ints; import edu.umd.cs.findbugs.annotations.CheckForNull; @@ -36,8 +35,6 @@ import java.io.InterruptedIOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -60,17 +57,14 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.ClusterId; @@ -101,174 +95,24 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; - -import static org.apache.hadoop.hbase.HConstants.HBASE_DIR; - /** * Utility methods for interacting with the underlying file system. */ @InterfaceAudience.Private -public abstract class FSUtils { +public abstract class FSUtils extends CommonFSUtils { private static final Log LOG = LogFactory.getLog(FSUtils.class); - /** Full access permissions (starting point for a umask) */ - public static final String FULL_RWX_PERMISSIONS = "777"; private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize"; private static final int DEFAULT_THREAD_POOLSIZE = 2; /** Set to true on Windows platforms */ + @VisibleForTesting // currently only used in testing. TODO refactor into a test class public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows"); protected FSUtils() { super(); } - /** - * Sets storage policy for given path according to config setting. - * If the passed path is a directory, we'll set the storage policy for all files - * created in the future in said directory. Note that this change in storage - * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle. - * If we're running on a version of HDFS that doesn't support the given storage policy - * (or storage policies at all), then we'll issue a log message and continue. - * - * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html - * - * @param fs We only do anything if an instance of DistributedFileSystem - * @param conf used to look up storage policy with given key; not modified. - * @param path the Path whose storage policy is to be set - * @param policyKey Key to use pulling a policy from Configuration: - * e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy). - * @param defaultPolicy usually should be the policy NONE to delegate to HDFS - */ - public static void setStoragePolicy(final FileSystem fs, final Configuration conf, - final Path path, final String policyKey, final String defaultPolicy) { - String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT); - if (storagePolicy.equals(defaultPolicy)) { - if (LOG.isTraceEnabled()) { - LOG.trace("default policy of " + defaultPolicy + " requested, exiting early."); - } - return; - } - setStoragePolicy(fs, path, storagePolicy); - } - - private static final Map warningMap = - new ConcurrentHashMap(); - - /** - * Sets storage policy for given path. - * If the passed path is a directory, we'll set the storage policy for all files - * created in the future in said directory. Note that this change in storage - * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle. - * If we're running on a version of HDFS that doesn't support the given storage policy - * (or storage policies at all), then we'll issue a log message and continue. - * - * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html - * - * @param fs We only do anything if an instance of DistributedFileSystem - * @param path the Path whose storage policy is to be set - * @param storagePolicy Policy to set on path; see hadoop 2.6+ - * org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g - * 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'. - */ - public static void setStoragePolicy(final FileSystem fs, final Path path, - final String storagePolicy) { - if (storagePolicy == null) { - if (LOG.isTraceEnabled()) { - LOG.trace("We were passed a null storagePolicy, exiting early."); - } - return; - } - final String trimmedStoragePolicy = storagePolicy.trim(); - if (trimmedStoragePolicy.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("We were passed an empty storagePolicy, exiting early."); - } - return; - } - boolean distributed = false; - try { - distributed = isDistributedFileSystem(fs); - } catch (IOException ioe) { - if (!warningMap.containsKey(fs)) { - warningMap.put(fs, true); - LOG.warn("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " - + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy - + " on path=" + path); - } else if (LOG.isDebugEnabled()) { - LOG.debug("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " - + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy - + " on path=" + path); - } - return; - } - if (distributed) { - invokeSetStoragePolicy(fs, path, trimmedStoragePolicy); - } - } - - /* - * All args have been checked and are good. Run the setStoragePolicy invocation. - */ - private static void invokeSetStoragePolicy(final FileSystem fs, final Path path, - final String storagePolicy) { - Method m = null; - try { - m = fs.getClass().getDeclaredMethod("setStoragePolicy", - new Class[] { Path.class, String.class }); - m.setAccessible(true); - } catch (NoSuchMethodException e) { - final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584 not available"; - if (!warningMap.containsKey(fs)) { - warningMap.put(fs, true); - LOG.warn(msg, e); - } else if (LOG.isDebugEnabled()) { - LOG.debug(msg, e); - } - m = null; - } catch (SecurityException e) { - final String msg = "No access to setStoragePolicy on FileSystem; HDFS-6584 not available"; - if (!warningMap.containsKey(fs)) { - warningMap.put(fs, true); - LOG.warn(msg, e); - } else if (LOG.isDebugEnabled()) { - LOG.debug(msg, e); - } - m = null; // could happen on setAccessible() - } - if (m != null) { - try { - m.invoke(fs, path, storagePolicy); - if (LOG.isDebugEnabled()) { - LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path); - } - } catch (Exception e) { - // This swallows FNFE, should we be throwing it? seems more likely to indicate dev - // misuse than a runtime problem with HDFS. - if (!warningMap.containsKey(fs)) { - warningMap.put(fs, true); - LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); - } else if (LOG.isDebugEnabled()) { - LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); - } - // check for lack of HDFS-7228 - if (e instanceof InvocationTargetException) { - final Throwable exception = e.getCause(); - if (exception instanceof RemoteException && - HadoopIllegalArgumentException.class.getName().equals( - ((RemoteException)exception).getClassName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " + - "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " + - "trying to use SSD related policies then you're likely missing HDFS-7228. For " + - "more information see the 'ArchivalStorage' docs for your Hadoop release."); - } - } - } - } - } - } - /** * @return True is fs is instance of DistributedFileSystem * @throws IOException @@ -283,32 +127,6 @@ public abstract class FSUtils { return fileSystem instanceof DistributedFileSystem; } - /** - * Compare of path component. Does not consider schema; i.e. if schemas - * different but path starts with rootPath, - * then the function returns true - * @param rootPath - * @param path - * @return True if path starts with rootPath - */ - public static boolean isStartingWithPath(final Path rootPath, final String path) { - String uriRootPath = rootPath.toUri().getPath(); - String tailUriPath = (new Path(path)).toUri().getPath(); - return tailUriPath.startsWith(uriRootPath); - } - - /** - * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the - * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches, - * the two will equate. - * @param pathToSearch Path we will be trying to match. - * @param pathTail - * @return True if pathTail is tail on the path of pathToSearch - */ - public static boolean isMatchingTail(final Path pathToSearch, String pathTail) { - return isMatchingTail(pathToSearch, new Path(pathTail)); - } - /** * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider @@ -352,18 +170,6 @@ public abstract class FSUtils { return fsUtils; } - /** - * Delete if exists. - * @param fs filesystem object - * @param dir directory to delete - * @return True if deleted dir - * @throws IOException e - */ - public static boolean deleteDirectory(final FileSystem fs, final Path dir) - throws IOException { - return fs.exists(dir) && fs.delete(dir, true); - } - /** * Delete the region directory if exists. * @param conf @@ -379,89 +185,7 @@ public abstract class FSUtils { new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); } - /** - * Return the number of bytes that large input files should be optimally - * be split into to minimize i/o time. - * - * use reflection to search for getDefaultBlockSize(Path f) - * if the method doesn't exist, fall back to using getDefaultBlockSize() - * - * @param fs filesystem object - * @return the default block size for the path's filesystem - * @throws IOException e - */ - public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException { - Method m = null; - Class cls = fs.getClass(); - try { - m = cls.getMethod("getDefaultBlockSize", new Class[] { Path.class }); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem doesn't support getDefaultBlockSize"); - } catch (SecurityException e) { - LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e); - m = null; // could happen on setAccessible() - } - if (m == null) { - return fs.getDefaultBlockSize(path); - } else { - try { - Object ret = m.invoke(fs, path); - return ((Long)ret).longValue(); - } catch (Exception e) { - throw new IOException(e); - } - } - } - - /* - * Get the default replication. - * - * use reflection to search for getDefaultReplication(Path f) - * if the method doesn't exist, fall back to using getDefaultReplication() - * - * @param fs filesystem object - * @param f path of file - * @return default replication for the path's filesystem - * @throws IOException e - */ - public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException { - Method m = null; - Class cls = fs.getClass(); - try { - m = cls.getMethod("getDefaultReplication", new Class[] { Path.class }); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem doesn't support getDefaultReplication"); - } catch (SecurityException e) { - LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e); - m = null; // could happen on setAccessible() - } - if (m == null) { - return fs.getDefaultReplication(path); - } else { - try { - Object ret = m.invoke(fs, path); - return ((Number)ret).shortValue(); - } catch (Exception e) { - throw new IOException(e); - } - } - } - - /** - * Returns the default buffer size to use during writes. - * - * The size of the buffer should probably be a multiple of hardware - * page size (4096 on Intel x86), and it determines how much data is - * buffered during read and write operations. - * - * @param fs filesystem object - * @return default buffer size to use during writes - */ - public static int getDefaultBufferSize(final FileSystem fs) { - return fs.getConf().getInt("io.file.buffer.size", 4096); - } - - /** + /** * Create the specified file on the filesystem. By default, this will: *
    *
  1. overwrite the file if it exists
  2. @@ -514,71 +238,6 @@ public abstract class FSUtils { return create(fs, path, perm, true); } - /** - * Create the specified file on the filesystem. By default, this will: - *
      - *
    1. apply the umask in the configuration (if it is enabled)
    2. - *
    3. use the fs configured buffer size (or 4096 if not set)
    4. - *
    5. use the default replication
    6. - *
    7. use the default block size
    8. - *
    9. not track progress
    10. - *
    - * - * @param fs {@link FileSystem} on which to write the file - * @param path {@link Path} to the file to write - * @param perm - * @param overwrite Whether or not the created file should be overwritten. - * @return output stream to the created file - * @throws IOException if the file cannot be created - */ - public static FSDataOutputStream create(FileSystem fs, Path path, - FsPermission perm, boolean overwrite) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite); - } - return fs.create(path, perm, overwrite, getDefaultBufferSize(fs), - getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null); - } - - /** - * Get the file permissions specified in the configuration, if they are - * enabled. - * - * @param fs filesystem that the file will be created on. - * @param conf configuration to read for determining if permissions are - * enabled and which to use - * @param permssionConfKey property key in the configuration to use when - * finding the permission - * @return the permission to use when creating a new file on the fs. If - * special permissions are not specified in the configuration, then - * the default permissions on the the fs will be returned. - */ - public static FsPermission getFilePermissions(final FileSystem fs, - final Configuration conf, final String permssionConfKey) { - boolean enablePermissions = conf.getBoolean( - HConstants.ENABLE_DATA_FILE_UMASK, false); - - if (enablePermissions) { - try { - FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS); - // make sure that we have a mask, if not, go default. - String mask = conf.get(permssionConfKey); - if (mask == null) - return FsPermission.getFileDefault(); - // appy the umask - FsPermission umask = new FsPermission(mask); - return perm.applyUMask(umask); - } catch (IllegalArgumentException e) { - LOG.warn( - "Incorrect umask attempted to be created: " - + conf.get(permssionConfKey) - + ", using default file permissions.", e); - return FsPermission.getFileDefault(); - } - } - return FsPermission.getFileDefault(); - } - /** * Checks to see if the specified file system is available * @@ -1022,46 +681,6 @@ public abstract class FSUtils { } } - /** - * Verifies root directory path is a valid URI with a scheme - * - * @param root root directory path - * @return Passed root argument. - * @throws IOException if not a valid URI with a scheme - */ - public static Path validateRootPath(Path root) throws IOException { - try { - URI rootURI = new URI(root.toString()); - String scheme = rootURI.getScheme(); - if (scheme == null) { - throw new IOException("Root directory does not have a scheme"); - } - return root; - } catch (URISyntaxException e) { - IOException io = new IOException("Root directory path is not a valid " + - "URI -- check your " + HBASE_DIR + " configuration"); - io.initCause(e); - throw io; - } - } - - /** - * Checks for the presence of the WAL log root path (using the provided conf object) in the given path. If - * it exists, this method removes it and returns the String representation of remaining relative path. - * @param path - * @param conf - * @return String representation of the remaining relative path - * @throws IOException - */ - public static String removeWALRootPath(Path path, final Configuration conf) throws IOException { - Path root = getWALRootDir(conf); - String pathStr = path.toString(); - // check that the path is absolute... it has the root path in it. - if (!pathStr.startsWith(root.toString())) return pathStr; - // if not, return as it is. - return pathStr.substring(root.toString().length() + 1);// remove the "/" too. - } - /** * If DFS, check safe mode and if so, wait until we clear it. * @param conf configuration @@ -1085,81 +704,6 @@ public abstract class FSUtils { } } - /** - * Return the 'path' component of a Path. In Hadoop, Path is an URI. This - * method returns the 'path' component of a Path's URI: e.g. If a Path is - * hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir, - * this method returns /hbase_trunk/TestTable/compaction.dir. - * This method is useful if you want to print out a Path without qualifying - * Filesystem instance. - * @param p Filesystem Path whose 'path' component we are to return. - * @return Path portion of the Filesystem - */ - public static String getPath(Path p) { - return p.toUri().getPath(); - } - - /** - * @param c configuration - * @return {@link Path} to hbase root directory: i.e. {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR} from - * configuration as a qualified Path. - * @throws IOException e - */ - public static Path getRootDir(final Configuration c) throws IOException { - Path p = new Path(c.get(HBASE_DIR)); - FileSystem fs = p.getFileSystem(c); - return p.makeQualified(fs); - } - - public static void setRootDir(final Configuration c, final Path root) throws IOException { - c.set(HBASE_DIR, root.toString()); - } - - public static void setFsDefault(final Configuration c, final Path root) throws IOException { - c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+ - } - - public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException { - Path p = getRootDir(c); - return p.getFileSystem(c); - } - - /** - * @param c configuration - * @return {@link Path} to hbase log root directory: i.e. {@value org.apache.hadoop.hbase.fs.HFileSystem#HBASE_WAL_DIR} from - * configuration as a qualified Path. Defaults to {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR} - * @throws IOException e - */ - public static Path getWALRootDir(final Configuration c) throws IOException { - Path p = new Path(c.get(HFileSystem.HBASE_WAL_DIR, c.get(HBASE_DIR))); - if (!isValidWALRootDir(p, c)) { - return FSUtils.getRootDir(c); - } - FileSystem fs = p.getFileSystem(c); - return p.makeQualified(fs); - } - - @VisibleForTesting - public static void setWALRootDir(final Configuration c, final Path root) throws IOException { - c.set(HFileSystem.HBASE_WAL_DIR, root.toString()); - } - - public static FileSystem getWALFileSystem(final Configuration c) throws IOException { - Path p = getWALRootDir(c); - return p.getFileSystem(c); - } - - private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException { - Path rootDir = FSUtils.getRootDir(c); - if (walDir != rootDir) { - if (walDir.toString().startsWith(rootDir.toString() + "/")) { - throw new IllegalStateException("Illegal WAL directory specified. " + - "WAL directories are not permitted to be under the root directory if set."); - } - } - return true; - } - /** * Checks if meta region exists * @@ -1297,44 +841,6 @@ public abstract class FSUtils { return frags; } - /** - * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under - * path rootdir - * - * @param rootdir qualified path of HBase root directory - * @param tableName name of table - * @return {@link org.apache.hadoop.fs.Path} for table - */ - public static Path getTableDir(Path rootdir, final TableName tableName) { - return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()), - tableName.getQualifierAsString()); - } - - /** - * Returns the {@link org.apache.hadoop.hbase.TableName} object representing - * the table directory under - * path rootdir - * - * @param tablePath path of table - * @return {@link org.apache.hadoop.fs.Path} for table - */ - public static TableName getTableName(Path tablePath) { - return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName()); - } - - /** - * Returns the {@link org.apache.hadoop.fs.Path} object representing - * the namespace directory under path rootdir - * - * @param rootdir qualified path of HBase root directory - * @param namespace namespace name - * @return {@link org.apache.hadoop.fs.Path} for table - */ - public static Path getNamespaceDir(Path rootdir, final String namespace) { - return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, - new Path(namespace))); - } - /** * A {@link PathFilter} that returns only regular files. */ @@ -1431,17 +937,6 @@ public abstract class FSUtils { } } - /** - * @param conf - * @return True if this filesystem whose scheme is 'hdfs'. - * @throws IOException - */ - public static boolean isHDFS(final Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(conf); - String scheme = fs.getUri().getScheme(); - return scheme.equalsIgnoreCase("hdfs"); - } - /** * Recover file lease. Used when a file might be suspect * to be had been left open by another process. @@ -1483,15 +978,6 @@ public abstract class FSUtils { return tabledirs; } - /** - * Checks if the given path is the one with 'recovered.edits' dir. - * @param path - * @return True if we recovered edits - */ - public static boolean isRecoveredEdits(Path path) { - return path.toString().contains(HConstants.RECOVERED_EDITS_DIR); - } - /** * Filter for all dirs that don't start with '.' */ @@ -1668,18 +1154,6 @@ public abstract class FSUtils { } } - - /** - * @param conf - * @return Returns the filesystem of the hbase rootdir. - * @throws IOException - */ - public static FileSystem getCurrentFileSystem(Configuration conf) - throws IOException { - return getRootDir(conf).getFileSystem(conf); - } - - /** * Runs through the HBase rootdir/tablename and creates a reverse lookup map for * table StoreFile names to the full Path. @@ -1977,101 +1451,6 @@ public abstract class FSUtils { } } - /** - * Calls fs.listStatus() and treats FileNotFoundException as non-fatal - * This accommodates differences between hadoop versions, where hadoop 1 - * does not throw a FileNotFoundException, and return an empty FileStatus[] - * while Hadoop 2 will throw FileNotFoundException. - * - * Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem, - * Path, FileStatusFilter)} instead. - * - * @param fs file system - * @param dir directory - * @param filter path filter - * @return null if dir is empty or doesn't exist, otherwise FileStatus array - */ - public static FileStatus [] listStatus(final FileSystem fs, - final Path dir, final PathFilter filter) throws IOException { - FileStatus [] status = null; - try { - status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter); - } catch (FileNotFoundException fnfe) { - // if directory doesn't exist, return null - if (LOG.isTraceEnabled()) { - LOG.trace(dir + " doesn't exist"); - } - } - if (status == null || status.length < 1) return null; - return status; - } - - /** - * Calls fs.listStatus() and treats FileNotFoundException as non-fatal - * This would accommodates differences between hadoop versions - * - * @param fs file system - * @param dir directory - * @return null if dir is empty or doesn't exist, otherwise FileStatus array - */ - public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException { - return listStatus(fs, dir, null); - } - - /** - * Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call - * - * @param fs file system - * @param dir directory - * @return LocatedFileStatus list - */ - public static List listLocatedStatus(final FileSystem fs, - final Path dir) throws IOException { - List status = null; - try { - RemoteIterator locatedFileStatusRemoteIterator = fs - .listFiles(dir, false); - while (locatedFileStatusRemoteIterator.hasNext()) { - if (status == null) { - status = Lists.newArrayList(); - } - status.add(locatedFileStatusRemoteIterator.next()); - } - } catch (FileNotFoundException fnfe) { - // if directory doesn't exist, return null - if (LOG.isTraceEnabled()) { - LOG.trace(dir + " doesn't exist"); - } - } - return status; - } - - /** - * Calls fs.delete() and returns the value returned by the fs.delete() - * - * @param fs - * @param path - * @param recursive - * @return the value returned by the fs.delete() - * @throws IOException - */ - public static boolean delete(final FileSystem fs, final Path path, final boolean recursive) - throws IOException { - return fs.delete(path, recursive); - } - - /** - * Calls fs.exists(). Checks if the specified path exists - * - * @param fs - * @param path - * @return the value returned by fs.exists() - * @throws IOException - */ - public static boolean isExists(final FileSystem fs, final Path path) throws IOException { - return fs.exists(path); - } - /** * Throw an exception if an action is not permitted by a user on a file. * @@ -2108,46 +1487,6 @@ public abstract class FSUtils { return false; } - /** - * Log the current state of the filesystem from a certain root directory - * @param fs filesystem to investigate - * @param root root file/directory to start logging from - * @param LOG log to output information - * @throws IOException if an unexpected exception occurs - */ - public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG) - throws IOException { - LOG.debug("Current file system:"); - logFSTree(LOG, fs, root, "|-"); - } - - /** - * Recursive helper to log the state of the FS - * - * @see #logFileSystemState(FileSystem, Path, Log) - */ - private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix) - throws IOException { - FileStatus[] files = FSUtils.listStatus(fs, root, null); - if (files == null) return; - - for (FileStatus file : files) { - if (file.isDirectory()) { - LOG.debug(prefix + file.getPath().getName() + "/"); - logFSTree(LOG, fs, file.getPath(), prefix + "---"); - } else { - LOG.debug(prefix + file.getPath().getName()); - } - } - } - - public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest) - throws IOException { - // set the modify time for TimeToLive Cleaner - fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1); - return fs.rename(src, dest); - } - /** * This function is to scan the root path of the file system to get the * degree of locality for each region on each of the servers having at least @@ -2397,4 +1736,5 @@ public abstract class FSUtils { return null; } } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 8880ca51c4e..430413748a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -38,7 +38,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Pair; /** @@ -52,7 +52,13 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { // Only public so classes back in regionserver.wal can access public interface AsyncWriter extends WALProvider.AsyncWriter { - void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException; + /** + * @throws IOException if something goes wrong initializing an output stream + * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that + * meet the needs of the given Writer implementation. + */ + void init(FileSystem fs, Path path, Configuration c, boolean overwritable) + throws IOException, CommonFSUtils.StreamLacksCapabilityException; } private EventLoopGroup eventLoopGroup; @@ -60,7 +66,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { private Class channelClass; @Override protected AsyncFSWAL createWAL() throws IOException { - return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf), + return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, @@ -96,7 +102,15 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { writer.init(fs, path, conf, overwritable); return writer; } catch (Exception e) { - LOG.debug("Error instantiating log writer.", e); + if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { + LOG.error("The RegionServer async write ahead log provider " + + "relies on the ability to call " + e.getMessage() + " for proper operation during " + + "component failures, but the current FileSystem does not support doing so. Please " + + "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " + + "it points to a FileSystem mount that has suitable capabilities for output streams."); + } else { + LOG.debug("Error instantiating log writer.", e); + } Throwables.propagateIfPossible(e, IOException.class); throw new IOException("cannot get log writer", e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index 459485c69a9..b72e66841e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceStability; // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils; /** * A WAL provider that use {@link FSHLog}. @@ -44,7 +44,13 @@ public class FSHLogProvider extends AbstractFSWALProvider { // Only public so classes back in regionserver.wal can access public interface Writer extends WALProvider.Writer { - void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException; + /** + * @throws IOException if something goes wrong initializing an output stream + * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that + * meet the needs of the given Writer implementation. + */ + void init(FileSystem fs, Path path, Configuration c, boolean overwritable) + throws IOException, CommonFSUtils.StreamLacksCapabilityException; } /** @@ -61,7 +67,15 @@ public class FSHLogProvider extends AbstractFSWALProvider { writer.init(fs, path, conf, overwritable); return writer; } catch (Exception e) { - LOG.debug("Error instantiating log writer.", e); + if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { + LOG.error("The RegionServer write ahead log provider for FileSystem implementations " + + "relies on the ability to call " + e.getMessage() + " for proper operation during " + + "component failures, but the current FileSystem does not support doing so. Please " + + "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " + + "it points to a FileSystem mount that has suitable capabilities for output streams."); + } else { + LOG.debug("Error instantiating log writer.", e); + } if (writer != null) { try{ writer.close(); @@ -75,7 +89,7 @@ public class FSHLogProvider extends AbstractFSWALProvider { @Override protected FSHLog createWAL() throws IOException { - return new FSHLog(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf), + return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index 1fb5d378f90..b0d689c8832 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.FSUtils; import org.junit.AfterClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -55,7 +56,8 @@ public class TestLocalAsyncOutput { } @Test - public void test() throws IOException, InterruptedException, ExecutionException { + public void test() throws IOException, InterruptedException, ExecutionException, + FSUtils.StreamLacksCapabilityException { Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java index 4e11778c8ac..9ea068a6a8c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -184,10 +185,8 @@ public class MockMasterServices extends MockNoopMasterServices { throws IOException { final Configuration conf = getConfiguration(); final Path logDir = new Path(fileSystemManager.getRootDir(), - MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); + WALProcedureStore.MASTER_PROCEDURE_LOGDIR); - //procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir, - // new MasterProcedureEnv.WALStoreLeaseRecovery(this)); this.procedureStore = new NoopProcedureStore(); this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java index 86f0abc817e..e0452c2f2a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java @@ -115,8 +115,8 @@ public class TestMasterProcedureWalLease { Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration(); Mockito.doReturn(true).when(backupMaster3).isActiveMaster(); final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(), - firstMaster.getMasterFileSystem().getFileSystem(), ((WALProcedureStore)masterStore).getWALDir(), + null, new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3)); // Abort Latch for the test store final CountDownLatch backupStore3Abort = new CountDownLatch(1); @@ -195,8 +195,8 @@ public class TestMasterProcedureWalLease { Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration(); Mockito.doReturn(true).when(backupMaster3).isActiveMaster(); final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(), - firstMaster.getMasterFileSystem().getFileSystem(), ((WALProcedureStore)procStore).getWALDir(), + null, new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3)); // start a second store which should fence the first one out diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java index 0c5ee1fe85f..7932d0006e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java @@ -76,7 +76,7 @@ public class TestWALProcedureStoreOnHDFS { MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3); Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs"); - store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir); + store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir); store.registerListener(stopProcedureListener); store.start(8); store.recoverLease(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index d5e8c1c8116..b736fae80de 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -58,9 +58,9 @@ import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; @@ -139,8 +139,8 @@ public abstract class AbstractTestFSWAL { // test to see whether the coprocessor is loaded or not. AbstractFSWAL wal = null; try { - wal = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME, - CONF, null, true, null, null); + wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), + HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); WALCoprocessorHost host = wal.getCoprocessorHost(); Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class); assertNotNull(c); @@ -187,8 +187,8 @@ public abstract class AbstractTestFSWAL { AbstractFSWAL wal1 = null; AbstractFSWAL walMeta = null; try { - wal1 = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME, - CONF, null, true, null, null); + wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), + HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); LOG.debug("Log obtained is: " + wal1); Comparator comp = wal1.LOG_NAME_COMPARATOR; Path p1 = wal1.computeFilename(11); @@ -197,9 +197,9 @@ public abstract class AbstractTestFSWAL { assertTrue(comp.compare(p1, p1) == 0); // comparing with different filenum. assertTrue(comp.compare(p1, p2) < 0); - walMeta = - newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME, - CONF, null, true, null, AbstractFSWALProvider.META_WAL_PROVIDER_ID); + walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), + HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, + AbstractFSWALProvider.META_WAL_PROVIDER_ID); Comparator compMeta = walMeta.LOG_NAME_COMPARATOR; Path p1WithMeta = walMeta.computeFilename(11); @@ -245,7 +245,7 @@ public abstract class AbstractTestFSWAL { LOG.debug("testFindMemStoresEligibleForFlush"); Configuration conf1 = HBaseConfiguration.create(CONF); conf1.setInt("hbase.regionserver.maxlogs", 1); - AbstractFSWAL wal = newWAL(FS, FSUtils.getWALRootDir(conf1), DIR.toString(), + AbstractFSWAL wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null); HTableDescriptor t1 = new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row")); @@ -332,9 +332,10 @@ public abstract class AbstractTestFSWAL { } @Test(expected = IOException.class) - public void testFailedToCreateWALIfParentRenamed() throws IOException { + public void testFailedToCreateWALIfParentRenamed() throws IOException, + CommonFSUtils.StreamLacksCapabilityException { final String name = "testFailedToCreateWALIfParentRenamed"; - AbstractFSWAL wal = newWAL(FS, FSUtils.getWALRootDir(CONF), name, + AbstractFSWAL wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); long filenum = System.currentTimeMillis(); Path path = wal.computeFilename(filenum); @@ -373,17 +374,17 @@ public abstract class AbstractTestFSWAL { scopes.put(fam, 0); } // subclass and doctor a method. - AbstractFSWAL wal = newSlowWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF, - null, true, null, null, new Runnable() { + AbstractFSWAL wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), + testName, CONF, null, true, null, null, new Runnable() { - @Override - public void run() { - if (goslow.get()) { - Threads.sleep(100); - LOG.debug("Sleeping before appending 100ms"); + @Override + public void run() { + if (goslow.get()) { + Threads.sleep(100); + LOG.debug("Sleeping before appending 100ms"); + } } - } - }); + }); HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(), TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal); EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); @@ -434,8 +435,8 @@ public abstract class AbstractTestFSWAL { @Test public void testSyncNoAppend() throws IOException { String testName = currentTest.getMethodName(); - AbstractFSWAL wal = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF, - null, true, null, null); + AbstractFSWAL wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName, + CONF, null, true, null, null); try { wal.sync(); } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index f18ad77e74c..283b85d8d6e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; @@ -1029,7 +1030,8 @@ public abstract class AbstractTestWALReplay { /** * testcase for https://issues.apache.org/jira/browse/HBASE-14949. */ - private void testNameConflictWhenSplit(boolean largeFirst) throws IOException { + private void testNameConflictWhenSplit(boolean largeFirst) throws IOException, + StreamLacksCapabilityException { final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); @@ -1071,12 +1073,12 @@ public abstract class AbstractTestWALReplay { } @Test - public void testNameConflictWhenSplit0() throws IOException { + public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException { testNameConflictWhenSplit(true); } @Test - public void testNameConflictWhenSplit1() throws IOException { + public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException { testNameConflictWhenSplit(false); } @@ -1231,7 +1233,8 @@ public abstract class AbstractTestWALReplay { return htd; } - private void writerWALFile(Path file, List entries) throws IOException { + private void writerWALFile(Path file, List entries) throws IOException, + StreamLacksCapabilityException { fs.mkdirs(file.getParent()); ProtobufLogWriter writer = new ProtobufLogWriter(); writer.init(fs, file, conf, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java index 548c0239af6..055c28d38b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java @@ -30,6 +30,8 @@ import java.io.IOException; import java.util.Random; import java.util.UUID; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -41,7 +43,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -57,6 +58,7 @@ import org.junit.experimental.categories.Category; */ @Category({MiscTests.class, MediumTests.class}) public class TestFSUtils { + private static final Log LOG = LogFactory.getLog(TestFSUtils.class); private HBaseTestingUtility htu; private FileSystem fs; @@ -69,53 +71,6 @@ public class TestFSUtils { conf = htu.getConfiguration(); } - /** - * Test path compare and prefix checking. - * @throws IOException - */ - @Test - public void testMatchingTail() throws IOException { - Path rootdir = htu.getDataTestDir(); - assertTrue(rootdir.depth() > 1); - Path partPath = new Path("a", "b"); - Path fullPath = new Path(rootdir, partPath); - Path fullyQualifiedPath = fs.makeQualified(fullPath); - assertFalse(FSUtils.isMatchingTail(fullPath, partPath)); - assertFalse(FSUtils.isMatchingTail(fullPath, partPath.toString())); - assertTrue(FSUtils.isStartingWithPath(rootdir, fullPath.toString())); - assertTrue(FSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString())); - assertFalse(FSUtils.isStartingWithPath(rootdir, partPath.toString())); - assertFalse(FSUtils.isMatchingTail(fullyQualifiedPath, partPath)); - assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fullPath)); - assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString())); - assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath))); - assertTrue(FSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString())); - assertFalse(FSUtils.isMatchingTail(fullPath, new Path("x"))); - assertFalse(FSUtils.isMatchingTail(new Path("x"), fullPath)); - } - - @Test - public void testVersion() throws DeserializationException, IOException { - final Path rootdir = htu.getDataTestDir(); - assertNull(FSUtils.getVersion(fs, rootdir)); - // Write out old format version file. See if we can read it in and convert. - Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); - FSDataOutputStream s = fs.create(versionFile); - final String version = HConstants.FILE_SYSTEM_VERSION; - s.writeUTF(version); - s.close(); - assertTrue(fs.exists(versionFile)); - FileStatus [] status = fs.listStatus(versionFile); - assertNotNull(status); - assertTrue(status.length > 0); - String newVersion = FSUtils.getVersion(fs, rootdir); - assertEquals(version.length(), newVersion.length()); - assertEquals(version, newVersion); - // File will have been converted. Exercise the pb format - assertEquals(version, FSUtils.getVersion(fs, rootdir)); - FSUtils.checkVersion(fs, rootdir, true); - } - @Test public void testIsHDFS() throws Exception { assertFalse(FSUtils.isHDFS(conf)); MiniDFSCluster cluster = null; @@ -238,8 +193,33 @@ public class TestFSUtils { } } + @Test + public void testVersion() throws DeserializationException, IOException { + final Path rootdir = htu.getDataTestDir(); + final FileSystem fs = rootdir.getFileSystem(conf); + assertNull(FSUtils.getVersion(fs, rootdir)); + // Write out old format version file. See if we can read it in and convert. + Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); + FSDataOutputStream s = fs.create(versionFile); + final String version = HConstants.FILE_SYSTEM_VERSION; + s.writeUTF(version); + s.close(); + assertTrue(fs.exists(versionFile)); + FileStatus [] status = fs.listStatus(versionFile); + assertNotNull(status); + assertTrue(status.length > 0); + String newVersion = FSUtils.getVersion(fs, rootdir); + assertEquals(version.length(), newVersion.length()); + assertEquals(version, newVersion); + // File will have been converted. Exercise the pb format + assertEquals(version, FSUtils.getVersion(fs, rootdir)); + FSUtils.checkVersion(fs, rootdir, true); + } + @Test public void testPermMask() throws Exception { + final Path rootdir = htu.getDataTestDir(); + final FileSystem fs = rootdir.getFileSystem(conf); // default fs permission FsPermission defaultFsPerm = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); @@ -277,6 +257,8 @@ public class TestFSUtils { @Test public void testDeleteAndExists() throws Exception { + final Path rootdir = htu.getDataTestDir(); + final FileSystem fs = rootdir.getFileSystem(conf); conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true); FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); // then that the correct file is created @@ -302,6 +284,7 @@ public class TestFSUtils { } } + @Test public void testRenameAndSetModifyTime() throws Exception { MiniDFSCluster cluster = htu.startMiniDFSCluster(1); @@ -338,6 +321,24 @@ public class TestFSUtils { } } + @Test + public void testSetStoragePolicyDefault() throws Exception { + verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY); + } + + /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */ + @Test + public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception { + verifyFileInDirWithStoragePolicy("ALL_SSD"); + } + + /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */ + @Test + public void testSetStoragePolicyInvalid() throws Exception { + verifyFileInDirWithStoragePolicy("1772"); + } + + // Here instead of TestCommonFSUtils because we need a minicluster private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception { conf.set(HConstants.WAL_STORAGE_POLICY, policy); @@ -362,63 +363,6 @@ public class TestFSUtils { } } - @Test - public void testSetStoragePolicyDefault() throws Exception { - verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY); - } - - /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */ - @Test - public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception { - verifyFileInDirWithStoragePolicy("ALL_SSD"); - } - - /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */ - @Test - public void testSetStoragePolicyInvalid() throws Exception { - verifyFileInDirWithStoragePolicy("1772"); - } - - @Test - public void testSetWALRootDir() throws Exception { - Path p = new Path("file:///hbase/root"); - FSUtils.setWALRootDir(conf, p); - assertEquals(p.toString(), conf.get(HFileSystem.HBASE_WAL_DIR)); - } - - @Test - public void testGetWALRootDir() throws IOException { - Path root = new Path("file:///hbase/root"); - Path walRoot = new Path("file:///hbase/logroot"); - FSUtils.setRootDir(conf, root); - assertEquals(FSUtils.getRootDir(conf), root); - assertEquals(FSUtils.getWALRootDir(conf), root); - FSUtils.setWALRootDir(conf, walRoot); - assertEquals(FSUtils.getWALRootDir(conf), walRoot); - } - - @Test(expected=IllegalStateException.class) - public void testGetWALRootDirIllegalWALDir() throws IOException { - Path root = new Path("file:///hbase/root"); - Path invalidWALDir = new Path("file:///hbase/root/logroot"); - FSUtils.setRootDir(conf, root); - FSUtils.setWALRootDir(conf, invalidWALDir); - FSUtils.getWALRootDir(conf); - } - - @Test - public void testRemoveWALRootPath() throws Exception { - FSUtils.setRootDir(conf, new Path("file:///user/hbase")); - Path testFile = new Path(FSUtils.getRootDir(conf), "test/testfile"); - Path tmpFile = new Path("file:///test/testfile"); - assertEquals(FSUtils.removeWALRootPath(testFile, conf), "test/testfile"); - assertEquals(FSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString()); - FSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir")); - assertEquals(FSUtils.removeWALRootPath(testFile, conf), testFile.toString()); - Path logFile = new Path(FSUtils.getWALRootDir(conf), "test/testlog"); - assertEquals(FSUtils.removeWALRootPath(logFile, conf), "test/testlog"); - } - /** * Ugly test that ensures we can get at the hedged read counters in dfsclient. * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread. @@ -565,4 +509,37 @@ public class TestFSUtils { assertTrue(fileSys.delete(name, true)); assertTrue(!fileSys.exists(name)); } + + + private static final boolean STREAM_CAPABILITIES_IS_PRESENT; + static { + boolean tmp = false; + try { + Class.forName("org.apache.hadoop.fs.StreamCapabilities"); + tmp = true; + LOG.debug("Test thought StreamCapabilities class was present."); + } catch (ClassNotFoundException exception) { + LOG.debug("Test didn't think StreamCapabilities class was present."); + } finally { + STREAM_CAPABILITIES_IS_PRESENT = tmp; + } + } + + // Here instead of TestCommonFSUtils because we need a minicluster + @Test + public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception { + MiniDFSCluster cluster = htu.startMiniDFSCluster(1); + try (FileSystem filesystem = cluster.getFileSystem()) { + FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar")); + assertTrue(FSUtils.hasCapability(stream, "hsync")); + assertTrue(FSUtils.hasCapability(stream, "hflush")); + assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " + + "StreamCapabilities class is not defined.", + STREAM_CAPABILITIES_IS_PRESENT, + FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add.")); + } finally { + cluster.shutdown(); + } + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index 944a4f168f1..f578c11861b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -38,7 +38,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; /** @@ -100,7 +100,7 @@ public class IOTestProvider implements WALProvider { providerId = DEFAULT_PROVIDER_ID; } final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; - log = new IOTestWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf), + log = new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), AbstractFSWALProvider.getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); @@ -184,7 +184,12 @@ public class IOTestProvider implements WALProvider { if (!initialized || doFileRolls) { LOG.info("creating new writer instance."); final ProtobufLogWriter writer = new IOTestWriter(); - writer.init(fs, path, conf, false); + try { + writer.init(fs, path, conf, false); + } catch (CommonFSUtils.StreamLacksCapabilityException exception) { + throw new IOException("Can't create writer instance because underlying FileSystem " + + "doesn't support needed stream capabilities.", exception); + } if (!initialized) { LOG.info("storing initial writer instance in case file rolling isn't allowed."); noRollsWriter = writer; @@ -207,7 +212,8 @@ public class IOTestProvider implements WALProvider { private boolean doSyncs; @Override - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException { + public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) + throws IOException, CommonFSUtils.StreamLacksCapabilityException { Collection operations = conf.getStringCollection(ALLOWED_OPERATIONS); if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) { doAppends = doSyncs = true;