HBASE-16257 Move staging dir to be under hbase root dir
commit 50b051ade1
parent d2ed74cbc6
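
The change replaces the configurable "hbase.bulkload.staging.dir" location with a fixed "staging" directory under the HBase root dir. A minimal sketch, assuming the HBase client classpath of this era, contrasting how the bulk load staging base path is resolved before and after this commit (the class name and main method are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;

public class StagingDirSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Before this commit: a separate property, defaulting to the temporary fs directory.
    Path before = new Path(conf.get("hbase.bulkload.staging.dir",
        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
            HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY)));
    // After this commit: a fixed "staging" sub-directory under the HBase root dir.
    Path after = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
    System.out.println(before + " -> " + after);
  }
}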

@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRe
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.token.Token;
 
@@ -125,8 +124,4 @@ public class SecureBulkLoadClient {
       throw ProtobufUtil.handleRemoteException(se);
     }
   }
-
-  public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
-    return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
-  }
 }
@@ -1,46 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.security;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-
-@InterfaceAudience.Private
-public class SecureBulkLoadUtil {
-  private final static String BULKLOAD_STAGING_DIR = "hbase.bulkload.staging.dir";
-
-  /**
-   * This returns the staging path for a given column family.
-   * This is needed for clean recovery and called reflectively in LoadIncrementalHFiles
-   */
-  public static Path getStagingPath(Configuration conf, String bulkToken, byte[] family) {
-    Path stageP = new Path(getBaseStagingDir(conf), bulkToken);
-    return new Path(stageP, Bytes.toString(family));
-  }
-
-  public static Path getBaseStagingDir(Configuration conf) {
-    String hbaseTmpFsDir =
-        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-    return new Path(conf.get(BULKLOAD_STAGING_DIR, hbaseTmpFsDir));
-  }
-}
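
The deleted helper above computed staging paths of the form <staging base>/<bulk token>/<column family>. A minimal sketch, assuming the same per-load layout is kept under the new fixed base directory (the bulkToken and family values, class name and main method are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;

public class StagingLayoutSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String bulkToken = "my_table__abc123";   // hypothetical per-load token
    byte[] family = Bytes.toBytes("cf");     // hypothetical column family
    Path base = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
    Path perLoad = new Path(base, bulkToken);                     // one directory per bulk load token
    Path perFamily = new Path(perLoad, Bytes.toString(family));   // one sub-directory per family
    System.out.println(perFamily);
  }
}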
@@ -302,6 +302,9 @@ public final class HConstants {
   /** Like the previous, but for old logs that are about to be deleted */
   public static final String HREGION_OLDLOGDIR_NAME = "oldWALs";
 
+  /** Staging dir used by bulk load */
+  public static final String BULKLOAD_STAGING_DIR_NAME = "staging";
+
   public static final String CORRUPT_DIR_NAME = "corrupt";
 
   /** Used by HBCK to sideline backup data */
@@ -69,13 +69,6 @@ possible configurations would overwhelm and obscure the important.
     for keeping temporary data.
     </description>
   </property>
-  <property >
-    <name>hbase.bulkload.staging.dir</name>
-    <value>${hbase.fs.tmp.dir}</value>
-    <description>A staging directory in default file system (HDFS)
-    for bulk loading.
-    </description>
-  </property>
   <property >
     <name>hbase.cluster.distributed</name>
     <value>false</value>
@@ -1190,7 +1183,7 @@ possible configurations would overwhelm and obscure the important.
   <property>
     <name>hbase.rootdir.perms</name>
     <value>700</value>
-    <description>FS Permissions for the root directory in a secure (kerberos) setup.
+    <description>FS Permissions for the root data subdirectory in a secure (kerberos) setup.
     When master starts, it creates the rootdir with this permissions or sets the permissions
     if it does not match.</description>
   </property>
 
@@ -118,7 +118,6 @@ public class HBaseCommonTestingUtility {
     if (deleteOnExit()) this.dataTestDir.deleteOnExit();
 
     createSubDir("hbase.local.dir", testPath, "hbase-local-dir");
-    createSubDir("hbase.bulkload.staging.dir", testPath, "staging");
 
     return testPath;
   }
@@ -133,8 +133,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private int nrThreads;
   private RpcControllerFactory rpcControllerFactory;
 
-  private LoadIncrementalHFiles() {}
-
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
     this.rpcControllerFactory = new RpcControllerFactory(conf);
@@ -1199,7 +1197,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
-    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
+    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
     System.exit(ret);
   }
 
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -64,6 +66,25 @@ public class MasterFileSystem {
   // hbase temp directory used for table construction and deletion
   private final Path tempdir;
 
+
+  /*
+   * In a secure env, the protected sub-directories and files under the HBase rootDir
+   * would be restricted. The sub-directory will have '700' except the bulk load staging dir,
+   * which will have '711'. The default '700' can be overwritten by setting the property
+   * 'hbase.rootdir.perms'. The protected files (version file, clusterId file) will have '600'.
+   * The rootDir itself will be created with HDFS default permissions if it does not exist.
+   * We will check the rootDir permissions to make sure it has 'x' for all to ensure access
+   * to the staging dir. If it does not, we will add it.
+   */
+  // Permissions for the directories under rootDir that need protection
+  private final FsPermission secureRootSubDirPerms;
+  // Permissions for the files under rootDir that need protection
+  private final FsPermission secureRootFilePerms = new FsPermission("600");
+  // Permissions for bulk load staging directory under rootDir
+  private final FsPermission HiddenDirPerms = FsPermission.valueOf("-rwx--x--x");
+
+  private boolean isSecurityEnabled;
+
   private final MasterServices services;
 
   public MasterFileSystem(MasterServices services) throws IOException {
@@ -81,6 +102,8 @@ public class MasterFileSystem {
     FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
     // make sure the fs has the same conf
     fs.setConf(conf);
+    this.secureRootSubDirPerms = new FsPermission(conf.get("hbase.rootdir.perms", "700"));
+    this.isSecurityEnabled = "kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication"));
     // setup the filesystem variable
     createInitialFileSystemLayout();
     HFileSystem.addLocationsOrderInterceptor(conf);
@@ -96,11 +119,46 @@ public class MasterFileSystem {
    * Idempotent.
    */
  private void createInitialFileSystemLayout() throws IOException {
+
+    final String[] protectedSubDirs = new String[] {
+        HConstants.BASE_NAMESPACE_DIR,
+        HConstants.HFILE_ARCHIVE_DIRECTORY,
+        HConstants.HREGION_LOGDIR_NAME,
+        HConstants.HREGION_OLDLOGDIR_NAME,
+        MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR,
+        HConstants.CORRUPT_DIR_NAME,
+        HConstants.HBCK_SIDELINEDIR_NAME,
+        MobConstants.MOB_DIR_NAME
+    };
     // check if the root directory exists
     checkRootDir(this.rootdir, conf, this.fs);
 
-    // check if temp directory exists and clean it
+    // Check the directories under rootdir.
     checkTempDir(this.tempdir, conf, this.fs);
+    for (String subDir : protectedSubDirs) {
+      checkSubDir(new Path(this.rootdir, subDir));
+    }
+
+    checkStagingDir();
+
+    // Handle the last few special files and set the final rootDir permissions
+    // rootDir needs 'x' for all to support bulk load staging dir
+    if (isSecurityEnabled) {
+      fs.setPermission(new Path(rootdir, HConstants.VERSION_FILE_NAME), secureRootFilePerms);
+      fs.setPermission(new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME), secureRootFilePerms);
+    }
+    FsPermission currentRootPerms = fs.getFileStatus(this.rootdir).getPermission();
+    if (!currentRootPerms.getUserAction().implies(FsAction.EXECUTE)
+        || !currentRootPerms.getGroupAction().implies(FsAction.EXECUTE)
+        || !currentRootPerms.getOtherAction().implies(FsAction.EXECUTE)) {
+      LOG.warn("rootdir permissions do not contain 'excute' for user, group or other. "
+          + "Automatically adding 'excute' permission for all");
+      fs.setPermission(
+          this.rootdir,
+          new FsPermission(currentRootPerms.getUserAction().or(FsAction.EXECUTE), currentRootPerms
+              .getGroupAction().or(FsAction.EXECUTE), currentRootPerms.getOtherAction().or(
+              FsAction.EXECUTE)));
+    }
   }
 
   public FileSystem getFileSystem() {
@@ -146,17 +204,10 @@ public class MasterFileSystem {
     // If FS is in safe mode wait till out of it.
     FSUtils.waitOnSafeMode(c, c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
 
-    boolean isSecurityEnabled = "kerberos".equalsIgnoreCase(c.get("hbase.security.authentication"));
-    FsPermission rootDirPerms = new FsPermission(c.get("hbase.rootdir.perms", "700"));
-
     // Filesystem is good. Go ahead and check for hbase.rootdir.
     try {
       if (!fs.exists(rd)) {
-        if (isSecurityEnabled) {
-          fs.mkdirs(rd, rootDirPerms);
-        } else {
         fs.mkdirs(rd);
-        }
         // DFS leaves safe mode with 0 DNs when there are 0 blocks.
         // We used to handle this by checking the current DN count and waiting until
         // it is nonzero. With security, the check for datanode count doesn't work --
@@ -171,16 +222,6 @@ public class MasterFileSystem {
       if (!fs.isDirectory(rd)) {
         throw new IllegalArgumentException(rd.toString() + " is not a directory");
       }
-      if (isSecurityEnabled && !rootDirPerms.equals(fs.getFileStatus(rd).getPermission())) {
-        // check whether the permission match
-        LOG.warn("Found rootdir permissions NOT matching expected \"hbase.rootdir.perms\" for "
-            + "rootdir=" + rd.toString() + " permissions=" + fs.getFileStatus(rd).getPermission()
-            + " and \"hbase.rootdir.perms\" configured as "
-            + c.get("hbase.rootdir.perms", "700") + ". Automatically setting the permissions. You"
-            + " can change the permissions by setting \"hbase.rootdir.perms\" in hbase-site.xml "
-            + "and restarting the master");
-        fs.setPermission(rd, rootDirPerms);
-      }
       // as above
       FSUtils.checkVersion(fs, rd, true, c.getInt(HConstants.THREAD_WAKE_FREQUENCY,
           10 * 1000), c.getInt(HConstants.VERSION_FILE_WRITE_ATTEMPTS,
@@ -239,10 +280,68 @@ public class MasterFileSystem {
     }
 
     // Create the temp directory
+    if (isSecurityEnabled) {
+      if (!fs.mkdirs(tmpdir, secureRootSubDirPerms)) {
+        throw new IOException("HBase temp directory '" + tmpdir + "' creation failure.");
+      }
+    } else {
       if (!fs.mkdirs(tmpdir)) {
         throw new IOException("HBase temp directory '" + tmpdir + "' creation failure.");
       }
     }
+  }
+
+  /**
+   * Make sure the directories under rootDir have good permissions. Create if necessary.
+   * @param p
+   * @throws IOException
+   */
+  private void checkSubDir(final Path p) throws IOException {
+    if (!fs.exists(p)) {
+      if (isSecurityEnabled) {
+        if (!fs.mkdirs(p, secureRootSubDirPerms)) {
+          throw new IOException("HBase directory '" + p + "' creation failure.");
+        }
+      } else {
+        if (!fs.mkdirs(p)) {
+          throw new IOException("HBase directory '" + p + "' creation failure.");
+        }
+      }
+    }
+    else {
+      if (isSecurityEnabled && !secureRootSubDirPerms.equals(fs.getFileStatus(p).getPermission())) {
+        // check whether the permission match
+        LOG.warn("Found HBase directory permissions NOT matching expected permissions for "
+            + p.toString() + " permissions=" + fs.getFileStatus(p).getPermission()
+            + ", expecting " + secureRootSubDirPerms + ". Automatically setting the permissions. "
+            + "You can change the permissions by setting \"hbase.rootdir.perms\" in hbase-site.xml "
+            + "and restarting the master");
+        fs.setPermission(p, secureRootSubDirPerms);
+      }
+    }
+  }
+
+  /**
+   * Check permissions for bulk load staging directory. This directory has special hidden
+   * permissions. Create it if necessary.
+   * @throws IOException
+   */
+  private void checkStagingDir() throws IOException {
+    Path p = new Path(this.rootdir, HConstants.BULKLOAD_STAGING_DIR_NAME);
+    try {
+      if (!this.fs.exists(p)) {
+        if (!this.fs.mkdirs(p, HiddenDirPerms)) {
+          throw new IOException("Failed to create staging directory " + p.toString());
+        }
+      } else {
+        this.fs.setPermission(p, HiddenDirPerms);
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to create or set permission on staging directory " + p.toString());
+      throw new IOException("Failed to create or set permission on staging directory "
+          + p.toString(), e);
+    }
+  }
+
   private static void bootstrap(final Path rd, final Configuration c)
       throws IOException {
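
The MasterFileSystem changes above describe the permission scheme for data under the root dir: protected sub-directories get 700 (overridable via "hbase.rootdir.perms"), protected files get 600, and the bulk load staging dir gets 711 so any user can traverse into it without listing or modifying it. A minimal, self-contained sketch using only permission values that appear in this change (the class name and main method are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;

public class RootDirPermsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Protected sub-directories under the root dir default to 700, overridable via hbase.rootdir.perms.
    FsPermission subDirPerms = new FsPermission(conf.get("hbase.rootdir.perms", "700"));
    // Protected files (the version and cluster id files) get 600.
    FsPermission filePerms = new FsPermission("600");
    // The bulk load staging dir gets 711: traversable by everyone, but not listable or writable.
    FsPermission stagingPerms = FsPermission.valueOf("-rwx--x--x");
    System.out.println(subDirPerms + " / " + filePerms + " / " + stagingPerms);
  }
}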
@@ -104,11 +104,6 @@ public class MasterWalManager {
     this.distributedLogReplay = this.splitLogManager.isLogReplaying();
 
     this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-
-    // Make sure the region servers can archive their old logs
-    if (!this.fs.exists(oldLogDir)) {
-      this.fs.mkdirs(oldLogDir);
-    }
   }
 
   public void stop() {
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
@@ -38,12 +39,12 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequ
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
 import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener;
-import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Text;
@@ -98,7 +99,6 @@ public class SecureBulkLoadManager {
 
   private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
   private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
-
   private SecureRandom random;
   private FileSystem fs;
   private Configuration conf;
@@ -113,32 +113,18 @@ public class SecureBulkLoadManager {
     this.conf = conf;
   }
 
-  public void start() {
+  public void start() throws IOException {
     random = new SecureRandom();
-    baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
-    this.userProvider = UserProvider.instantiate(conf);
-
-    try {
-      fs = FileSystem.get(conf);
-      fs.mkdirs(baseStagingDir, PERM_HIDDEN);
-      fs.setPermission(baseStagingDir, PERM_HIDDEN);
-      FileStatus status = fs.getFileStatus(baseStagingDir);
-      //no sticky bit in hadoop-1.0, making directory nonempty so it never gets erased
-      fs.mkdirs(new Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN);
-      if (status == null) {
-        throw new IllegalStateException("Failed to create staging directory "
-            + baseStagingDir.toString());
-      }
-      if (!status.getPermission().equals(PERM_HIDDEN)) {
-        throw new IllegalStateException(
-            "Staging directory already exists but permissions aren't set to '-rwx--x--x' "
-            + baseStagingDir.toString());
-      }
-    } catch (IOException e) {
-      LOG.error("Failed to create or set permission on staging directory "
-          + baseStagingDir.toString(), e);
-      throw new IllegalStateException("Failed to create or set permission on staging directory "
-          + baseStagingDir.toString(), e);
+    userProvider = UserProvider.instantiate(conf);
+    fs = FileSystem.get(conf);
+    baseStagingDir = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
+
+    if (conf.get("hbase.bulkload.staging.dir") != null) {
+      LOG.warn("hbase.bulkload.staging.dir " + " is deprecated. Bulkload staging directory is "
+          + baseStagingDir);
+    }
+    if (!fs.exists(baseStagingDir)) {
+      fs.mkdirs(baseStagingDir, PERM_HIDDEN);
     }
   }
 
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -82,7 +83,7 @@ public class HFileReplicator {
   private UserProvider userProvider;
   private Configuration conf;
   private Connection connection;
-  private String hbaseStagingDir;
+  private Path hbaseStagingDir;
   private ThreadPoolExecutor exec;
   private int maxCopyThreads;
   private int copiesPerThread;
@@ -100,7 +101,7 @@ public class HFileReplicator {
 
     userProvider = UserProvider.instantiate(conf);
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
-    this.hbaseStagingDir = conf.get("hbase.bulkload.staging.dir");
+    this.hbaseStagingDir = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
     this.maxCopyThreads =
         this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
           REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
@@ -253,7 +254,7 @@ public class HFileReplicator {
 
       // Create staging directory for each table
       Path stagingDir =
-          createStagingDir(new Path(hbaseStagingDir), user, TableName.valueOf(tableName));
+          createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName));
 
       familyHFilePathsPairsList = tableEntry.getValue();
       familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();
@@ -141,9 +141,11 @@ public class HFileCorruptionChecker {
     Path regionDir = cfDir.getParent();
     Path tableDir = regionDir.getParent();
 
-    // build up the corrupted dirs strcture
-    Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), conf.get(
-        "hbase.hfile.quarantine.dir", HConstants.CORRUPT_DIR_NAME));
+    // build up the corrupted dirs structure
+    Path corruptBaseDir = new Path(FSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
+    if (conf.get("hbase.hfile.quarantine.dir") != null) {
+      LOG.warn("hbase.hfile.quarantine.dir is deprecated. Default to " + corruptBaseDir);
+    }
     Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
     Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
     Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
@@ -477,9 +477,11 @@ public class WALSplitter {
       final List<Path> corruptedLogs,
       final List<Path> processedLogs, final Path oldLogDir,
       final FileSystem fs, final Configuration conf) throws IOException {
-    final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
-        "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
+    final Path corruptDir = new Path(FSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
+    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
+      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to "
+          + corruptDir);
+    }
     if (!fs.mkdirs(corruptDir)) {
       LOG.info("Unable to mkdir " + corruptDir);
     }
@@ -1282,7 +1282,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     } else {
       LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
     }
-    this.conf.set("hbase.bulkload.staging.dir", this.conf.get("hbase.fs.tmp.dir"));
   }
 
   /**
@@ -50,10 +50,10 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -342,7 +342,7 @@ public class TestLoadIncrementalHFiles {
     }
 
     // verify staging folder has been cleaned up
-    Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
+    Path stagingBasePath = new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
     if(fs.exists(stagingBasePath)) {
       FileStatus[] files = fs.listStatus(stagingBasePath);
       for(FileStatus file : files) {
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
-import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.token.Token;
@@ -166,7 +165,4 @@ public class SecureBulkLoadEndpointClient {
     }
   }
 
-  public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
-    return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
-  }
 }
@@ -1076,8 +1076,7 @@ public class TestWALSplit {
     useDifferentDFSClient();
     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
 
-    final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
-        "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
+    final Path corruptDir = new Path(FSUtils.getRootDir(conf), HConstants.CORRUPT_DIR_NAME);
     assertEquals(1, fs.listStatus(corruptDir).length);
   }
 