HBASE-24304 Separate a hbase-asyncfs module (#1628)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2020-05-06 14:40:21 +08:00 committed by GitHub
parent 045c909bdf
commit a9a1b9524d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 1137 additions and 366 deletions

View File

@ -119,6 +119,14 @@
</includes>
<fileMode>0644</fileMode>
</fileSet>
<fileSet>
<directory>${project.basedir}/../hbase-asyncfs/target/</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>${asyncfs.test.jar}</include>
</includes>
<fileMode>0644</fileMode>
</fileSet>
<fileSet>
<directory>${project.basedir}/../hbase-zookeeper/target/</directory>
<outputDirectory>lib</outputDirectory>

View File

@ -152,6 +152,14 @@
</includes>
<fileMode>0644</fileMode>
</fileSet>
<fileSet>
<directory>${project.basedir}/../hbase-asyncfs/target/</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>${asyncfs.test.jar}</include>
</includes>
<fileMode>0644</fileMode>
</fileSet>
<fileSet>
<directory>${project.basedir}/../hbase-zookeeper/target/</directory>
<outputDirectory>lib</outputDirectory>

View File

@ -34,7 +34,9 @@
<includes>
<!-- Keep this list sorted by name -->
<include>org.apache.hbase:hbase-annotations</include>
<include>org.apache.hbase:hbase-asyncfs</include>
<include>org.apache.hbase:hbase-backup</include>
<include>org.apache.hbase:hbase-balancer</include>
<include>org.apache.hbase:hbase-client</include>
<include>org.apache.hbase:hbase-common</include>
<include>org.apache.hbase:hbase-endpoint</include>
@ -49,7 +51,6 @@
<include>org.apache.hbase:hbase-procedure</include>
<include>org.apache.hbase:hbase-protocol</include>
<include>org.apache.hbase:hbase-protocol-shaded</include>
<include>org.apache.hbase:hbase-balancer</include>
<include>org.apache.hbase:hbase-replication</include>
<include>org.apache.hbase:hbase-rest</include>
<include>org.apache.hbase:hbase-rsgroup</include>

201
hbase-asyncfs/pom.xml Normal file
View File

@ -0,0 +1,201 @@
<?xml version="1.0"?>
<project xmlns="https://maven.apache.org/POM/4.0.0" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hbase-build-configuration</artifactId>
<groupId>org.apache.hbase</groupId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../hbase-build-configuration</relativePath>
</parent>
<artifactId>hbase-asyncfs</artifactId>
<name>Apache HBase - Asynchronous FileSystem</name>
<description>HBase Asynchronous FileSystem Implementation for WAL</description>
<build>
<plugins>
<!-- Make a jar and put the sources in the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<!--Make it so assembly:single does nothing in here-->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<skipAssembly>true</skipAssembly>
</configuration>
</plugin>
<plugin>
<groupId>net.revelc.code</groupId>
<artifactId>warbucks-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<failOnViolation>true</failOnViolation>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-annotations</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>bouncycastle</groupId>
<artifactId>bcprov-jdk15</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kerby</groupId>
<artifactId>kerb-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kerby</groupId>
<artifactId>kerb-simplekdc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-http</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<!-- Profiles for building against different hadoop versions -->
<profile>
<id>hadoop-3.0</id>
<activation>
<property><name>!hadoop.profile</name></property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>eclipse-specific</id>
<activation>
<property>
<name>m2e.version</name>
</property>
</activation>
<build>
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings
only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</profile>
</profiles>
</project>

View File

@ -45,7 +45,7 @@ import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@ -553,7 +553,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
endFileLease(client, fileId);
FSUtils.recoverFileLease(dfs, new Path(src), conf,
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
}

View File

@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* Similar interface as {@link org.apache.hadoop.util.Progressable} but returns
* a boolean to support canceling the operation.
* <p>
* <p/>
* Used for doing updating of OPENING znode during log replay on region open.
*/
@InterfaceAudience.Private

View File

@ -0,0 +1,221 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility methods for recovering file lease for hdfs.
*/
@InterfaceAudience.Private
public final class RecoverLeaseFSUtils {
private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class);
private RecoverLeaseFSUtils() {
}
public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
throws IOException {
recoverFileLease(fs, p, conf, null);
}
/**
* Recover the lease from HDFS, retrying multiple times.
*/
public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
CancelableProgressable reporter) throws IOException {
if (fs instanceof FilterFileSystem) {
fs = ((FilterFileSystem) fs).getRawFileSystem();
}
// lease recovery not needed for local file system case.
if (!(fs instanceof DistributedFileSystem)) {
return;
}
recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
}
/*
* Run the dfs recover lease. recoverLease is asynchronous. It returns: -false when it starts the
* lease recovery (i.e. lease recovery not *yet* done) - true when the lease recovery has
* succeeded or the file is closed. But, we have to be careful. Each time we call recoverLease, it
* starts the recover lease process over from the beginning. We could put ourselves in a situation
* where we are doing nothing but starting a recovery, interrupting it to start again, and so on.
* The findings over in HBASE-8354 have it that the namenode will try to recover the lease on the
* file's primary node. If all is well, it should return near immediately. But, as is common, it
* is the very primary node that has crashed and so the namenode will be stuck waiting on a socket
* timeout before it will ask another datanode to start the recovery. It does not help if we call
* recoverLease in the meantime and in particular, subsequent to the socket timeout, a
* recoverLease invocation will cause us to start over from square one (possibly waiting on socket
* timeout against primary node). So, in the below, we do the following: 1. Call recoverLease. 2.
* If it returns true, break. 3. If it returns false, wait a few seconds and then call it again.
* 4. If it returns true, break. 5. If it returns false, wait for what we think the datanode
* socket timeout is (configurable) and then try again. 6. If it returns true, break. 7. If it
* returns false, repeat starting at step 5. above. If HDFS-4525 is available, call it every
* second and we might be able to exit early.
*/
private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
final Configuration conf, final CancelableProgressable reporter) throws IOException {
LOG.info("Recover lease on dfs file " + p);
long startWaiting = EnvironmentEdgeManager.currentTime();
// Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
// usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
// beyond that limit 'to be safe'.
long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
// This setting should be a little bit above what the cluster dfs heartbeat is set to.
long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000);
// This should be set to how long it'll take for us to timeout against primary datanode if it
// is dead. We set it to 64 seconds, 4 second than the default READ_TIMEOUT in HDFS, the
// default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this
// timeout, then further recovery will take liner backoff with this base, to avoid endless
// preemptions when this value is not properly configured.
long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 64 * 1000);
Method isFileClosedMeth = null;
// whether we need to look for isFileClosed method
boolean findIsFileClosedMeth = true;
boolean recovered = false;
// We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
for (int nbAttempt = 0; !recovered; nbAttempt++) {
recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
if (recovered) {
break;
}
checkIfCancelled(reporter);
if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
break;
}
try {
// On the first time through wait the short 'firstPause'.
if (nbAttempt == 0) {
Thread.sleep(firstPause);
} else {
// Cycle here until (subsequentPause * nbAttempt) elapses. While spinning, check
// isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though.
long localStartWaiting = EnvironmentEdgeManager.currentTime();
while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) < subsequentPauseBase *
nbAttempt) {
Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
if (findIsFileClosedMeth) {
try {
isFileClosedMeth =
dfs.getClass().getMethod("isFileClosed", new Class[] { Path.class });
} catch (NoSuchMethodException nsme) {
LOG.debug("isFileClosed not available");
} finally {
findIsFileClosedMeth = false;
}
}
if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
recovered = true;
break;
}
checkIfCancelled(reporter);
}
}
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
return recovered;
}
private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
final int nbAttempt, final Path p, final long startWaiting) {
if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
LOG.warn("Cannot recoverLease after trying for " +
conf.getInt("hbase.lease.recovery.timeout", 900000) +
"ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
getLogMessageDetail(nbAttempt, p, startWaiting));
return true;
}
return false;
}
/**
* Try to recover the lease.
* @return True if dfs#recoverLease came by true.
*/
private static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt,
final Path p, final long startWaiting) throws FileNotFoundException {
boolean recovered = false;
try {
recovered = dfs.recoverLease(p);
LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ") +
getLogMessageDetail(nbAttempt, p, startWaiting));
} catch (IOException e) {
if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
// This exception comes out instead of FNFE, fix it
throw new FileNotFoundException("The given WAL wasn't found at " + p);
} else if (e instanceof FileNotFoundException) {
throw (FileNotFoundException) e;
}
LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
}
return recovered;
}
/**
* @return Detail to append to any log message around lease recovering.
*/
private static String getLogMessageDetail(final int nbAttempt, final Path p,
final long startWaiting) {
return "attempt=" + nbAttempt + " on file=" + p + " after " +
(EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
}
/**
* Call HDFS-4525 isFileClosed if it is available.
* @return True if file is closed.
*/
private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m,
final Path p) {
try {
return (Boolean) m.invoke(dfs, p);
} catch (SecurityException e) {
LOG.warn("No access", e);
} catch (Exception e) {
LOG.warn("Failed invocation for " + p.toString(), e);
}
return false;
}
private static void checkIfCancelled(final CancelableProgressable reporter)
throws InterruptedIOException {
if (reporter == null) {
return;
}
if (!reporter.progress()) {
throw new InterruptedIOException("Operation cancelled");
}
}
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AsyncFSTestBase {
private static final Logger LOG = LoggerFactory.getLogger(AsyncFSTestBase.class);
protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
protected static File CLUSTER_TEST_DIR;
protected static MiniDFSCluster CLUSTER;
private static boolean deleteOnExit() {
String v = System.getProperty("hbase.testing.preserve.testdir");
// Let default be true, to delete on exit.
return v == null ? true : !Boolean.parseBoolean(v);
}
/**
* Creates a directory for the cluster, under the test data
*/
protected static void setupClusterTestDir() {
// Using randomUUID ensures that multiple clusters can be launched by
// a same test, if it stops & starts them
Path testDir =
UTIL.getDataTestDir("cluster_" + HBaseCommonTestingUtility.getRandomUUID().toString());
CLUSTER_TEST_DIR = new File(testDir.toString()).getAbsoluteFile();
// Have it cleaned up on exit
boolean b = deleteOnExit();
if (b) {
CLUSTER_TEST_DIR.deleteOnExit();
}
LOG.info("Created new mini-cluster data directory: {}, deleteOnExit={}", CLUSTER_TEST_DIR, b);
}
private static String createDirAndSetProperty(final String property) {
return createDirAndSetProperty(property, property);
}
private static String createDirAndSetProperty(final String relPath, String property) {
String path = UTIL.getDataTestDir(relPath).toString();
System.setProperty(property, path);
UTIL.getConfiguration().set(property, path);
new File(path).mkdirs();
LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
return path;
}
private static void createDirsAndSetProperties() throws IOException {
setupClusterTestDir();
System.setProperty("test.build.data", CLUSTER_TEST_DIR.getPath());
createDirAndSetProperty("test.cache.data");
createDirAndSetProperty("hadoop.tmp.dir");
// Frustrate yarn's and hdfs's attempts at writing /tmp.
// Below is fragile. Make it so we just interpolate any 'tmp' reference.
createDirAndSetProperty("dfs.journalnode.edits.dir");
createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
createDirAndSetProperty("nfs.dump.dir");
createDirAndSetProperty("java.io.tmpdir");
createDirAndSetProperty("dfs.journalnode.edits.dir");
createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
}
protected static void startMiniDFSCluster(int servers) throws IOException {
if (CLUSTER != null) {
throw new IllegalStateException("Already started");
}
createDirsAndSetProperties();
Configuration conf = UTIL.getConfiguration();
// Error level to skip some warnings specific to the minicluster. See HBASE-4709
org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class)
.setLevel(org.apache.log4j.Level.ERROR);
org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class)
.setLevel(org.apache.log4j.Level.ERROR);
TraceUtil.initTracer(conf);
CLUSTER = new MiniDFSCluster.Builder(conf).numDataNodes(servers).build();
CLUSTER.waitClusterUp();
}
protected static void shutdownMiniDFSCluster() {
if (CLUSTER != null) {
CLUSTER.shutdown(true);
CLUSTER = null;
}
}
}

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -63,16 +62,14 @@ import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput {
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);
HBaseClassTestRule.forClass(TestFanOutOneBlockAsyncDFSOutput.class);
private static final Logger LOG = LoggerFactory.getLogger(TestFanOutOneBlockAsyncDFSOutput.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static DistributedFileSystem FS;
private static EventLoopGroup EVENT_LOOP_GROUP;
@ -86,9 +83,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
TEST_UTIL.startMiniDFSCluster(3);
FS = TEST_UTIL.getDFSCluster().getFileSystem();
UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
startMiniDFSCluster(3);
FS = CLUSTER.getFileSystem();
EVENT_LOOP_GROUP = new NioEventLoopGroup();
CHANNEL_CLASS = NioSocketChannel.class;
}
@ -98,11 +95,11 @@ public class TestFanOutOneBlockAsyncDFSOutput {
if (EVENT_LOOP_GROUP != null) {
EVENT_LOOP_GROUP.shutdownGracefully().sync();
}
TEST_UTIL.shutdownMiniDFSCluster();
shutdownMiniDFSCluster();
}
static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
throws IOException, InterruptedException, ExecutionException {
throws IOException, InterruptedException, ExecutionException {
List<CompletableFuture<Long>> futures = new ArrayList<>();
byte[] b = new byte[10];
Random rand = new Random(12345);
@ -151,7 +148,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
out.write(b, 0, b.length);
out.flush(false).get();
// restart one datanode which causes one connection broken
TEST_UTIL.getDFSCluster().restartDataNode(0);
CLUSTER.restartDataNode(0);
out.write(b, 0, b.length);
try {
out.flush(false).get();
@ -199,8 +196,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
@Test
public void testConnectToDatanodeFailed()
throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
InvocationTargetException, InterruptedException, NoSuchFieldException {
throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
InvocationTargetException, InterruptedException, NoSuchFieldException {
Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
xceiverServerDaemonField.setAccessible(true);
Class<?> xceiverServerClass =
@ -208,7 +205,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
numPeersMethod.setAccessible(true);
// make one datanode broken
DataNodeProperties dnProp = TEST_UTIL.getDFSCluster().stopDataNode(0);
DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
Path f = new Path("/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
@ -216,7 +213,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
assertEquals(2, output.getPipeline().length);
} finally {
TEST_UTIL.getDFSCluster().restartDataNode(dnProp);
CLUSTER.restartDataNode(dnProp);
}
}

View File

@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@ -47,7 +47,7 @@ public class TestLocalAsyncOutput {
private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
@AfterClass
public static void tearDownAfterClass() throws IOException {

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.io.asyncfs;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
@ -29,7 +29,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@ -47,14 +46,12 @@ import org.junit.rules.TestName;
* Used to confirm that it is OK to overwrite a file which is being written currently.
*/
@Category({ MiscTests.class, MediumTests.class })
public class TestOverwriteFileUnderConstruction {
public class TestOverwriteFileUnderConstruction extends AsyncFSTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestOverwriteFileUnderConstruction.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static FileSystem FS;
@Rule
@ -62,13 +59,13 @@ public class TestOverwriteFileUnderConstruction {
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniDFSCluster(3);
FS = UTIL.getDFSCluster().getFileSystem();
startMiniDFSCluster(3);
FS = CLUSTER.getFileSystem();
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
shutdownMiniDFSCluster();
}
@Test

View File

@ -25,11 +25,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
@ -37,8 +40,8 @@ import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -57,6 +60,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@ -66,13 +71,14 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
@RunWith(Parameterized.class)
@Category({ MiscTests.class, LargeTests.class })
public class TestSaslFanOutOneBlockAsyncDFSOutput {
public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class);
private static DistributedFileSystem FS;
@ -82,8 +88,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
private static int READ_TIMEOUT_MS = 200000;
private static final File KEYTAB_FILE =
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC;
@ -124,7 +129,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
private static void setUpKeyProvider(Configuration conf) throws Exception {
URI keyProviderUri =
new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString());
conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
@ -132,21 +137,56 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
keyProvider.close();
}
/**
* Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
* keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
*/
private static MiniKdc setupMiniKdc(File keytabFile) throws Exception {
Properties conf = MiniKdc.createConf();
conf.put(MiniKdc.DEBUG, true);
MiniKdc kdc = null;
File dir = null;
// There is time lag between selecting a port and trying to bind with it. It's possible that
// another service captures the port in between which'll result in BindException.
boolean bindException;
int numTries = 0;
do {
try {
bindException = false;
dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath());
kdc = new MiniKdc(conf, dir);
kdc.start();
} catch (BindException e) {
FileUtils.deleteDirectory(dir); // clean directory
numTries++;
if (numTries == 3) {
LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
throw e;
}
LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
bindException = true;
}
} while (bindException);
System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
keytabFile.getAbsolutePath());
return kdc;
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
EVENT_LOOP_GROUP = new NioEventLoopGroup();
CHANNEL_CLASS = NioSocketChannel.class;
TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
KDC = setupMiniKdc(KEYTAB_FILE);
USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
PRINCIPAL = USERNAME + "/" + HOST;
HTTP_PRINCIPAL = "HTTP/" + HOST;
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
setUpKeyProvider(TEST_UTIL.getConfiguration());
HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration(),
PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
HBaseKerberosUtils.setSSLConfiguration(TEST_UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
setUpKeyProvider(UTIL.getConfiguration());
HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(),
PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
}
@AfterClass
@ -157,6 +197,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
if (KDC != null) {
KDC.stop();
}
shutdownMiniDFSCluster();
}
private Path testDirOnTestFs;
@ -171,25 +212,25 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
@Before
public void setUp() throws Exception {
TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) {
TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
} else {
TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
}
if (StringUtils.isBlank(encryptionAlgorithm)) {
TEST_UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
} else {
TEST_UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
}
if (StringUtils.isBlank(cipherSuite)) {
TEST_UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
} else {
TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
}
TEST_UTIL.startMiniDFSCluster(3);
FS = TEST_UTIL.getDFSCluster().getFileSystem();
startMiniDFSCluster(3);
FS = CLUSTER.getFileSystem();
testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
FS.mkdirs(testDirOnTestFs);
entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
@ -199,7 +240,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
@After
public void tearDown() throws IOException {
TEST_UTIL.shutdownMiniDFSCluster();
shutdownMiniDFSCluster();
}
private Path getTestFile() {

View File

@ -20,24 +20,24 @@ package org.apache.hadoop.hbase.security;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.security.UserGroupInformation;
@InterfaceAudience.Private
public class HBaseKerberosUtils {
public final class HBaseKerberosUtils {
private static final Logger LOG = LoggerFactory.getLogger(HBaseKerberosUtils.class);
public static final String KRB_PRINCIPAL = SecurityConstants.REGIONSERVER_KRB_PRINCIPAL;
@ -46,6 +46,9 @@ public class HBaseKerberosUtils {
public static final String CLIENT_PRINCIPAL = AuthUtil.HBASE_CLIENT_KERBEROS_PRINCIPAL;
public static final String CLIENT_KEYTAB = AuthUtil.HBASE_CLIENT_KEYTAB_FILE;
private HBaseKerberosUtils() {
}
public static boolean isKerberosPropertySetted() {
String krbPrincipal = System.getProperty(KRB_PRINCIPAL);
String krbKeytab = System.getProperty(KRB_KEYTAB_FILE);
@ -111,8 +114,8 @@ public class HBaseKerberosUtils {
* @param servicePrincipal service principal used by NN, HM and RS.
* @param spnegoPrincipal SPNEGO principal used by NN web UI.
*/
public static void setSecuredConfiguration(Configuration conf,
String servicePrincipal, String spnegoPrincipal) {
public static void setSecuredConfiguration(Configuration conf, String servicePrincipal,
String spnegoPrincipal) {
setPrincipalForTesting(servicePrincipal);
setSecuredConfiguration(conf);
setSecuredHadoopConfiguration(conf, spnegoPrincipal);
@ -128,17 +131,13 @@ public class HBaseKerberosUtils {
}
private static void setSecuredHadoopConfiguration(Configuration conf,
String spnegoServerPrincipal) {
// if we drop support for hadoop-2.4.0 and hadoop-2.4.1,
// the following key should be changed.
// 1) DFS_NAMENODE_USER_NAME_KEY -> DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY
// 2) DFS_DATANODE_USER_NAME_KEY -> DFS_DATANODE_KERBEROS_PRINCIPAL_KEY
String spnegoServerPrincipal) {
String serverPrincipal = System.getProperty(KRB_PRINCIPAL);
String keytabFilePath = System.getProperty(KRB_KEYTAB_FILE);
// HDFS
conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, serverPrincipal);
conf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, serverPrincipal);
conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, keytabFilePath);
conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, serverPrincipal);
conf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, serverPrincipal);
conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, keytabFilePath);
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
// YARN
@ -146,8 +145,7 @@ public class HBaseKerberosUtils {
conf.set(YarnConfiguration.NM_PRINCIPAL, KRB_PRINCIPAL);
if (spnegoServerPrincipal != null) {
conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
spnegoServerPrincipal);
conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoServerPrincipal);
}
conf.setBoolean("ignore.secure.ports.for.testing", true);
@ -161,8 +159,8 @@ public class HBaseKerberosUtils {
* @param clazz the caller test class.
* @throws Exception if unable to set up SSL configuration
*/
public static void setSSLConfiguration(HBaseTestingUtility utility, Class clazz)
throws Exception {
public static void setSSLConfiguration(HBaseCommonTestingUtility utility, Class<?> clazz)
throws Exception {
Configuration conf = utility.getConfiguration();
conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
@ -175,19 +173,19 @@ public class HBaseKerberosUtils {
}
public static UserGroupInformation loginAndReturnUGI(Configuration conf, String username)
throws IOException {
throws IOException {
String hostname = InetAddress.getLocalHost().getHostName();
String keyTabFileConfKey = "hbase." + username + ".keytab.file";
String keyTabFileLocation = conf.get(keyTabFileConfKey);
String principalConfKey = "hbase." + username + ".kerberos.principal";
String principal = org.apache.hadoop.security.SecurityUtil
.getServerPrincipal(conf.get(principalConfKey), hostname);
.getServerPrincipal(conf.get(principalConfKey), hostname);
if (keyTabFileLocation == null || principal == null) {
LOG.warn("Principal or key tab file null for : " + principalConfKey + ", "
+ keyTabFileConfKey);
LOG.warn(
"Principal or key tab file null for : " + principalConfKey + ", " + keyTabFileConfKey);
}
UserGroupInformation ugi =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
return ugi;
}
}

View File

@ -20,12 +20,11 @@ package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -33,21 +32,18 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test our recoverLease loop against mocked up filesystem.
*/
@Category({ MiscTests.class, MediumTests.class })
public class TestFSHDFSUtils {
public class TestRecoverLeaseFSUtils {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSHDFSUtils.class);
HBaseClassTestRule.forClass(TestRecoverLeaseFSUtils.class);
private static final Logger LOG = LoggerFactory.getLogger(TestFSHDFSUtils.class);
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final HBaseCommonTestingUtility HTU = new HBaseCommonTestingUtility();
static {
Configuration conf = HTU.getConfiguration();
conf.setInt("hbase.lease.recovery.first.pause", 10);
@ -67,14 +63,14 @@ public class TestFSHDFSUtils {
Mockito.when(reporter.progress()).thenReturn(true);
DistributedFileSystem dfs = Mockito.mock(DistributedFileSystem.class);
// Fail four times and pass on the fifth.
Mockito.when(dfs.recoverLease(FILE)).
thenReturn(false).thenReturn(false).thenReturn(false).thenReturn(false).thenReturn(true);
FSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter);
Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false)
.thenReturn(false).thenReturn(true);
RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter);
Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE);
// Make sure we waited at least hbase.lease.recovery.dfs.timeout * 3 (the first two
// invocations will happen pretty fast... the we fall into the longer wait loop).
assertTrue((EnvironmentEdgeManager.currentTime() - startTime) >
(3 * HTU.getConfiguration().getInt("hbase.lease.recovery.dfs.timeout", 61000)));
assertTrue((EnvironmentEdgeManager.currentTime() - startTime) > (3 *
HTU.getConfiguration().getInt("hbase.lease.recovery.dfs.timeout", 61000)));
}
/**
@ -90,66 +86,13 @@ public class TestFSHDFSUtils {
// Now make it so we fail the first two times -- the two fast invocations, then we fall into
// the long loop during which we will call isFileClosed.... the next invocation should
// therefore return true if we are to break the loop.
Mockito.when(dfs.recoverLease(FILE)).
thenReturn(false).thenReturn(false).thenReturn(true);
Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true);
Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true);
FSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter);
RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter);
Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE);
Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
}
private void testIsSameHdfs(int nnport) throws IOException {
Configuration conf = HBaseConfiguration.create();
Path srcPath = new Path("hdfs://localhost:" + nnport + "/");
Path desPath = new Path("hdfs://127.0.0.1/");
FileSystem srcFs = srcPath.getFileSystem(conf);
FileSystem desFs = desPath.getFileSystem(conf);
assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));
desPath = new Path("hdfs://127.0.0.1:8070/");
desFs = desPath.getFileSystem(conf);
assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
desFs = desPath.getFileSystem(conf);
assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
conf.set("dfs.nameservices", "haosong-hadoop");
conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:"+ nnport);
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
desPath = new Path("/");
desFs = desPath.getFileSystem(conf);
assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:"+nnport);
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
desPath = new Path("/");
desFs = desPath.getFileSystem(conf);
assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
}
@Test
public void testIsSameHdfs() throws IOException {
String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
LOG.info("hadoop version is: " + hadoopVersion);
boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
if (isHadoop3_0_0) {
// Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed default nn port to 9820.
// See HDFS-9427
testIsSameHdfs(9820);
} else {
// pre hadoop 3.0.0 defaults to port 8020
// Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990
testIsSameHdfs(8020);
}
}
/**
* Version of DFS that has HDFS-4525 in it.
*/

View File

@ -0,0 +1,179 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<property>
<name>hbase.regionserver.msginterval</name>
<value>100</value>
<description>Interval between messages from the RegionServer to HMaster
in milliseconds. Default is 15. Set this value low if you want unit
tests to be responsive.
</description>
</property>
<property>
<name>hbase.server.thread.wakefrequency</name>
<value>1000</value>
<value>100</value>
<description>Time to sleep in between searches for work (in milliseconds).
Used as sleep interval by service threads such as hbase:meta scanner and log roller.
</description>
</property>
<property>
<name>hbase.defaults.for.version.skip</name>
<value>true</value>
</property>
<property>
<name>hbase.procedure.store.wal.use.hsync</name>
<value>false</value>
</property>
<property>
<name>hbase.procedure.check.owner.set</name>
<value>false</value>
<description>Whether ProcedureExecutor should enforce that each
procedure to have an owner
</description>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
<description>
Controls whether HBase will check for stream capabilities (hflush/hsync).
Disable this if you intend to run on LocalFileSystem.
WARNING: Doing so may expose you to additional risk of data loss!
</description>
</property>
<property>
<name>hbase.regionserver.handler.count</name>
<value>3</value>
<description>Default is 30</description>
</property>
<property>
<name>hbase.regionserver.metahandler.count</name>
<value>3</value>
<description>Default is 20</description>
</property>
<property>
<name>hbase.netty.worker.count</name>
<value>3</value>
<description>Default is 0</description>
</property>
<property>
<name>hbase.hconnection.threads.max</name>
<value>6</value>
<description>Default is 256</description>
</property>
<property>
<name>hbase.htable.threads.max</name>
<value>3</value>
<description>Default is MAX_INTEGER</description>
</property>
<property>
<name>hbase.region.replica.replication.threads.max</name>
<value>7</value>
<description>Default is 256</description>
</property>
<property>
<name>hbase.rest.threads.max</name>
<value>5</value>
<description>Default is 100</description>
</property>
<property>
<name>hbase.replication.bulkload.copy.maxthreads</name>
<value>3</value>
<description>Default is 10</description>
</property>
<property>
<name>hbase.loadincremental.threads.max</name>
<value>1</value>
<description>Default is # of CPUs</description>
</property>
<property>
<name>hbase.hstore.flusher.count</name>
<value>1</value>
<description>Default is 2</description>
</property>
<property>
<name>hbase.oldwals.cleaner.thread.size</name>
<value>1</value>
<description>Default is 2</description>
</property>
<property>
<name>hbase.master.procedure.threads</name>
<value>5</value>
<description>Default is at least 16</description>
</property>
<property>
<name>hbase.procedure.remote.dispatcher.threadpool.size</name>
<value>3</value>
<description>Default is 128</description>
</property>
<property>
<name>hbase.regionserver.executor.closeregion.threads</name>
<value>1</value>
<description>Default is 3</description>
</property>
<property>
<name>hbase.regionserver.executor.openregion.threads</name>
<value>1</value>
<description>Default is 3</description>
</property>
<property>
<name>hbase.regionserver.executor.openpriorityregion.threads</name>
<value>1</value>
<description>Default is 3</description>
</property>
<property>
<name>hbase.storescanner.parallel.seek.threads</name>
<value>3</value>
<description>Default is 10</description>
</property>
<property>
<name>hbase.hfile.compaction.discharger.thread.count</name>
<value>1</value>
<description>Default is 10</description>
</property>
<property>
<name>hbase.regionserver.executor.refresh.peer.threads</name>
<value>1</value>
<description>Default is 2</description>
</property>
<property>
<name>hbase.hregion.open.and.init.threads.max</name>
<value>3</value>
<description>Default is 16 or # of Regions</description>
</property>
<property>
<name>hbase.master.handler.count</name>
<value>7</value>
<description>Default is 25</description>
</property>
<property>
<name>hbase.replication.source.maxthreads</name>
<value></value>
<description>Default is 10</description>
</property>
<property>
<name>hbase.hconnection.meta.lookup.threads.max</name>
<value>5</value>
<description>Default is 128</description>
</property>
</configuration>

View File

@ -0,0 +1,56 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<!-- hadoop-2.0.5+'s HDFS-4305 by default enforces a min blocks size
of 1024*1024. Many unit tests that use the hlog use smaller
blocks. Setting this config to 0 to have tests pass -->
<property>
<name>dfs.namenode.fs-limits.min-block-size</name>
<value>0</value>
</property>
<property>
<name>dfs.datanode.handler.count</name>
<value>5</value>
<description>Default is 10</description>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>5</value>
<description>Default is 10</description>
</property>
<property>
<name>dfs.namenode.service.handler.count</name>
<value>5</value>
<description>Default is 10</description>
</property>
<!--
Constraining this config makes tests fail.
<property>
<name>dfs.datanode.max.transfer.threads</name>
<value>16</value>
<description>Default is 4096. If constrain this
too much, tests do not complete.</description>
</property>
-->
</configuration>

View File

@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Define some default values that can be overridden by system properties
hbase.root.logger=INFO,console
hbase.log.dir=.
hbase.log.file=hbase.log
# Define the root logger to the system property "hbase.root.logger".
log4j.rootLogger=${hbase.root.logger}
# Logging Threshold
log4j.threshold=ALL
#
# Daily Rolling File Appender
#
log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}
# Rollver at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
# Debugging Pattern format
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
#
# console
# Add "console" to rootlogger above if you want to use this
#
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
# Custom Logging levels
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.hadoop.hbase=DEBUG
#These settings are workarounds against spurious logs from the minicluster.
#See HBASE-4709
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN
log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN
# Enable this to get detailed connection error/retry logging.
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE

View File

@ -129,6 +129,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-http</artifactId>

View File

@ -145,6 +145,16 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>

View File

@ -117,6 +117,16 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-metrics-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>

View File

@ -164,6 +164,12 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>

View File

@ -336,6 +336,16 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>

View File

@ -326,16 +326,14 @@ public interface MasterServices extends Server {
/**
* Registers a new protocol buffer {@link Service} subclass as a master coprocessor endpoint.
*
* <p>
* Only a single instance may be registered for a given {@link Service} subclass (the
* instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
* After the first registration, subsequent calls with the same service name will fail with
* a return value of {@code false}.
* </p>
* <p/>
* Only a single instance may be registered for a given {@link Service} subclass (the instances
* are keyed on
* {@link org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
* After the first registration, subsequent calls with the same service name will fail with a
* return value of {@code false}.
* @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
* @return {@code true} if the registration was successful, {@code false}
* otherwise
* @return {@code true} if the registration was successful, {@code false} otherwise
*/
boolean registerService(Service instance);

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@ -57,7 +57,7 @@ public class MasterProcedureEnv implements ConfigurationObserver {
@Override
public void recoverFileLease(final FileSystem fs, final Path path) throws IOException {
final Configuration conf = master.getConfiguration();
FSUtils.recoverFileLease(fs, path, conf, new CancelableProgressable() {
RecoverLeaseFSUtils.recoverFileLease(fs, path, conf, new CancelableProgressable() {
@Override
public boolean progress() {
LOG.debug("Recover Procedure Store log lease: " + path);

View File

@ -8391,19 +8391,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
* Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
* be available for handling Region#execService(com.google.protobuf.RpcController,
* org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall) calls.
*
* <p>
* Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to be
* available for handling {@link #execService(RpcController, CoprocessorServiceCall)} calls.
* <p/>
* Only a single instance may be registered per region for a given {@link Service} subclass (the
* instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
* After the first registration, subsequent calls with the same service name will fail with
* a return value of {@code false}.
* </p>
* instances are keyed on {@link ServiceDescriptor#getFullName()}.. After the first registration,
* subsequent calls with the same service name will fail with a return value of {@code false}.
* @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
* @return {@code true} if the registration was successful, {@code false}
* otherwise
* @return {@code true} if the registration was successful, {@code false} otherwise
*/
public boolean registerService(Service instance) {
// No stacking of instances is allowed for a single service name

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -33,9 +32,9 @@ import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALEdit;
@ -140,7 +139,7 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
Path path = new Path(rs.getWALRootDir(), wal);
long length = rs.getWALFileSystem().getFileStatus(path).getLen();
try {
FSUtils.recoverFileLease(fs, path, conf);
RecoverLeaseFSUtils.recoverFileLease(fs, path, conf);
return WALFactory.createReader(rs.getWALFileSystem(), path, rs.getConfiguration());
} catch (EOFException e) {
if (length <= 0) {

View File

@ -32,8 +32,8 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
@ -385,7 +385,7 @@ class WALEntryStream implements Closeable {
private void recoverLease(final Configuration conf, final Path path) {
try {
final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
FSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
@Override
public boolean progress() {
LOG.debug("recover WAL lease: " + path);

View File

@ -59,7 +59,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
@ -80,7 +79,6 @@ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Progressable;
@ -1892,181 +1890,4 @@ public final class FSUtils {
return false;
}
public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
throws IOException {
recoverFileLease(fs, p, conf, null);
}
/**
* Recover the lease from HDFS, retrying multiple times.
*/
public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
CancelableProgressable reporter) throws IOException {
if (fs instanceof FilterFileSystem) {
fs = ((FilterFileSystem) fs).getRawFileSystem();
}
// lease recovery not needed for local file system case.
if (!(fs instanceof DistributedFileSystem)) {
return;
}
recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
}
/*
* Run the dfs recover lease. recoverLease is asynchronous. It returns: -false when it starts the
* lease recovery (i.e. lease recovery not *yet* done) - true when the lease recovery has
* succeeded or the file is closed. But, we have to be careful. Each time we call recoverLease, it
* starts the recover lease process over from the beginning. We could put ourselves in a situation
* where we are doing nothing but starting a recovery, interrupting it to start again, and so on.
* The findings over in HBASE-8354 have it that the namenode will try to recover the lease on the
* file's primary node. If all is well, it should return near immediately. But, as is common, it
* is the very primary node that has crashed and so the namenode will be stuck waiting on a socket
* timeout before it will ask another datanode to start the recovery. It does not help if we call
* recoverLease in the meantime and in particular, subsequent to the socket timeout, a
* recoverLease invocation will cause us to start over from square one (possibly waiting on socket
* timeout against primary node). So, in the below, we do the following: 1. Call recoverLease. 2.
* If it returns true, break. 3. If it returns false, wait a few seconds and then call it again.
* 4. If it returns true, break. 5. If it returns false, wait for what we think the datanode
* socket timeout is (configurable) and then try again. 6. If it returns true, break. 7. If it
* returns false, repeat starting at step 5. above. If HDFS-4525 is available, call it every
* second and we might be able to exit early.
*/
private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
final Configuration conf, final CancelableProgressable reporter) throws IOException {
LOG.info("Recover lease on dfs file " + p);
long startWaiting = EnvironmentEdgeManager.currentTime();
// Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
// usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
// beyond that limit 'to be safe'.
long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
// This setting should be a little bit above what the cluster dfs heartbeat is set to.
long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000);
// This should be set to how long it'll take for us to timeout against primary datanode if it
// is dead. We set it to 64 seconds, 4 second than the default READ_TIMEOUT in HDFS, the
// default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this
// timeout, then further recovery will take liner backoff with this base, to avoid endless
// preemptions when this value is not properly configured.
long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 64 * 1000);
Method isFileClosedMeth = null;
// whether we need to look for isFileClosed method
boolean findIsFileClosedMeth = true;
boolean recovered = false;
// We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
for (int nbAttempt = 0; !recovered; nbAttempt++) {
recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
if (recovered) {
break;
}
checkIfCancelled(reporter);
if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
break;
}
try {
// On the first time through wait the short 'firstPause'.
if (nbAttempt == 0) {
Thread.sleep(firstPause);
} else {
// Cycle here until (subsequentPause * nbAttempt) elapses. While spinning, check
// isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though.
long localStartWaiting = EnvironmentEdgeManager.currentTime();
while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) < subsequentPauseBase *
nbAttempt) {
Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
if (findIsFileClosedMeth) {
try {
isFileClosedMeth =
dfs.getClass().getMethod("isFileClosed", new Class[] { Path.class });
} catch (NoSuchMethodException nsme) {
LOG.debug("isFileClosed not available");
} finally {
findIsFileClosedMeth = false;
}
}
if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
recovered = true;
break;
}
checkIfCancelled(reporter);
}
}
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
return recovered;
}
private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
final int nbAttempt, final Path p, final long startWaiting) {
if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
LOG.warn("Cannot recoverLease after trying for " +
conf.getInt("hbase.lease.recovery.timeout", 900000) +
"ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
getLogMessageDetail(nbAttempt, p, startWaiting));
return true;
}
return false;
}
/**
* Try to recover the lease.
* @return True if dfs#recoverLease came by true.
*/
private static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt,
final Path p, final long startWaiting) throws FileNotFoundException {
boolean recovered = false;
try {
recovered = dfs.recoverLease(p);
LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ") +
getLogMessageDetail(nbAttempt, p, startWaiting));
} catch (IOException e) {
if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
// This exception comes out instead of FNFE, fix it
throw new FileNotFoundException("The given WAL wasn't found at " + p);
} else if (e instanceof FileNotFoundException) {
throw (FileNotFoundException) e;
}
LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
}
return recovered;
}
/**
* @return Detail to append to any log message around lease recovering.
*/
private static String getLogMessageDetail(final int nbAttempt, final Path p,
final long startWaiting) {
return "attempt=" + nbAttempt + " on file=" + p + " after " +
(EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
}
/**
* Call HDFS-4525 isFileClosed if it is available.
* @return True if file is closed.
*/
private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m,
final Path p) {
try {
return (Boolean) m.invoke(dfs, p);
} catch (SecurityException e) {
LOG.warn("No access", e);
} catch (Exception e) {
LOG.warn("Failed invocation for " + p.toString(), e);
}
return false;
}
private static void checkIfCancelled(final CancelableProgressable reporter)
throws InterruptedIOException {
if (reporter == null) {
return;
}
if (!reporter.progress()) {
throw new InterruptedIOException("Operation cancelled");
}
}
}

View File

@ -843,7 +843,7 @@ public class RegionSplitter {
fs.rename(tmpFile, splitFile);
} else {
LOG.debug("_balancedSplit file found. Replay log to restore state...");
FSUtils.recoverFileLease(fs, splitFile, connection.getConfiguration(), null);
RecoverLeaseFSUtils.recoverFileLease(fs, splitFile, connection.getConfiguration(), null);
// parse split file and process remaining splits
FSDataInputStream tmpIn = fs.open(splitFile);

View File

@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@ -514,7 +514,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
private static void recoverLease(final Configuration conf, final Path path) {
try {
final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf);
FSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
@Override
public boolean progress() {
LOG.debug("Still trying to recover WAL lease: " + path);

View File

@ -54,7 +54,7 @@ import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
@ -394,7 +394,7 @@ public class WALSplitter {
}
try {
FSUtils.recoverFileLease(walFS, path, conf, reporter);
RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, reporter);
try {
in = getReader(path, reporter);
} catch (EOFException e) {

View File

@ -678,7 +678,6 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
createDirAndSetProperty("nfs.dump.dir");
createDirAndSetProperty("java.io.tmpdir");
createDirAndSetProperty("java.io.tmpdir");
createDirAndSetProperty("dfs.journalnode.edits.dir");
createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");

View File

@ -52,8 +52,8 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -497,8 +497,8 @@ public class TestLogRolling extends AbstractTestLogRolling {
Set<String> loggedRows = new HashSet<>();
for (Path p : paths) {
LOG.debug("recovering lease for " + p);
FSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, TEST_UTIL.getConfiguration(),
null);
RecoverLeaseFSUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
TEST_UTIL.getConfiguration(), null);
LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
WAL.Reader reader = null;

View File

@ -28,6 +28,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -37,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
@ -645,17 +647,12 @@ public class TestFSUtils {
}
private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
static {
boolean tmp = false;
try {
Class.forName("org.apache.hadoop.fs.StreamCapabilities");
tmp = true;
LOG.debug("Test thought StreamCapabilities class was present.");
} catch (ClassNotFoundException exception) {
LOG.debug("Test didn't think StreamCapabilities class was present.");
} finally {
STREAM_CAPABILITIES_IS_PRESENT = tmp;
}
}
@ -672,4 +669,56 @@ public class TestFSUtils {
cluster.shutdown();
}
}
private void testIsSameHdfs(int nnport) throws IOException {
Configuration conf = HBaseConfiguration.create();
Path srcPath = new Path("hdfs://localhost:" + nnport + "/");
Path desPath = new Path("hdfs://127.0.0.1/");
FileSystem srcFs = srcPath.getFileSystem(conf);
FileSystem desFs = desPath.getFileSystem(conf);
assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));
desPath = new Path("hdfs://127.0.0.1:8070/");
desFs = desPath.getFileSystem(conf);
assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
desPath = new Path("hdfs://127.0.1.1:" + nnport + "/");
desFs = desPath.getFileSystem(conf);
assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
conf.set("dfs.nameservices", "haosong-hadoop");
conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:" + nnport);
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
desPath = new Path("/");
desFs = desPath.getFileSystem(conf);
assertTrue(FSUtils.isSameHdfs(conf, srcFs, desFs));
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:" + nnport);
conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
desPath = new Path("/");
desFs = desPath.getFileSystem(conf);
assertTrue(!FSUtils.isSameHdfs(conf, srcFs, desFs));
}
@Test
public void testIsSameHdfs() throws IOException {
String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
LOG.info("hadoop version is: " + hadoopVersion);
boolean isHadoop3_0_0 = hadoopVersion.startsWith("3.0.0");
if (isHadoop3_0_0) {
// Hadoop 3.0.0 alpha1+ ~ 3.0.0 GA changed default nn port to 9820.
// See HDFS-9427
testIsSameHdfs(9820);
} else {
// pre hadoop 3.0.0 defaults to port 8020
// Hadoop 3.0.1 changed it back to port 8020. See HDFS-12990
testIsSameHdfs(8020);
}
}
}

View File

@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALFactory.Providers;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -455,7 +455,7 @@ public class TestWALFactory {
@Override
public void run() {
try {
FSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null);
RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null);
} catch (IOException e) {
exception = e;
}

View File

@ -135,6 +135,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
<type>test-jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-zookeeper</artifactId>

View File

@ -85,6 +85,12 @@
<type>test-jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
<type>test-jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>

View File

@ -185,6 +185,12 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>

14
pom.xml
View File

@ -91,6 +91,7 @@
<module>hbase-backup</module>
<module>hbase-zookeeper</module>
<module>hbase-hbtop</module>
<module>hbase-asyncfs</module>
</modules>
<scm>
<connection>scm:git:git://gitbox.apache.org/repos/asf/hbase.git</connection>
@ -1577,6 +1578,7 @@
<annotations.test.jar>hbase-annotations-${project.version}-tests.jar</annotations.test.jar>
<mapreduce.test.jar>hbase-mapreduce-${project.version}-tests.jar</mapreduce.test.jar>
<zookeeper.test.jar>hbase-zookeeper-${project.version}-tests.jar</zookeeper.test.jar>
<asyncfs.test.jar>hbase-asyncfs-${project.version}-tests.jar</asyncfs.test.jar>
<shell-executable>bash</shell-executable>
<surefire.provider>surefire-junit47</surefire.provider>
<!-- default: run small & medium, medium with 2 threads -->
@ -1884,6 +1886,18 @@
<artifactId>hbase-shaded-mapreduce</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-asyncfs</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- General dependencies -->
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>