From 388fae73ffd6e079f189077ca814375d6addeef0 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Sat, 9 May 2020 12:00:45 +0800 Subject: [PATCH] HBASE-24333 Backport HBASE-24304 "Separate a hbase-asyncfs module" to branch-2.x (#1664) Signed-off-by: stack --- .../src/main/assembly/client-components.xml | 8 + .../src/main/assembly/components.xml | 8 + .../src/main/assembly/hadoop-three-compat.xml | 1 + .../src/main/assembly/hadoop-two-compat.xml | 1 + hbase-asyncfs/pom.xml | 242 ++++++++++++++++++ .../hbase/io/asyncfs/AsyncFSOutput.java | 0 .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 0 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 4 +- .../FanOutOneBlockAsyncDFSOutputHelper.java | 0 ...anOutOneBlockAsyncDFSOutputSaslHelper.java | 0 .../hbase/io/asyncfs/ProtobufDecoder.java | 0 .../io/asyncfs/SendBufSizePredictor.java | 0 .../io/asyncfs/WrapperAsyncFSOutput.java | 0 .../hbase/util/CancelableProgressable.java | 2 +- .../hbase/util/RecoverLeaseFSUtils.java | 221 ++++++++++++++++ .../hbase/io/asyncfs/AsyncFSTestBase.java | 118 +++++++++ .../TestFanOutOneBlockAsyncDFSOutput.java | 27 +- .../io/asyncfs/TestLocalAsyncOutput.java | 4 +- .../TestOverwriteFileUnderConstruction.java | 13 +- .../TestSaslFanOutOneBlockAsyncDFSOutput.java | 89 +++++-- .../io/asyncfs/TestSendBufSizePredictor.java | 0 .../hbase/security/HBaseKerberosUtils.java | 42 ++- .../hbase/util/TestRecoverLeaseFSUtils.java | 81 +----- .../src/test/resources/hbase-site.xml | 179 +++++++++++++ .../src/test/resources/hdfs-site.xml | 56 ++++ .../src/test/resources/log4j.properties | 68 +++++ hbase-endpoint/pom.xml | 6 + hbase-examples/pom.xml | 10 + hbase-mapreduce/pom.xml | 10 + hbase-rest/pom.xml | 6 + hbase-server/pom.xml | 10 + .../hadoop/hbase/master/MasterServices.java | 16 +- .../master/procedure/MasterProcedureEnv.java | 4 +- .../regionserver/WALEntryStream.java | 4 +- .../org/apache/hadoop/hbase/util/FSUtils.java | 179 ------------- .../hadoop/hbase/util/RegionSplitter.java | 2 +- .../hbase/wal/AbstractFSWALProvider.java | 4 +- .../apache/hadoop/hbase/wal/WALSplitter.java | 4 +- .../hadoop/hbase/HBaseTestingUtility.java | 21 +- .../regionserver/wal/TestLogRolling.java | 6 +- .../apache/hadoop/hbase/util/TestFSUtils.java | 59 ++++- .../hadoop/hbase/wal/TestWALFactory.java | 4 +- .../hbase-shaded-testing-util/pom.xml | 6 + hbase-testing-util/pom.xml | 6 + hbase-thrift/pom.xml | 6 + pom.xml | 14 + 46 files changed, 1182 insertions(+), 359 deletions(-) create mode 100644 hbase-asyncfs/pom.xml rename {hbase-server => hbase-asyncfs}/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java (100%) rename {hbase-server => hbase-asyncfs}/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java (100%) rename {hbase-server => hbase-asyncfs}/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java (99%) rename {hbase-server => hbase-asyncfs}/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java (100%) rename {hbase-server => hbase-asyncfs}/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java (100%) rename {hbase-server => hbase-asyncfs}/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java (100%) rename {hbase-server => hbase-asyncfs}/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java (100%) rename {hbase-server => hbase-asyncfs}/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java (100%) rename {hbase-server => 
hbase-asyncfs}/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java (99%) create mode 100644 hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java create mode 100644 hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java rename {hbase-server => hbase-asyncfs}/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java (90%) rename {hbase-server => hbase-asyncfs}/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java (94%) rename {hbase-server => hbase-asyncfs}/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java (91%) rename {hbase-server => hbase-asyncfs}/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java (69%) rename {hbase-server => hbase-asyncfs}/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java (100%) rename {hbase-server => hbase-asyncfs}/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java (87%) rename hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java => hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java (56%) create mode 100644 hbase-asyncfs/src/test/resources/hbase-site.xml create mode 100644 hbase-asyncfs/src/test/resources/hdfs-site.xml create mode 100644 hbase-asyncfs/src/test/resources/log4j.properties diff --git a/hbase-assembly/src/main/assembly/client-components.xml b/hbase-assembly/src/main/assembly/client-components.xml index fced18d5962..740e9bc74b3 100644 --- a/hbase-assembly/src/main/assembly/client-components.xml +++ b/hbase-assembly/src/main/assembly/client-components.xml @@ -119,6 +119,14 @@ 0644 + + ${project.basedir}/../hbase-asyncfs/target/ + lib + + ${asyncfs.test.jar} + + 0644 + ${project.basedir}/../hbase-zookeeper/target/ lib diff --git a/hbase-assembly/src/main/assembly/components.xml b/hbase-assembly/src/main/assembly/components.xml index 18dd4955903..b2fd06595ae 100644 --- a/hbase-assembly/src/main/assembly/components.xml +++ b/hbase-assembly/src/main/assembly/components.xml @@ -160,6 +160,14 @@ 0644 + + ${project.basedir}/../hbase-asyncfs/target/ + lib + + ${asyncfs.test.jar} + + 0644 + ${project.basedir}/../hbase-zookeeper/target/ lib diff --git a/hbase-assembly/src/main/assembly/hadoop-three-compat.xml b/hbase-assembly/src/main/assembly/hadoop-three-compat.xml index 310d06c4a3e..9c221f11311 100644 --- a/hbase-assembly/src/main/assembly/hadoop-three-compat.xml +++ b/hbase-assembly/src/main/assembly/hadoop-three-compat.xml @@ -34,6 +34,7 @@ org.apache.hbase:hbase-annotations + org.apache.hbase:hbase-asyncfs org.apache.hbase:hbase-backup org.apache.hbase:hbase-client org.apache.hbase:hbase-common diff --git a/hbase-assembly/src/main/assembly/hadoop-two-compat.xml b/hbase-assembly/src/main/assembly/hadoop-two-compat.xml index e0e68a0b899..5520fbc24b8 100644 --- a/hbase-assembly/src/main/assembly/hadoop-two-compat.xml +++ b/hbase-assembly/src/main/assembly/hadoop-two-compat.xml @@ -34,6 +34,7 @@ org.apache.hbase:hbase-annotations + org.apache.hbase:hbase-asyncfs org.apache.hbase:hbase-client org.apache.hbase:hbase-common org.apache.hbase:hbase-examples diff --git a/hbase-asyncfs/pom.xml b/hbase-asyncfs/pom.xml new file mode 100644 index 00000000000..452088b163e --- /dev/null +++ b/hbase-asyncfs/pom.xml @@ -0,0 +1,242 @@ + + + + 4.0.0 + + hbase-build-configuration + org.apache.hbase + 2.3.0-SNAPSHOT + ../hbase-build-configuration + + + hbase-asyncfs + Apache 
HBase - Asynchronous FileSystem + HBase Asynchronous FileSystem Implementation for WAL + + + + + org.apache.maven.plugins + maven-source-plugin + + + + maven-assembly-plugin + + true + + + + net.revelc.code + warbucks-maven-plugin + + + org.apache.maven.plugins + maven-checkstyle-plugin + + true + + + + + + + + org.apache.hbase + hbase-common + test-jar + test + + + org.apache.hbase + hbase-annotations + test-jar + test + + + org.apache.hbase + hbase-common + + + org.apache.hbase + hbase-client + + + org.slf4j + slf4j-api + + + com.github.stephenc.findbugs + findbugs-annotations + compile + true + + + junit + junit + test + + + org.bouncycastle + bcprov-jdk15on + test + + + org.apache.hadoop + hadoop-minikdc + test + + + bouncycastle + bcprov-jdk15 + + + + + org.apache.kerby + kerb-client + test + + + org.apache.kerby + kerb-simplekdc + test + + + org.apache.hbase + hbase-http + test-jar + test + + + org.mockito + mockito-core + test + + + + + + + hadoop-2.0 + + + + !hadoop.profile + + + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-hdfs + test-jar + test + + + org.apache.hadoop + hadoop-annotations + + + org.apache.hadoop + hadoop-minicluster + test + + + + + + hadoop-3.0 + + + hadoop.profile + 3.0 + + + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-hdfs-client + + + org.apache.hadoop + hadoop-hdfs + test-jar + test + + + org.apache.hadoop + hadoop-annotations + + + org.apache.hadoop + hadoop-minicluster + test + + + + + eclipse-specific + + + m2e.version + + + + + + + + org.eclipse.m2e + lifecycle-mapping + + + + + + + + + + + + + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java similarity index 100% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java rename to hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java similarity index 100% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java rename to hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java similarity index 99% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java rename to hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 24958466373..ed5bbf0f7da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -46,7 +46,7 @@ import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import 
org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.ClientProtocol; @@ -552,7 +552,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { datanodeList.forEach(ch -> ch.close()); datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); endFileLease(client, fileId); - FSUtils.recoverFileLease(dfs, new Path(src), conf, + RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf, reporter == null ? new CancelOnClose(client) : reporter); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java similarity index 100% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java rename to hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java similarity index 100% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java rename to hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java similarity index 100% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java rename to hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java similarity index 100% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java rename to hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java similarity index 100% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java rename to hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java similarity index 99% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java rename to hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java index 6aec09f09d1..91c003cb6dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java @@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience; /** * Similar interface as {@link org.apache.hadoop.util.Progressable} but returns * a boolean to support canceling the operation. - *
<p>
+ * <p/>
* Used for doing updating of OPENING znode during log replay on region open. */ @InterfaceAudience.Private diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java new file mode 100644 index 00000000000..9c3da1658c7 --- /dev/null +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.Method; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility methods for recovering file lease for hdfs. + */ +@InterfaceAudience.Private +public final class RecoverLeaseFSUtils { + + private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class); + + private RecoverLeaseFSUtils() { + } + + public static void recoverFileLease(FileSystem fs, Path p, Configuration conf) + throws IOException { + recoverFileLease(fs, p, conf, null); + } + + /** + * Recover the lease from HDFS, retrying multiple times. + */ + public static void recoverFileLease(FileSystem fs, Path p, Configuration conf, + CancelableProgressable reporter) throws IOException { + if (fs instanceof FilterFileSystem) { + fs = ((FilterFileSystem) fs).getRawFileSystem(); + } + // lease recovery not needed for local file system case. + if (!(fs instanceof DistributedFileSystem)) { + return; + } + recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter); + } + + /* + * Run the dfs recover lease. recoverLease is asynchronous. It returns: -false when it starts the + * lease recovery (i.e. lease recovery not *yet* done) - true when the lease recovery has + * succeeded or the file is closed. But, we have to be careful. Each time we call recoverLease, it + * starts the recover lease process over from the beginning. We could put ourselves in a situation + * where we are doing nothing but starting a recovery, interrupting it to start again, and so on. + * The findings over in HBASE-8354 have it that the namenode will try to recover the lease on the + * file's primary node. If all is well, it should return near immediately. 
But, as is common, it + * is the very primary node that has crashed and so the namenode will be stuck waiting on a socket + * timeout before it will ask another datanode to start the recovery. It does not help if we call + * recoverLease in the meantime and in particular, subsequent to the socket timeout, a + * recoverLease invocation will cause us to start over from square one (possibly waiting on socket + * timeout against primary node). So, in the below, we do the following: 1. Call recoverLease. 2. + * If it returns true, break. 3. If it returns false, wait a few seconds and then call it again. + * 4. If it returns true, break. 5. If it returns false, wait for what we think the datanode + * socket timeout is (configurable) and then try again. 6. If it returns true, break. 7. If it + * returns false, repeat starting at step 5. above. If HDFS-4525 is available, call it every + * second and we might be able to exit early. + */ + private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p, + final Configuration conf, final CancelableProgressable reporter) throws IOException { + LOG.info("Recover lease on dfs file " + p); + long startWaiting = EnvironmentEdgeManager.currentTime(); + // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS + // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves + // beyond that limit 'to be safe'. + long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting; + // This setting should be a little bit above what the cluster dfs heartbeat is set to. + long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000); + // This should be set to how long it'll take for us to timeout against primary datanode if it + // is dead. We set it to 64 seconds, 4 second than the default READ_TIMEOUT in HDFS, the + // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this + // timeout, then further recovery will take liner backoff with this base, to avoid endless + // preemptions when this value is not properly configured. + long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 64 * 1000); + + Method isFileClosedMeth = null; + // whether we need to look for isFileClosed method + boolean findIsFileClosedMeth = true; + boolean recovered = false; + // We break the loop if we succeed the lease recovery, timeout, or we throw an exception. + for (int nbAttempt = 0; !recovered; nbAttempt++) { + recovered = recoverLease(dfs, nbAttempt, p, startWaiting); + if (recovered) { + break; + } + checkIfCancelled(reporter); + if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) { + break; + } + try { + // On the first time through wait the short 'firstPause'. + if (nbAttempt == 0) { + Thread.sleep(firstPause); + } else { + // Cycle here until (subsequentPause * nbAttempt) elapses. While spinning, check + // isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though. 
+ long localStartWaiting = EnvironmentEdgeManager.currentTime(); + while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) < subsequentPauseBase * + nbAttempt) { + Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000)); + if (findIsFileClosedMeth) { + try { + isFileClosedMeth = + dfs.getClass().getMethod("isFileClosed", new Class[] { Path.class }); + } catch (NoSuchMethodException nsme) { + LOG.debug("isFileClosed not available"); + } finally { + findIsFileClosedMeth = false; + } + } + if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) { + recovered = true; + break; + } + checkIfCancelled(reporter); + } + } + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + return recovered; + } + + private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout, + final int nbAttempt, final Path p, final long startWaiting) { + if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) { + LOG.warn("Cannot recoverLease after trying for " + + conf.getInt("hbase.lease.recovery.timeout", 900000) + + "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " + + getLogMessageDetail(nbAttempt, p, startWaiting)); + return true; + } + return false; + } + + /** + * Try to recover the lease. + * @return True if dfs#recoverLease came by true. + */ + private static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, + final Path p, final long startWaiting) throws FileNotFoundException { + boolean recovered = false; + try { + recovered = dfs.recoverLease(p); + LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ") + + getLogMessageDetail(nbAttempt, p, startWaiting)); + } catch (IOException e) { + if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) { + // This exception comes out instead of FNFE, fix it + throw new FileNotFoundException("The given WAL wasn't found at " + p); + } else if (e instanceof FileNotFoundException) { + throw (FileNotFoundException) e; + } + LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e); + } + return recovered; + } + + /** + * @return Detail to append to any log message around lease recovering. + */ + private static String getLogMessageDetail(final int nbAttempt, final Path p, + final long startWaiting) { + return "attempt=" + nbAttempt + " on file=" + p + " after " + + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms"; + } + + /** + * Call HDFS-4525 isFileClosed if it is available. + * @return True if file is closed. 
+ */ + private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m, + final Path p) { + try { + return (Boolean) m.invoke(dfs, p); + } catch (SecurityException e) { + LOG.warn("No access", e); + } catch (Exception e) { + LOG.warn("Failed invocation for " + p.toString(), e); + } + return false; + } + + private static void checkIfCancelled(final CancelableProgressable reporter) + throws InterruptedIOException { + if (reporter == null) { + return; + } + if (!reporter.progress()) { + throw new InterruptedIOException("Operation cancelled"); + } + } +} diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java new file mode 100644 index 00000000000..9b276aca078 --- /dev/null +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSTestBase.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AsyncFSTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncFSTestBase.class); + + protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + protected static File CLUSTER_TEST_DIR; + + protected static MiniDFSCluster CLUSTER; + + private static boolean deleteOnExit() { + String v = System.getProperty("hbase.testing.preserve.testdir"); + // Let default be true, to delete on exit. + return v == null ? 
true : !Boolean.parseBoolean(v); + } + + /** + * Creates a directory for the cluster, under the test data + */ + protected static void setupClusterTestDir() { + // Using randomUUID ensures that multiple clusters can be launched by + // a same test, if it stops & starts them + Path testDir = + UTIL.getDataTestDir("cluster_" + HBaseCommonTestingUtility.getRandomUUID().toString()); + CLUSTER_TEST_DIR = new File(testDir.toString()).getAbsoluteFile(); + // Have it cleaned up on exit + boolean b = deleteOnExit(); + if (b) { + CLUSTER_TEST_DIR.deleteOnExit(); + } + LOG.info("Created new mini-cluster data directory: {}, deleteOnExit={}", CLUSTER_TEST_DIR, b); + } + + private static String createDirAndSetProperty(final String property) { + return createDirAndSetProperty(property, property); + } + + private static String createDirAndSetProperty(final String relPath, String property) { + String path = UTIL.getDataTestDir(relPath).toString(); + System.setProperty(property, path); + UTIL.getConfiguration().set(property, path); + new File(path).mkdirs(); + LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf"); + return path; + } + + private static void createDirsAndSetProperties() throws IOException { + setupClusterTestDir(); + System.setProperty("test.build.data", CLUSTER_TEST_DIR.getPath()); + createDirAndSetProperty("test.cache.data"); + createDirAndSetProperty("hadoop.tmp.dir"); + + // Frustrate yarn's and hdfs's attempts at writing /tmp. + // Below is fragile. Make it so we just interpolate any 'tmp' reference. + createDirAndSetProperty("dfs.journalnode.edits.dir"); + createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths"); + createDirAndSetProperty("nfs.dump.dir"); + createDirAndSetProperty("java.io.tmpdir"); + createDirAndSetProperty("dfs.journalnode.edits.dir"); + createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir"); + createDirAndSetProperty("fs.s3a.committer.staging.tmp.path"); + } + + protected static void startMiniDFSCluster(int servers) throws IOException { + if (CLUSTER != null) { + throw new IllegalStateException("Already started"); + } + createDirsAndSetProperties(); + + Configuration conf = UTIL.getConfiguration(); + // Error level to skip some warnings specific to the minicluster. 
See HBASE-4709 + org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class) + .setLevel(org.apache.log4j.Level.ERROR); + org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class) + .setLevel(org.apache.log4j.Level.ERROR); + + TraceUtil.initTracer(conf); + CLUSTER = new MiniDFSCluster.Builder(conf).numDataNodes(servers).build(); + CLUSTER.waitClusterUp(); + } + + protected static void shutdownMiniDFSCluster() { + if (CLUSTER != null) { + CLUSTER.shutdown(true); + CLUSTER = null; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java similarity index 90% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java rename to hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index d23e8161342..03ff1ee7753 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -39,7 +39,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -63,16 +62,14 @@ import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; @Category({ MiscTests.class, MediumTests.class }) -public class TestFanOutOneBlockAsyncDFSOutput { +public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class); + HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class); private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static DistributedFileSystem FS; private static EventLoopGroup EVENT_LOOP_GROUP; @@ -86,9 +83,9 @@ public class TestFanOutOneBlockAsyncDFSOutput { @BeforeClass public static void setUp() throws Exception { - TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); - TEST_UTIL.startMiniDFSCluster(3); - FS = TEST_UTIL.getDFSCluster().getFileSystem(); + UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); + startMiniDFSCluster(3); + FS = CLUSTER.getFileSystem(); EVENT_LOOP_GROUP = new NioEventLoopGroup(); CHANNEL_CLASS = NioSocketChannel.class; } @@ -98,11 +95,11 @@ public class TestFanOutOneBlockAsyncDFSOutput { if (EVENT_LOOP_GROUP != null) { EVENT_LOOP_GROUP.shutdownGracefully().sync(); } - TEST_UTIL.shutdownMiniDFSCluster(); + shutdownMiniDFSCluster(); } static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out) - throws IOException, InterruptedException, ExecutionException { + throws IOException, InterruptedException, ExecutionException { List> futures = new ArrayList<>(); byte[] b = new byte[10]; Random rand = new Random(12345); @@ -151,7 +148,7 
@@ public class TestFanOutOneBlockAsyncDFSOutput { out.write(b, 0, b.length); out.flush(false).get(); // restart one datanode which causes one connection broken - TEST_UTIL.getDFSCluster().restartDataNode(0); + CLUSTER.restartDataNode(0); out.write(b, 0, b.length); try { out.flush(false).get(); @@ -199,8 +196,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { @Test public void testConnectToDatanodeFailed() - throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, - InvocationTargetException, InterruptedException, NoSuchFieldException { + throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, + InvocationTargetException, InterruptedException, NoSuchFieldException { Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer"); xceiverServerDaemonField.setAccessible(true); Class xceiverServerClass = @@ -208,7 +205,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers"); numPeersMethod.setAccessible(true); // make one datanode broken - DataNodeProperties dnProp = TEST_UTIL.getDFSCluster().stopDataNode(0); + DataNodeProperties dnProp = CLUSTER.stopDataNode(0); Path f = new Path("/test"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, @@ -216,7 +213,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { // should exclude the dead dn when retry so here we only have 2 DNs in pipeline assertEquals(2, output.getPipeline().length); } finally { - TEST_UTIL.getDFSCluster().restartDataNode(dnProp); + CLUSTER.restartDataNode(dnProp); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java similarity index 94% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java rename to hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index c49e02df6dc..d2fdf172cbe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -47,7 +47,7 @@ public class TestLocalAsyncOutput { private static Class CHANNEL_CLASS = NioSocketChannel.class; - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility(); @AfterClass public static void tearDownAfterClass() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java similarity index 91% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java rename 
to hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java index aaeed1523f7..592598c8bb4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestOverwriteFileUnderConstruction.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.io.asyncfs; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import java.io.FileNotFoundException; @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; @@ -47,14 +46,12 @@ import org.junit.rules.TestName; * Used to confirm that it is OK to overwrite a file which is being written currently. */ @Category({ MiscTests.class, MediumTests.class }) -public class TestOverwriteFileUnderConstruction { +public class TestOverwriteFileUnderConstruction extends AsyncFSTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestOverwriteFileUnderConstruction.class); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static FileSystem FS; @Rule @@ -62,13 +59,13 @@ public class TestOverwriteFileUnderConstruction { @BeforeClass public static void setUp() throws Exception { - UTIL.startMiniDFSCluster(3); - FS = UTIL.getDFSCluster().getFileSystem(); + startMiniDFSCluster(3); + FS = CLUSTER.getFileSystem(); } @AfterClass public static void tearDown() throws Exception { - UTIL.shutdownMiniCluster(); + shutdownMiniDFSCluster(); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java similarity index 69% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java rename to hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index cf0ffa235d1..2de7f412b7d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -25,11 +25,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY import java.io.File; import java.io.IOException; import java.lang.reflect.Method; +import java.net.BindException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Properties; import java.util.concurrent.ExecutionException; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CipherSuite; @@ -37,8 +40,8 @@ import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.security.HBaseKerberosUtils; +import org.apache.hadoop.hbase.security.SecurityConstants; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -57,6 +60,8 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; @@ -66,13 +71,14 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; @RunWith(Parameterized.class) @Category({ MiscTests.class, LargeTests.class }) -public class TestSaslFanOutOneBlockAsyncDFSOutput { +public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class); - - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class); private static DistributedFileSystem FS; @@ -82,8 +88,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { private static int READ_TIMEOUT_MS = 200000; - private static final File KEYTAB_FILE = - new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); + private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath()); private static MiniKdc KDC; @@ -124,7 +129,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { private static void setUpKeyProvider(Configuration conf) throws Exception { URI keyProviderUri = - new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString()); + new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString()); conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString()); KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf); keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf)); @@ -132,21 +137,56 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { keyProvider.close(); } + /** + * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given + * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. + */ + private static MiniKdc setupMiniKdc(File keytabFile) throws Exception { + Properties conf = MiniKdc.createConf(); + conf.put(MiniKdc.DEBUG, true); + MiniKdc kdc = null; + File dir = null; + // There is time lag between selecting a port and trying to bind with it. It's possible that + // another service captures the port in between which'll result in BindException. + boolean bindException; + int numTries = 0; + do { + try { + bindException = false; + dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath()); + kdc = new MiniKdc(conf, dir); + kdc.start(); + } catch (BindException e) { + FileUtils.deleteDirectory(dir); // clean directory + numTries++; + if (numTries == 3) { + LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); + throw e; + } + LOG.error("BindException encountered when setting up MiniKdc. 
Trying again."); + bindException = true; + } + } while (bindException); + System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, + keytabFile.getAbsolutePath()); + return kdc; + } + @BeforeClass public static void setUpBeforeClass() throws Exception { EVENT_LOOP_GROUP = new NioEventLoopGroup(); CHANNEL_CLASS = NioSocketChannel.class; - TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); - KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); + UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); + KDC = setupMiniKdc(KEYTAB_FILE); USERNAME = UserGroupInformation.getLoginUser().getShortUserName(); PRINCIPAL = USERNAME + "/" + HOST; HTTP_PRINCIPAL = "HTTP/" + HOST; KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL); - setUpKeyProvider(TEST_UTIL.getConfiguration()); - HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration(), - PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm()); - HBaseKerberosUtils.setSSLConfiguration(TEST_UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class); + setUpKeyProvider(UTIL.getConfiguration()); + HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(), + PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm()); + HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class); } @AfterClass @@ -157,6 +197,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { if (KDC != null) { KDC.stop(); } + shutdownMiniDFSCluster(); } private Path testDirOnTestFs; @@ -171,25 +212,25 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { @Before public void setUp() throws Exception { - TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection); + UTIL.getConfiguration().set("dfs.data.transfer.protection", protection); if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) { - TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false); + UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false); } else { - TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true); + UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true); } if (StringUtils.isBlank(encryptionAlgorithm)) { - TEST_UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); + UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); } else { - TEST_UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm); + UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm); } if (StringUtils.isBlank(cipherSuite)) { - TEST_UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); + UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); } else { - TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite); + UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite); } - TEST_UTIL.startMiniDFSCluster(3); - FS = TEST_UTIL.getDFSCluster().getFileSystem(); + startMiniDFSCluster(3); + FS = CLUSTER.getFileSystem(); testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); FS.mkdirs(testDirOnTestFs); entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc"); @@ -199,7 +240,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { @After public void tearDown() throws IOException { - TEST_UTIL.shutdownMiniDFSCluster(); + shutdownMiniDFSCluster(); } private Path 
getTestFile() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java similarity index 100% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java rename to hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java similarity index 87% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java rename to hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java index f80e5f48919..5d0b2ebfff3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java @@ -20,24 +20,24 @@ package org.apache.hadoop.hbase.security; import java.io.File; import java.io.IOException; import java.net.InetAddress; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.base.Strings; -import org.apache.hadoop.security.UserGroupInformation; @InterfaceAudience.Private -public class HBaseKerberosUtils { +public final class HBaseKerberosUtils { private static final Logger LOG = LoggerFactory.getLogger(HBaseKerberosUtils.class); public static final String KRB_PRINCIPAL = SecurityConstants.REGIONSERVER_KRB_PRINCIPAL; @@ -46,6 +46,9 @@ public class HBaseKerberosUtils { public static final String CLIENT_PRINCIPAL = AuthUtil.HBASE_CLIENT_KERBEROS_PRINCIPAL; public static final String CLIENT_KEYTAB = AuthUtil.HBASE_CLIENT_KEYTAB_FILE; + private HBaseKerberosUtils() { + } + public static boolean isKerberosPropertySetted() { String krbPrincipal = System.getProperty(KRB_PRINCIPAL); String krbKeytab = System.getProperty(KRB_KEYTAB_FILE); @@ -111,8 +114,8 @@ public class HBaseKerberosUtils { * @param servicePrincipal service principal used by NN, HM and RS. * @param spnegoPrincipal SPNEGO principal used by NN web UI. */ - public static void setSecuredConfiguration(Configuration conf, - String servicePrincipal, String spnegoPrincipal) { + public static void setSecuredConfiguration(Configuration conf, String servicePrincipal, + String spnegoPrincipal) { setPrincipalForTesting(servicePrincipal); setSecuredConfiguration(conf); setSecuredHadoopConfiguration(conf, spnegoPrincipal); @@ -128,17 +131,13 @@ public class HBaseKerberosUtils { } private static void setSecuredHadoopConfiguration(Configuration conf, - String spnegoServerPrincipal) { - // if we drop support for hadoop-2.4.0 and hadoop-2.4.1, - // the following key should be changed. 
- // 1) DFS_NAMENODE_USER_NAME_KEY -> DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY - // 2) DFS_DATANODE_USER_NAME_KEY -> DFS_DATANODE_KERBEROS_PRINCIPAL_KEY + String spnegoServerPrincipal) { String serverPrincipal = System.getProperty(KRB_PRINCIPAL); String keytabFilePath = System.getProperty(KRB_KEYTAB_FILE); // HDFS - conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, serverPrincipal); + conf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, serverPrincipal); conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, keytabFilePath); - conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, serverPrincipal); + conf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, serverPrincipal); conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, keytabFilePath); conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); // YARN @@ -146,8 +145,7 @@ public class HBaseKerberosUtils { conf.set(YarnConfiguration.NM_PRINCIPAL, KRB_PRINCIPAL); if (spnegoServerPrincipal != null) { - conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, - spnegoServerPrincipal); + conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoServerPrincipal); } conf.setBoolean("ignore.secure.ports.for.testing", true); @@ -161,8 +159,8 @@ public class HBaseKerberosUtils { * @param clazz the caller test class. * @throws Exception if unable to set up SSL configuration */ - public static void setSSLConfiguration(HBaseTestingUtility utility, Class clazz) - throws Exception { + public static void setSSLConfiguration(HBaseCommonTestingUtility utility, Class clazz) + throws Exception { Configuration conf = utility.getConfiguration(); conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0"); @@ -175,19 +173,19 @@ public class HBaseKerberosUtils { } public static UserGroupInformation loginAndReturnUGI(Configuration conf, String username) - throws IOException { + throws IOException { String hostname = InetAddress.getLocalHost().getHostName(); String keyTabFileConfKey = "hbase." + username + ".keytab.file"; String keyTabFileLocation = conf.get(keyTabFileConfKey); String principalConfKey = "hbase." 
+ username + ".kerberos.principal"; String principal = org.apache.hadoop.security.SecurityUtil - .getServerPrincipal(conf.get(principalConfKey), hostname); + .getServerPrincipal(conf.get(principalConfKey), hostname); if (keyTabFileLocation == null || principal == null) { - LOG.warn("Principal or key tab file null for : " + principalConfKey + ", " - + keyTabFileConfKey); + LOG.warn( + "Principal or key tab file null for : " + principalConfKey + ", " + keyTabFileConfKey); } UserGroupInformation ugi = - UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation); + UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation); return ugi; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java similarity index 56% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java rename to hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java index a645f949f64..3931dfd5ba2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java @@ -20,12 +20,11 @@ package org.apache.hadoop.hbase.util; import static org.junit.Assert.assertTrue; import java.io.IOException; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -33,21 +32,18 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Test our recoverLease loop against mocked up filesystem. */ @Category({ MiscTests.class, MediumTests.class }) -public class TestFSHDFSUtils { +public class TestRecoverLeaseFSUtils { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestFSHDFSUtils.class); + HBaseClassTestRule.forClass(TestRecoverLeaseFSUtils.class); - private static final Logger LOG = LoggerFactory.getLogger(TestFSHDFSUtils.class); - private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + private static final HBaseCommonTestingUtility HTU = new HBaseCommonTestingUtility(); static { Configuration conf = HTU.getConfiguration(); conf.setInt("hbase.lease.recovery.first.pause", 10); @@ -67,14 +63,14 @@ public class TestFSHDFSUtils { Mockito.when(reporter.progress()).thenReturn(true); DistributedFileSystem dfs = Mockito.mock(DistributedFileSystem.class); // Fail four times and pass on the fifth. - Mockito.when(dfs.recoverLease(FILE)). 
- thenReturn(false).thenReturn(false).thenReturn(false).thenReturn(false).thenReturn(true); - FSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter); + Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false) + .thenReturn(false).thenReturn(true); + RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter); Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE); // Make sure we waited at least hbase.lease.recovery.dfs.timeout * 3 (the first two // invocations will happen pretty fast... the we fall into the longer wait loop). - assertTrue((EnvironmentEdgeManager.currentTime() - startTime) > - (3 * HTU.getConfiguration().getInt("hbase.lease.recovery.dfs.timeout", 61000))); + assertTrue((EnvironmentEdgeManager.currentTime() - startTime) > (3 * + HTU.getConfiguration().getInt("hbase.lease.recovery.dfs.timeout", 61000))); } /** @@ -90,66 +86,13 @@ public class TestFSHDFSUtils { // Now make it so we fail the first two times -- the two fast invocations, then we fall into // the long loop during which we will call isFileClosed.... the next invocation should // therefore return true if we are to break the loop. - Mockito.when(dfs.recoverLease(FILE)). - thenReturn(false).thenReturn(false).thenReturn(true); + Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true); Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true); - FSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter); + RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter); Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE); Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE); } - private void testIsSameHdfs(int nnport) throws IOException { - Configuration conf = HBaseConfiguration.create(); - Path srcPath = new Path("hdfs://localhost:" + nnport + "/"); - Path desPath = new Path("hdfs://127.0.0.1/"); - FileSystem srcFs = srcPath.getFileSystem(conf); - FileSystem desFs = desPath.getFileSystem(conf); - - assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs)); - - desPath = new Path("hdfs://127.0.0.1:8070/"); - desFs = desPath.getFileSystem(conf); - assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); - - desPath = new Path("hdfs://127.0.1.1:" + nnport + "/"); - desFs = desPath.getFileSystem(conf); - assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); - - conf.set("fs.defaultFS", "hdfs://haosong-hadoop"); - conf.set("dfs.nameservices", "haosong-hadoop"); - conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2"); - conf.set("dfs.client.failover.proxy.provider.haosong-hadoop", - "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); - - conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:"+ nnport); - conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000"); - desPath = new Path("/"); - desFs = desPath.getFileSystem(conf); - assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs)); - - conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:"+nnport); - conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000"); - desPath = new Path("/"); - desFs = desPath.getFileSystem(conf); - assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); - } - - @Test - public void testIsSameHdfs() throws IOException { - String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion(); - LOG.info("hadoop version is: " + hadoopVersion); - boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0"); - if (isHadoop3_0_0) { - // Hadoop 
3.0.0 alpha1+ ~ 3.0.0 GA changed default nn port to 9820. - // See HDFS-9427 - testIsSameHdfs(9820); - } else { - // pre hadoop 3.0.0 defaults to port 8020 - // Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990 - testIsSameHdfs(8020); - } - } - /** * Version of DFS that has HDFS-4525 in it. */ diff --git a/hbase-asyncfs/src/test/resources/hbase-site.xml b/hbase-asyncfs/src/test/resources/hbase-site.xml new file mode 100644 index 00000000000..c938f285234 --- /dev/null +++ b/hbase-asyncfs/src/test/resources/hbase-site.xml @@ -0,0 +1,179 @@ + + + + + + hbase.regionserver.msginterval + 100 + Interval between messages from the RegionServer to HMaster + in milliseconds. Default is 15. Set this value low if you want unit + tests to be responsive. + + + + hbase.server.thread.wakefrequency + 1000 + 100 + Time to sleep in between searches for work (in milliseconds). + Used as sleep interval by service threads such as hbase:meta scanner and log roller. + + + + hbase.defaults.for.version.skip + true + + + hbase.procedure.store.wal.use.hsync + false + + + hbase.procedure.check.owner.set + false + Whether ProcedureExecutor should enforce that each + procedure to have an owner + + + + hbase.unsafe.stream.capability.enforce + false + + Controls whether HBase will check for stream capabilities (hflush/hsync). + Disable this if you intend to run on LocalFileSystem. + WARNING: Doing so may expose you to additional risk of data loss! + + + + hbase.regionserver.handler.count + 3 + Default is 30 + + + hbase.regionserver.metahandler.count + 3 + Default is 20 + + + hbase.netty.worker.count + 3 + Default is 0 + + + hbase.hconnection.threads.max + 6 + Default is 256 + + + hbase.htable.threads.max + 3 + Default is MAX_INTEGER + + + hbase.region.replica.replication.threads.max + 7 + Default is 256 + + + hbase.rest.threads.max + 5 + Default is 100 + + + hbase.replication.bulkload.copy.maxthreads + 3 + Default is 10 + + + hbase.loadincremental.threads.max + 1 + Default is # of CPUs + + + hbase.hstore.flusher.count + 1 + Default is 2 + + + hbase.oldwals.cleaner.thread.size + 1 + Default is 2 + + + hbase.master.procedure.threads + 5 + Default is at least 16 + + + hbase.procedure.remote.dispatcher.threadpool.size + 3 + Default is 128 + + + hbase.regionserver.executor.closeregion.threads + 1 + Default is 3 + + + hbase.regionserver.executor.openregion.threads + 1 + Default is 3 + + + hbase.regionserver.executor.openpriorityregion.threads + 1 + Default is 3 + + + hbase.storescanner.parallel.seek.threads + 3 + Default is 10 + + + hbase.hfile.compaction.discharger.thread.count + 1 + Default is 10 + + + hbase.regionserver.executor.refresh.peer.threads + 1 + Default is 2 + + + hbase.hregion.open.and.init.threads.max + 3 + Default is 16 or # of Regions + + + hbase.master.handler.count + 7 + Default is 25 + + + hbase.replication.source.maxthreads + + Default is 10 + + + hbase.hconnection.meta.lookup.threads.max + 5 + Default is 128 + + diff --git a/hbase-asyncfs/src/test/resources/hdfs-site.xml b/hbase-asyncfs/src/test/resources/hdfs-site.xml new file mode 100644 index 00000000000..9230105a0fd --- /dev/null +++ b/hbase-asyncfs/src/test/resources/hdfs-site.xml @@ -0,0 +1,56 @@ + + + + + + + + dfs.namenode.fs-limits.min-block-size + 0 + + + dfs.datanode.handler.count + 5 + Default is 10 + + + dfs.namenode.handler.count + 5 + Default is 10 + + + dfs.namenode.service.handler.count + 5 + Default is 10 + + + diff --git a/hbase-asyncfs/src/test/resources/log4j.properties b/hbase-asyncfs/src/test/resources/log4j.properties new 
file mode 100644 index 00000000000..c322699ced2
--- /dev/null
+++ b/hbase-asyncfs/src/test/resources/log4j.properties
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+hbase.root.logger=INFO,console
+hbase.log.dir=.
+hbase.log.file=hbase.log
+
+# Define the root logger to the system property "hbase.root.logger".
+log4j.rootLogger=${hbase.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.hadoop.hbase=DEBUG
+
+#These settings are workarounds against spurious logs from the minicluster.
+#See HBASE-4709
+log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN
+log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN
+log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN
+log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN
+# Enable this to get detailed connection error/retry logging.
+# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE diff --git a/hbase-endpoint/pom.xml b/hbase-endpoint/pom.xml index f7d6236fb02..3636422e61c 100644 --- a/hbase-endpoint/pom.xml +++ b/hbase-endpoint/pom.xml @@ -163,6 +163,12 @@ test-jar test + + org.apache.hbase + hbase-asyncfs + test-jar + test + org.apache.hbase hbase-http diff --git a/hbase-examples/pom.xml b/hbase-examples/pom.xml index 83f398bb485..e54ec36350c 100644 --- a/hbase-examples/pom.xml +++ b/hbase-examples/pom.xml @@ -99,6 +99,16 @@ org.apache.hbase hbase-client + + org.apache.hbase + hbase-asyncfs + + + org.apache.hbase + hbase-asyncfs + test-jar + test + org.apache.hbase hbase-server diff --git a/hbase-mapreduce/pom.xml b/hbase-mapreduce/pom.xml index f3a4acd084f..1d127dd2502 100644 --- a/hbase-mapreduce/pom.xml +++ b/hbase-mapreduce/pom.xml @@ -129,6 +129,16 @@ org.apache.hbase hbase-metrics-api + + org.apache.hbase + hbase-asyncfs + + + org.apache.hbase + hbase-asyncfs + test-jar + test + io.dropwizard.metrics metrics-core diff --git a/hbase-rest/pom.xml b/hbase-rest/pom.xml index 3206c1ddb9d..a86fd8a0ee6 100644 --- a/hbase-rest/pom.xml +++ b/hbase-rest/pom.xml @@ -177,6 +177,12 @@ org.apache.hbase hbase-server + + org.apache.hbase + hbase-asyncfs + test-jar + test + org.apache.hbase hbase-hadoop-compat diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index 50474a0e241..2e8d1ec80b5 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -351,6 +351,16 @@ test-jar test + + org.apache.hbase + hbase-asyncfs + + + org.apache.hbase + hbase-asyncfs + test-jar + test + org.eclipse.jetty jetty-server diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index d2b4cb20948..998358ed876 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -324,16 +324,14 @@ public interface MasterServices extends Server { /** * Registers a new protocol buffer {@link Service} subclass as a master coprocessor endpoint. - * - *

<p>
- * Only a single instance may be registered for a given {@link Service} subclass (the
- * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
- * After the first registration, subsequent calls with the same service name will fail with
- * a return value of {@code false}.
- * </p>
+ * <p/>

+ * Only a single instance may be registered for a given {@link Service} subclass (the instances + * are keyed on + * {@link org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}. + * After the first registration, subsequent calls with the same service name will fail with a + * return value of {@code false}. * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint - * @return {@code true} if the registration was successful, {@code false} - * otherwise + * @return {@code true} if the registration was successful, {@code false} otherwise */ boolean registerService(Service instance); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 292d76e8f4a..7baa5bd928b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -57,7 +57,7 @@ public class MasterProcedureEnv implements ConfigurationObserver { @Override public void recoverFileLease(final FileSystem fs, final Path path) throws IOException { final Configuration conf = master.getConfiguration(); - FSUtils.recoverFileLease(fs, path, conf, new CancelableProgressable() { + RecoverLeaseFSUtils.recoverFileLease(fs, path, conf, new CancelableProgressable() { @Override public boolean progress() { LOG.debug("Recover Procedure Store log lease: " + path); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index e8ae3e5687b..d2cb6f33c15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -32,8 +32,8 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALFactory; @@ -384,7 +384,7 @@ class WALEntryStream implements Closeable { private void recoverLease(final Configuration conf, final Path path) { try { final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf); - FSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { + RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { @Override public boolean progress() { LOG.debug("recover WAL lease: " + path); diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index ece76fb6b9f..420fdb02ca7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -58,7 +58,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; @@ -80,7 +79,6 @@ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.Progressable; @@ -1872,181 +1870,4 @@ public final class FSUtils { return false; } - - public static void recoverFileLease(FileSystem fs, Path p, Configuration conf) - throws IOException { - recoverFileLease(fs, p, conf, null); - } - - /** - * Recover the lease from HDFS, retrying multiple times. - */ - public static void recoverFileLease(FileSystem fs, Path p, Configuration conf, - CancelableProgressable reporter) throws IOException { - if (fs instanceof FilterFileSystem) { - fs = ((FilterFileSystem) fs).getRawFileSystem(); - } - // lease recovery not needed for local file system case. - if (!(fs instanceof DistributedFileSystem)) { - return; - } - recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter); - } - - /* - * Run the dfs recover lease. recoverLease is asynchronous. It returns: -false when it starts the - * lease recovery (i.e. lease recovery not *yet* done) - true when the lease recovery has - * succeeded or the file is closed. But, we have to be careful. Each time we call recoverLease, it - * starts the recover lease process over from the beginning. We could put ourselves in a situation - * where we are doing nothing but starting a recovery, interrupting it to start again, and so on. - * The findings over in HBASE-8354 have it that the namenode will try to recover the lease on the - * file's primary node. If all is well, it should return near immediately. But, as is common, it - * is the very primary node that has crashed and so the namenode will be stuck waiting on a socket - * timeout before it will ask another datanode to start the recovery. It does not help if we call - * recoverLease in the meantime and in particular, subsequent to the socket timeout, a - * recoverLease invocation will cause us to start over from square one (possibly waiting on socket - * timeout against primary node). So, in the below, we do the following: 1. Call recoverLease. 2. - * If it returns true, break. 3. If it returns false, wait a few seconds and then call it again. - * 4. If it returns true, break. 5. If it returns false, wait for what we think the datanode - * socket timeout is (configurable) and then try again. 6. If it returns true, break. 7. If it - * returns false, repeat starting at step 5. above. If HDFS-4525 is available, call it every - * second and we might be able to exit early. 
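
The retry schedule described in the comment above is easier to see stripped of its bookkeeping: one immediate recoverLease call, a short first pause, then progressively longer waits during which isFileClosed is polled once a second so the loop can exit early. Below is a minimal sketch of that loop, assuming only the public DistributedFileSystem API; the pause constants are hypothetical stand-ins for the hbase.lease.recovery.* settings, and the real code additionally honors a CancelableProgressable reporter and an overall timeout, and reaches isFileClosed via reflection because older Hadoop versions lack it:

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

/** Illustrative sketch only; constants stand in for the hbase.lease.recovery.* configuration. */
final class LeaseRecoveryLoopSketch {
  private static final long FIRST_PAUSE_MS = 4_000L;       // ~hbase.lease.recovery.first.pause
  private static final long SUBSEQUENT_PAUSE_MS = 64_000L; // ~hbase.lease.recovery.dfs.timeout
  private static final long POLL_MS = 1_000L;              // ~hbase.lease.recovery.pause

  static boolean recover(DistributedFileSystem dfs, Path p)
      throws IOException, InterruptedException {
    for (int attempt = 0;; attempt++) {
      if (dfs.recoverLease(p)) {
        return true; // Recovery finished, or the file was already closed.
      }
      if (attempt == 0) {
        // First failure: wait a little above the DFS heartbeat and retry once quickly.
        Thread.sleep(FIRST_PAUSE_MS);
      } else {
        // Later failures: wait out roughly one primary-datanode socket timeout per attempt
        // (linear backoff), polling isFileClosed each second so we can break out early
        // instead of re-kicking recovery from square one.
        long deadline = System.currentTimeMillis() + SUBSEQUENT_PAUSE_MS * attempt;
        while (System.currentTimeMillis() < deadline) {
          Thread.sleep(POLL_MS);
          if (dfs.isFileClosed(p)) {
            return true;
          }
        }
      }
    }
  }
}
```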
- */ - private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p, - final Configuration conf, final CancelableProgressable reporter) throws IOException { - LOG.info("Recover lease on dfs file " + p); - long startWaiting = EnvironmentEdgeManager.currentTime(); - // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS - // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves - // beyond that limit 'to be safe'. - long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting; - // This setting should be a little bit above what the cluster dfs heartbeat is set to. - long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000); - // This should be set to how long it'll take for us to timeout against primary datanode if it - // is dead. We set it to 64 seconds, 4 second than the default READ_TIMEOUT in HDFS, the - // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this - // timeout, then further recovery will take liner backoff with this base, to avoid endless - // preemptions when this value is not properly configured. - long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 64 * 1000); - - Method isFileClosedMeth = null; - // whether we need to look for isFileClosed method - boolean findIsFileClosedMeth = true; - boolean recovered = false; - // We break the loop if we succeed the lease recovery, timeout, or we throw an exception. - for (int nbAttempt = 0; !recovered; nbAttempt++) { - recovered = recoverLease(dfs, nbAttempt, p, startWaiting); - if (recovered) { - break; - } - checkIfCancelled(reporter); - if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) { - break; - } - try { - // On the first time through wait the short 'firstPause'. - if (nbAttempt == 0) { - Thread.sleep(firstPause); - } else { - // Cycle here until (subsequentPause * nbAttempt) elapses. While spinning, check - // isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though. - long localStartWaiting = EnvironmentEdgeManager.currentTime(); - while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) < subsequentPauseBase * - nbAttempt) { - Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000)); - if (findIsFileClosedMeth) { - try { - isFileClosedMeth = - dfs.getClass().getMethod("isFileClosed", new Class[] { Path.class }); - } catch (NoSuchMethodException nsme) { - LOG.debug("isFileClosed not available"); - } finally { - findIsFileClosedMeth = false; - } - } - if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) { - recovered = true; - break; - } - checkIfCancelled(reporter); - } - } - } catch (InterruptedException ie) { - InterruptedIOException iioe = new InterruptedIOException(); - iioe.initCause(ie); - throw iioe; - } - } - return recovered; - } - - private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout, - final int nbAttempt, final Path p, final long startWaiting) { - if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) { - LOG.warn("Cannot recoverLease after trying for " + - conf.getInt("hbase.lease.recovery.timeout", 900000) + - "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " + - getLogMessageDetail(nbAttempt, p, startWaiting)); - return true; - } - return false; - } - - /** - * Try to recover the lease. - * @return True if dfs#recoverLease came by true. 
- */ - private static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, - final Path p, final long startWaiting) throws FileNotFoundException { - boolean recovered = false; - try { - recovered = dfs.recoverLease(p); - LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ") + - getLogMessageDetail(nbAttempt, p, startWaiting)); - } catch (IOException e) { - if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) { - // This exception comes out instead of FNFE, fix it - throw new FileNotFoundException("The given WAL wasn't found at " + p); - } else if (e instanceof FileNotFoundException) { - throw (FileNotFoundException) e; - } - LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e); - } - return recovered; - } - - /** - * @return Detail to append to any log message around lease recovering. - */ - private static String getLogMessageDetail(final int nbAttempt, final Path p, - final long startWaiting) { - return "attempt=" + nbAttempt + " on file=" + p + " after " + - (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms"; - } - - /** - * Call HDFS-4525 isFileClosed if it is available. - * @return True if file is closed. - */ - private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m, - final Path p) { - try { - return (Boolean) m.invoke(dfs, p); - } catch (SecurityException e) { - LOG.warn("No access", e); - } catch (Exception e) { - LOG.warn("Failed invocation for " + p.toString(), e); - } - return false; - } - - private static void checkIfCancelled(final CancelableProgressable reporter) - throws InterruptedIOException { - if (reporter == null) { - return; - } - if (!reporter.progress()) { - throw new InterruptedIOException("Operation cancelled"); - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java index da84c15f23a..f046cd043da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java @@ -845,7 +845,7 @@ public class RegionSplitter { fs.rename(tmpFile, splitFile); } else { LOG.debug("_balancedSplit file found. 
Replay log to restore state..."); - FSUtils.recoverFileLease(fs, splitFile, connection.getConfiguration(), null); + RecoverLeaseFSUtils.recoverFileLease(fs, splitFile, connection.getConfiguration(), null); // parse split file and process remaining splits FSDataInputStream tmpIn = fs.open(splitFile); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index c85a688ad4c..ce6770fd0f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -515,7 +515,7 @@ public abstract class AbstractFSWALProvider> implemen private static void recoverLease(final Configuration conf, final Path path) { try { final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf); - FSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { + RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { @Override public boolean progress() { LOG.debug("Still trying to recover WAL lease: " + path); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index a56cdb84d3e..010735dcdde 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -54,7 +54,7 @@ import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; @@ -393,7 +393,7 @@ public class WALSplitter { } try { - FSUtils.recoverFileLease(walFS, path, conf, reporter); + RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, reporter); try { in = getReader(path, reporter); } catch (EOFException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 6b36ff19df9..c231cd74245 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -698,14 +698,18 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { return dfsCluster; } - /** This is used before starting HDFS and map-reduce mini-clusters - * Run something like the below to check for the likes of '/tmp' references -- i.e. - * references outside of the test data dir -- in the conf. 
- * Configuration conf = TEST_UTIL.getConfiguration();
- * for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) {
- *   Map.Entry<String, String> e = i.next();
- *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
- * }
+ /**
+ * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below to
+ * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in
+ * the conf.
+ *
+ * <pre>

+   * Configuration conf = TEST_UTIL.getConfiguration();
+   * for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) {
+   *   Map.Entry<String, String> e = i.next();
+   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
+   * }
+   * </pre>
*/ private void createDirsAndSetProperties() throws IOException { setupClusterTestDir(); @@ -741,7 +745,6 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths"); createDirAndSetProperty("nfs.dump.dir"); createDirAndSetProperty("java.io.tmpdir"); - createDirAndSetProperty("java.io.tmpdir"); createDirAndSetProperty("dfs.journalnode.edits.dir"); createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir"); createDirAndSetProperty("fs.s3a.committer.staging.tmp.path"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index fb07d2a64d9..691250a5609 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -52,8 +52,8 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -497,8 +497,8 @@ public class TestLogRolling extends AbstractTestLogRolling { Set loggedRows = new HashSet<>(); for (Path p : paths) { LOG.debug("recovering lease for " + p); - FSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), - null); + RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, + TEST_UTIL.getConfiguration(), null); LOG.debug("Reading WAL " + CommonFSUtils.getPath(p)); WAL.Reader reader = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java index 63c8f86dc9c..24ad6efdd54 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; import java.util.List; import java.util.Random; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -37,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; @@ -645,17 +647,12 @@ public class TestFSUtils { } - private static final boolean STREAM_CAPABILITIES_IS_PRESENT; static { - boolean tmp = false; try { Class.forName("org.apache.hadoop.fs.StreamCapabilities"); - tmp = true; LOG.debug("Test thought StreamCapabilities class was present."); } catch (ClassNotFoundException exception) { LOG.debug("Test didn't think StreamCapabilities class was present."); - } finally { - STREAM_CAPABILITIES_IS_PRESENT = tmp; } } @@ -672,4 +669,56 @@ public class TestFSUtils { 
cluster.shutdown(); } } + + private void testIsSameHdfs(int nnport) throws IOException { + Configuration conf = HBaseConfiguration.create(); + Path srcPath = new Path("hdfs://localhost:" + nnport + "/"); + Path desPath = new Path("hdfs://127.0.0.1/"); + FileSystem srcFs = srcPath.getFileSystem(conf); + FileSystem desFs = desPath.getFileSystem(conf); + + assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs)); + + desPath = new Path("hdfs://127.0.0.1:8070/"); + desFs = desPath.getFileSystem(conf); + assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); + + desPath = new Path("hdfs://127.0.1.1:" + nnport + "/"); + desFs = desPath.getFileSystem(conf); + assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); + + conf.set("fs.defaultFS", "hdfs://haosong-hadoop"); + conf.set("dfs.nameservices", "haosong-hadoop"); + conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2"); + conf.set("dfs.client.failover.proxy.provider.haosong-hadoop", + "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); + + conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport); + conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000"); + desPath = new Path("/"); + desFs = desPath.getFileSystem(conf); + assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs)); + + conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport); + conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000"); + desPath = new Path("/"); + desFs = desPath.getFileSystem(conf); + assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs)); + } + + @Test + public void testIsSameHdfs() throws IOException { + String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion(); + LOG.info("hadoop version is: " + hadoopVersion); + boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0"); + if (isHadoop3_0_0) { + // Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed default nn port to 9820. + // See HDFS-9427 + testIsSameHdfs(9820); + } else { + // pre hadoop 3.0.0 defaults to port 8020 + // Hadoop 3.0.1 changed it back to port 8020. 
See HDFS-12990 + testIsSameHdfs(8020); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 4dcff08a25f..b276283a91f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WALFactory.Providers; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -455,7 +455,7 @@ public class TestWALFactory { @Override public void run() { try { - FSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null); + RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null); } catch (IOException e) { exception = e; } diff --git a/hbase-shaded/hbase-shaded-testing-util/pom.xml b/hbase-shaded/hbase-shaded-testing-util/pom.xml index a98b191abc4..0da5fd394b3 100644 --- a/hbase-shaded/hbase-shaded-testing-util/pom.xml +++ b/hbase-shaded/hbase-shaded-testing-util/pom.xml @@ -135,6 +135,12 @@ + + org.apache.hbase + hbase-asyncfs + test-jar + compile + org.apache.hbase hbase-zookeeper diff --git a/hbase-testing-util/pom.xml b/hbase-testing-util/pom.xml index 4106d912453..d3eeb8c60fe 100644 --- a/hbase-testing-util/pom.xml +++ b/hbase-testing-util/pom.xml @@ -91,6 +91,12 @@ test-jar compile + + org.apache.hbase + hbase-asyncfs + test-jar + compile + org.apache.hbase hbase-hadoop-compat diff --git a/hbase-thrift/pom.xml b/hbase-thrift/pom.xml index 520acc42ca6..ae03d7d1b3e 100644 --- a/hbase-thrift/pom.xml +++ b/hbase-thrift/pom.xml @@ -182,6 +182,12 @@ org.apache.hbase hbase-server + + org.apache.hbase + hbase-asyncfs + test-jar + test + org.apache.hbase hbase-testing-util diff --git a/pom.xml b/pom.xml index 2a220a76cd2..bb7087c685f 100755 --- a/pom.xml +++ b/pom.xml @@ -90,6 +90,7 @@ hbase-metrics hbase-zookeeper hbase-hbtop + hbase-asyncfs scm:git:git://gitbox.apache.org/repos/asf/hbase.git @@ -1401,6 +1402,7 @@ hbase-rsgroup-${project.version}-tests.jar hbase-mapreduce-${project.version}-tests.jar hbase-zookeeper-${project.version}-tests.jar + hbase-asyncfs-${project.version}-tests.jar bash surefire-junit47 @@ -1726,6 +1728,18 @@ hbase-shaded-mapreduce ${project.version} + + org.apache.hbase + hbase-asyncfs + ${project.version} + + + org.apache.hbase + hbase-asyncfs + ${project.version} + test-jar + test + com.github.stephenc.findbugs
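
For downstream code, every call site touched by this patch migrates the same way: FSUtils.recoverFileLease becomes RecoverLeaseFSUtils.recoverFileLease from the new hbase-asyncfs module, optionally passing a CancelableProgressable whose progress() callback is invoked between retries to log progress and to allow cancellation. A minimal caller sketch under those assumptions (the class and method names here are illustrative, not part of the patch):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;

final class LeaseRecoveryCallerSketch {
  /** Recover any outstanding lease on a WAL file before re-opening it for reading. */
  static void recoverBeforeRead(FileSystem fs, Path walPath, Configuration conf)
      throws IOException {
    RecoverLeaseFSUtils.recoverFileLease(fs, walPath, conf, new CancelableProgressable() {
      @Override
      public boolean progress() {
        // Returning false cancels the recovery loop with an InterruptedIOException.
        return true;
      }
    });
  }
}
```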