+ *
* Used for doing updating of OPENING znode during log replay on region open. */ @InterfaceAudience.Private diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java new file mode 100644 index 00000000000..9c3da1658c7 --- /dev/null +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.Method; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility methods for recovering file lease for hdfs. + */ +@InterfaceAudience.Private +public final class RecoverLeaseFSUtils { + + private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class); + + private RecoverLeaseFSUtils() { + } + + public static void recoverFileLease(FileSystem fs, Path p, Configuration conf) + throws IOException { + recoverFileLease(fs, p, conf, null); + } + + /** + * Recover the lease from HDFS, retrying multiple times. + */ + public static void recoverFileLease(FileSystem fs, Path p, Configuration conf, + CancelableProgressable reporter) throws IOException { + if (fs instanceof FilterFileSystem) { + fs = ((FilterFileSystem) fs).getRawFileSystem(); + } + // lease recovery not needed for local file system case. + if (!(fs instanceof DistributedFileSystem)) { + return; + } + recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter); + } + + /* + * Run the dfs recover lease. recoverLease is asynchronous. It returns: -false when it starts the + * lease recovery (i.e. lease recovery not *yet* done) - true when the lease recovery has + * succeeded or the file is closed. But, we have to be careful. Each time we call recoverLease, it + * starts the recover lease process over from the beginning. We could put ourselves in a situation + * where we are doing nothing but starting a recovery, interrupting it to start again, and so on. + * The findings over in HBASE-8354 have it that the namenode will try to recover the lease on the + * file's primary node. If all is well, it should return near immediately. 
But, as is common, it + * is the very primary node that has crashed and so the namenode will be stuck waiting on a socket + * timeout before it will ask another datanode to start the recovery. It does not help if we call + * recoverLease in the meantime and in particular, subsequent to the socket timeout, a + * recoverLease invocation will cause us to start over from square one (possibly waiting on socket + * timeout against primary node). So, in the below, we do the following: 1. Call recoverLease. 2. + * If it returns true, break. 3. If it returns false, wait a few seconds and then call it again. + * 4. If it returns true, break. 5. If it returns false, wait for what we think the datanode + * socket timeout is (configurable) and then try again. 6. If it returns true, break. 7. If it + * returns false, repeat starting at step 5. above. If HDFS-4525 is available, call it every + * second and we might be able to exit early. + */ + private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p, + final Configuration conf, final CancelableProgressable reporter) throws IOException { + LOG.info("Recover lease on dfs file " + p); + long startWaiting = EnvironmentEdgeManager.currentTime(); + // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS + // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves + // beyond that limit 'to be safe'. + long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting; + // This setting should be a little bit above what the cluster dfs heartbeat is set to. + long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000); + // This should be set to how long it'll take for us to timeout against primary datanode if it + // is dead. We set it to 64 seconds, 4 second than the default READ_TIMEOUT in HDFS, the + // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this + // timeout, then further recovery will take liner backoff with this base, to avoid endless + // preemptions when this value is not properly configured. + long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 64 * 1000); + + Method isFileClosedMeth = null; + // whether we need to look for isFileClosed method + boolean findIsFileClosedMeth = true; + boolean recovered = false; + // We break the loop if we succeed the lease recovery, timeout, or we throw an exception. + for (int nbAttempt = 0; !recovered; nbAttempt++) { + recovered = recoverLease(dfs, nbAttempt, p, startWaiting); + if (recovered) { + break; + } + checkIfCancelled(reporter); + if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) { + break; + } + try { + // On the first time through wait the short 'firstPause'. + if (nbAttempt == 0) { + Thread.sleep(firstPause); + } else { + // Cycle here until (subsequentPause * nbAttempt) elapses. While spinning, check + // isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though. 
+ long localStartWaiting = EnvironmentEdgeManager.currentTime(); + while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) < subsequentPauseBase * + nbAttempt) { + Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000)); + if (findIsFileClosedMeth) { + try { + isFileClosedMeth = + dfs.getClass().getMethod("isFileClosed", new Class[] { Path.class }); + } catch (NoSuchMethodException nsme) { + LOG.debug("isFileClosed not available"); + } finally { + findIsFileClosedMeth = false; + } + } + if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) { + recovered = true; + break; + } + checkIfCancelled(reporter); + } + } + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + return recovered; + } + + private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout, + final int nbAttempt, final Path p, final long startWaiting) { + if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) { + LOG.warn("Cannot recoverLease after trying for " + + conf.getInt("hbase.lease.recovery.timeout", 900000) + + "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " + + getLogMessageDetail(nbAttempt, p, startWaiting)); + return true; + } + return false; + } + + /** + * Try to recover the lease. + * @return True if dfs#recoverLease came by true. + */ + private static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, + final Path p, final long startWaiting) throws FileNotFoundException { + boolean recovered = false; + try { + recovered = dfs.recoverLease(p); + LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ") + + getLogMessageDetail(nbAttempt, p, startWaiting)); + } catch (IOException e) { + if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) { + // This exception comes out instead of FNFE, fix it + throw new FileNotFoundException("The given WAL wasn't found at " + p); + } else if (e instanceof FileNotFoundException) { + throw (FileNotFoundException) e; + } + LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e); + } + return recovered; + } + + /** + * @return Detail to append to any log message around lease recovering. + */ + private static String getLogMessageDetail(final int nbAttempt, final Path p, + final long startWaiting) { + return "attempt=" + nbAttempt + " on file=" + p + " after " + + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms"; + } + + /** + * Call HDFS-4525 isFileClosed if it is available. + * @return True if file is closed. 
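// ---------------------------------------------------------------------------
// A minimal usage sketch, not part of the patch hunks above: how a caller
// might drive RecoverLeaseFSUtils.recoverFileLease() with a
// CancelableProgressable reporter, mirroring the MasterProcedureEnv and
// WALEntryStream call sites further down in this patch. The class name and
// the WAL path argument are hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;

public class RecoverLeaseSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path walPath = new Path(args[0]);
    FileSystem fs = walPath.getFileSystem(conf);
    // Blocks until the lease is recovered, the hbase.lease.recovery.timeout
    // budget is exhausted, or the reporter cancels the operation.
    RecoverLeaseFSUtils.recoverFileLease(fs, walPath, conf, new CancelableProgressable() {
      @Override
      public boolean progress() {
        // Returning false makes recoverFileLease abort with an
        // InterruptedIOException ("Operation cancelled").
        return true;
      }
    });
  }
}
// ---------------------------------------------------------------------------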
+ */ + private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m, + final Path p) { + try { + return (Boolean) m.invoke(dfs, p); + } catch (SecurityException e) { + LOG.warn("No access", e); + } catch (Exception e) { + LOG.warn("Failed invocation for " + p.toString(), e); + } + return false; + } + + private static void checkIfCancelled(final CancelableProgressable reporter) + throws InterruptedIOException { + if (reporter == null) { + return; + } + if (!reporter.progress()) { + throw new InterruptedIOException("Operation cancelled"); + } + } +} diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java new file mode 100644 index 00000000000..9b276aca078 --- /dev/null +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AsyncFSTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncFSTestBase.class); + + protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + protected static File CLUSTER_TEST_DIR; + + protected static MiniDFSCluster CLUSTER; + + private static boolean deleteOnExit() { + String v = System.getProperty("hbase.testing.preserve.testdir"); + // Let default be true, to delete on exit. + return v == null ? 
true : !Boolean.parseBoolean(v); + } + + /** + * Creates a directory for the cluster, under the test data + */ + protected static void setupClusterTestDir() { + // Using randomUUID ensures that multiple clusters can be launched by + // a same test, if it stops & starts them + Path testDir = + UTIL.getDataTestDir("cluster_" + HBaseCommonTestingUtility.getRandomUUID().toString()); + CLUSTER_TEST_DIR = new File(testDir.toString()).getAbsoluteFile(); + // Have it cleaned up on exit + boolean b = deleteOnExit(); + if (b) { + CLUSTER_TEST_DIR.deleteOnExit(); + } + LOG.info("Created new mini-cluster data directory: {}, deleteOnExit={}", CLUSTER_TEST_DIR, b); + } + + private static String createDirAndSetProperty(final String property) { + return createDirAndSetProperty(property, property); + } + + private static String createDirAndSetProperty(final String relPath, String property) { + String path = UTIL.getDataTestDir(relPath).toString(); + System.setProperty(property, path); + UTIL.getConfiguration().set(property, path); + new File(path).mkdirs(); + LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf"); + return path; + } + + private static void createDirsAndSetProperties() throws IOException { + setupClusterTestDir(); + System.setProperty("test.build.data", CLUSTER_TEST_DIR.getPath()); + createDirAndSetProperty("test.cache.data"); + createDirAndSetProperty("hadoop.tmp.dir"); + + // Frustrate yarn's and hdfs's attempts at writing /tmp. + // Below is fragile. Make it so we just interpolate any 'tmp' reference. + createDirAndSetProperty("dfs.journalnode.edits.dir"); + createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths"); + createDirAndSetProperty("nfs.dump.dir"); + createDirAndSetProperty("java.io.tmpdir"); + createDirAndSetProperty("dfs.journalnode.edits.dir"); + createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir"); + createDirAndSetProperty("fs.s3a.committer.staging.tmp.path"); + } + + protected static void startMiniDFSCluster(int servers) throws IOException { + if (CLUSTER != null) { + throw new IllegalStateException("Already started"); + } + createDirsAndSetProperties(); + + Configuration conf = UTIL.getConfiguration(); + // Error level to skip some warnings specific to the minicluster. 
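// ---------------------------------------------------------------------------
// A minimal sketch, not part of the patch hunks above, of a test built on
// AsyncFSTestBase; the class name and assertion are hypothetical, and a real
// HBase test would also carry an HBaseClassTestRule @ClassRule and an
// @Category annotation, as TestFanOutOneBlockAsyncDFSOutput does below.
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSTestBase;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class MiniDfsClusterSketch extends AsyncFSTestBase {

  private static DistributedFileSystem FS;

  @BeforeClass
  public static void setUp() throws Exception {
    // Spins up a 3-datanode MiniDFSCluster under the per-test data directory
    // prepared by createDirsAndSetProperties().
    startMiniDFSCluster(3);
    FS = CLUSTER.getFileSystem();
  }

  @AfterClass
  public static void tearDown() {
    shutdownMiniDFSCluster();
  }

  @Test
  public void testClusterIsUp() throws Exception {
    assertTrue(FS.mkdirs(new Path("/sketch")));
  }
}
// ---------------------------------------------------------------------------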
See HBASE-4709 + org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class) + .setLevel(org.apache.log4j.Level.ERROR); + org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class) + .setLevel(org.apache.log4j.Level.ERROR); + + TraceUtil.initTracer(conf); + CLUSTER = new MiniDFSCluster.Builder(conf).numDataNodes(servers).build(); + CLUSTER.waitClusterUp(); + } + + protected static void shutdownMiniDFSCluster() { + if (CLUSTER != null) { + CLUSTER.shutdown(true); + CLUSTER = null; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java similarity index 90% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java rename to hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index d23e8161342..03ff1ee7753 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -39,7 +39,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -63,16 +62,14 @@ import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; @Category({ MiscTests.class, MediumTests.class }) -public class TestFanOutOneBlockAsyncDFSOutput { +public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class); + HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class); private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static DistributedFileSystem FS; private static EventLoopGroup EVENT_LOOP_GROUP; @@ -86,9 +83,9 @@ public class TestFanOutOneBlockAsyncDFSOutput { @BeforeClass public static void setUp() throws Exception { - TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); - TEST_UTIL.startMiniDFSCluster(3); - FS = TEST_UTIL.getDFSCluster().getFileSystem(); + UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); + startMiniDFSCluster(3); + FS = CLUSTER.getFileSystem(); EVENT_LOOP_GROUP = new NioEventLoopGroup(); CHANNEL_CLASS = NioSocketChannel.class; } @@ -98,11 +95,11 @@ public class TestFanOutOneBlockAsyncDFSOutput { if (EVENT_LOOP_GROUP != null) { EVENT_LOOP_GROUP.shutdownGracefully().sync(); } - TEST_UTIL.shutdownMiniDFSCluster(); + shutdownMiniDFSCluster(); } static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out) - throws IOException, InterruptedException, ExecutionException { + throws IOException, InterruptedException, ExecutionException { List- * Only a single instance may be registered for a given {@link Service} subclass (the - * instances are 
keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}. - * After the first registration, subsequent calls with the same service name will fail with - * a return value of {@code false}. - *
+ * + * Only a single instance may be registered for a given {@link Service} subclass (the instances + * are keyed on + * {@link org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}. + * After the first registration, subsequent calls with the same service name will fail with a + * return value of {@code false}. * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint - * @return {@code true} if the registration was successful, {@code false} - * otherwise + * @return {@code true} if the registration was successful, {@code false} otherwise */ boolean registerService(Service instance); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 292d76e8f4a..7baa5bd928b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -57,7 +57,7 @@ public class MasterProcedureEnv implements ConfigurationObserver { @Override public void recoverFileLease(final FileSystem fs, final Path path) throws IOException { final Configuration conf = master.getConfiguration(); - FSUtils.recoverFileLease(fs, path, conf, new CancelableProgressable() { + RecoverLeaseFSUtils.recoverFileLease(fs, path, conf, new CancelableProgressable() { @Override public boolean progress() { LOG.debug("Recover Procedure Store log lease: " + path); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index e8ae3e5687b..d2cb6f33c15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -32,8 +32,8 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALFactory; @@ -384,7 +384,7 @@ class WALEntryStream implements Closeable { private void recoverLease(final Configuration conf, final Path path) { try { final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf); - FSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { + RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { @Override public boolean progress() { LOG.debug("recover WAL lease: " + path); diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index ece76fb6b9f..420fdb02ca7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -58,7 +58,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; @@ -80,7 +79,6 @@ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.Progressable; @@ -1872,181 +1870,4 @@ public final class FSUtils { return false; } - - public static void recoverFileLease(FileSystem fs, Path p, Configuration conf) - throws IOException { - recoverFileLease(fs, p, conf, null); - } - - /** - * Recover the lease from HDFS, retrying multiple times. - */ - public static void recoverFileLease(FileSystem fs, Path p, Configuration conf, - CancelableProgressable reporter) throws IOException { - if (fs instanceof FilterFileSystem) { - fs = ((FilterFileSystem) fs).getRawFileSystem(); - } - // lease recovery not needed for local file system case. - if (!(fs instanceof DistributedFileSystem)) { - return; - } - recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter); - } - - /* - * Run the dfs recover lease. recoverLease is asynchronous. It returns: -false when it starts the - * lease recovery (i.e. lease recovery not *yet* done) - true when the lease recovery has - * succeeded or the file is closed. But, we have to be careful. Each time we call recoverLease, it - * starts the recover lease process over from the beginning. We could put ourselves in a situation - * where we are doing nothing but starting a recovery, interrupting it to start again, and so on. - * The findings over in HBASE-8354 have it that the namenode will try to recover the lease on the - * file's primary node. If all is well, it should return near immediately. But, as is common, it - * is the very primary node that has crashed and so the namenode will be stuck waiting on a socket - * timeout before it will ask another datanode to start the recovery. It does not help if we call - * recoverLease in the meantime and in particular, subsequent to the socket timeout, a - * recoverLease invocation will cause us to start over from square one (possibly waiting on socket - * timeout against primary node). So, in the below, we do the following: 1. Call recoverLease. 2. - * If it returns true, break. 3. If it returns false, wait a few seconds and then call it again. - * 4. If it returns true, break. 5. If it returns false, wait for what we think the datanode - * socket timeout is (configurable) and then try again. 6. If it returns true, break. 7. If it - * returns false, repeat starting at step 5. above. If HDFS-4525 is available, call it every - * second and we might be able to exit early. 
- */ - private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p, - final Configuration conf, final CancelableProgressable reporter) throws IOException { - LOG.info("Recover lease on dfs file " + p); - long startWaiting = EnvironmentEdgeManager.currentTime(); - // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS - // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves - // beyond that limit 'to be safe'. - long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting; - // This setting should be a little bit above what the cluster dfs heartbeat is set to. - long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000); - // This should be set to how long it'll take for us to timeout against primary datanode if it - // is dead. We set it to 64 seconds, 4 second than the default READ_TIMEOUT in HDFS, the - // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this - // timeout, then further recovery will take liner backoff with this base, to avoid endless - // preemptions when this value is not properly configured. - long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 64 * 1000); - - Method isFileClosedMeth = null; - // whether we need to look for isFileClosed method - boolean findIsFileClosedMeth = true; - boolean recovered = false; - // We break the loop if we succeed the lease recovery, timeout, or we throw an exception. - for (int nbAttempt = 0; !recovered; nbAttempt++) { - recovered = recoverLease(dfs, nbAttempt, p, startWaiting); - if (recovered) { - break; - } - checkIfCancelled(reporter); - if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) { - break; - } - try { - // On the first time through wait the short 'firstPause'. - if (nbAttempt == 0) { - Thread.sleep(firstPause); - } else { - // Cycle here until (subsequentPause * nbAttempt) elapses. While spinning, check - // isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though. - long localStartWaiting = EnvironmentEdgeManager.currentTime(); - while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) < subsequentPauseBase * - nbAttempt) { - Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000)); - if (findIsFileClosedMeth) { - try { - isFileClosedMeth = - dfs.getClass().getMethod("isFileClosed", new Class[] { Path.class }); - } catch (NoSuchMethodException nsme) { - LOG.debug("isFileClosed not available"); - } finally { - findIsFileClosedMeth = false; - } - } - if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) { - recovered = true; - break; - } - checkIfCancelled(reporter); - } - } - } catch (InterruptedException ie) { - InterruptedIOException iioe = new InterruptedIOException(); - iioe.initCause(ie); - throw iioe; - } - } - return recovered; - } - - private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout, - final int nbAttempt, final Path p, final long startWaiting) { - if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) { - LOG.warn("Cannot recoverLease after trying for " + - conf.getInt("hbase.lease.recovery.timeout", 900000) + - "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " + - getLogMessageDetail(nbAttempt, p, startWaiting)); - return true; - } - return false; - } - - /** - * Try to recover the lease. - * @return True if dfs#recoverLease came by true. 
- */ - private static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, - final Path p, final long startWaiting) throws FileNotFoundException { - boolean recovered = false; - try { - recovered = dfs.recoverLease(p); - LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ") + - getLogMessageDetail(nbAttempt, p, startWaiting)); - } catch (IOException e) { - if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) { - // This exception comes out instead of FNFE, fix it - throw new FileNotFoundException("The given WAL wasn't found at " + p); - } else if (e instanceof FileNotFoundException) { - throw (FileNotFoundException) e; - } - LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e); - } - return recovered; - } - - /** - * @return Detail to append to any log message around lease recovering. - */ - private static String getLogMessageDetail(final int nbAttempt, final Path p, - final long startWaiting) { - return "attempt=" + nbAttempt + " on file=" + p + " after " + - (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms"; - } - - /** - * Call HDFS-4525 isFileClosed if it is available. - * @return True if file is closed. - */ - private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m, - final Path p) { - try { - return (Boolean) m.invoke(dfs, p); - } catch (SecurityException e) { - LOG.warn("No access", e); - } catch (Exception e) { - LOG.warn("Failed invocation for " + p.toString(), e); - } - return false; - } - - private static void checkIfCancelled(final CancelableProgressable reporter) - throws InterruptedIOException { - if (reporter == null) { - return; - } - if (!reporter.progress()) { - throw new InterruptedIOException("Operation cancelled"); - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java index da84c15f23a..f046cd043da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java @@ -845,7 +845,7 @@ public class RegionSplitter { fs.rename(tmpFile, splitFile); } else { LOG.debug("_balancedSplit file found. 
Replay log to restore state..."); - FSUtils.recoverFileLease(fs, splitFile, connection.getConfiguration(), null); + RecoverLeaseFSUtils.recoverFileLease(fs, splitFile, connection.getConfiguration(), null); // parse split file and process remaining splits FSDataInputStream tmpIn = fs.open(splitFile); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index c85a688ad4c..ce6770fd0f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -515,7 +515,7 @@ public abstract class AbstractFSWALProvider+ * Configuration conf = TEST_UTIL.getConfiguration(); + * for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) { + * Map.Entry<String, String> e = i.next(); + * assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp")); + * } + **/ private void createDirsAndSetProperties() throws IOException { setupClusterTestDir(); @@ -741,7 +745,6 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths"); createDirAndSetProperty("nfs.dump.dir"); createDirAndSetProperty("java.io.tmpdir"); - createDirAndSetProperty("java.io.tmpdir"); createDirAndSetProperty("dfs.journalnode.edits.dir"); createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir"); createDirAndSetProperty("fs.s3a.committer.staging.tmp.path"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index fb07d2a64d9..691250a5609 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -52,8 +52,8 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -497,8 +497,8 @@ public class TestLogRolling extends AbstractTestLogRolling { Set