HBASE-10902 Make Secure Bulk Load work across remote secure clusters (Jerry He)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1586709 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a0a7cb53ca
commit
49a3f5e4d1
|
@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
|||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSHDFSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
@ -113,6 +114,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
private int maxFilesPerRegionPerFamily;
|
||||
private boolean assignSeqIds;
|
||||
|
||||
// Source filesystem
|
||||
private FileSystem fs;
|
||||
// Source delegation token
|
||||
private FsDelegationToken fsDelegationToken;
|
||||
private String bulkToken;
|
||||
private UserProvider userProvider;
|
||||
|
@ -164,7 +168,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
*/
|
||||
private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
|
||||
throws IOException {
|
||||
FileSystem fs = hfofDir.getFileSystem(getConf());
|
||||
fs = hfofDir.getFileSystem(getConf());
|
||||
|
||||
if (!fs.exists(hfofDir)) {
|
||||
throw new FileNotFoundException("HFileOutputFormat dir " +
|
||||
|
@ -270,10 +274,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
return;
|
||||
}
|
||||
|
||||
//If using secure bulk load
|
||||
//If using secure bulk load, get source delegation token, and
|
||||
//prepare staging directory and token
|
||||
if (userProvider.isHBaseSecurityEnabled()) {
|
||||
FileSystem fs = FileSystem.get(getConf());
|
||||
// fs is the source filesystem
|
||||
fsDelegationToken.acquireDelegationToken(fs);
|
||||
|
||||
bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
|
||||
|
@ -332,7 +336,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
LOG.error(err);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (queue != null && !queue.isEmpty()) {
|
||||
throw new RuntimeException("Bulk load aborted with some files not yet loaded."
|
||||
+ "Please check log for more details.");
|
||||
|
@ -510,7 +514,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
final Pair<byte[][], byte[][]> startEndKeys)
|
||||
throws IOException {
|
||||
final Path hfilePath = item.hfilePath;
|
||||
final FileSystem fs = hfilePath.getFileSystem(getConf());
|
||||
HFile.Reader hfr = HFile.createReader(fs, hfilePath,
|
||||
new CacheConfig(getConf()), getConf());
|
||||
final byte[] first, last;
|
||||
|
@ -640,23 +643,28 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
//from the staging directory back to original location
|
||||
//in user directory
|
||||
if(secureClient != null && !success) {
|
||||
FileSystem fs = FileSystem.get(getConf());
|
||||
for(Pair<byte[], String> el : famPaths) {
|
||||
Path hfileStagingPath = null;
|
||||
Path hfileOrigPath = new Path(el.getSecond());
|
||||
try {
|
||||
hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
|
||||
FileSystem targetFs = FileSystem.get(getConf());
|
||||
// Check to see if the source and target filesystems are the same
|
||||
// If they are the same filesystem, we will try move the files back
|
||||
// because previously we moved them to the staging directory.
|
||||
if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
|
||||
for(Pair<byte[], String> el : famPaths) {
|
||||
Path hfileStagingPath = null;
|
||||
Path hfileOrigPath = new Path(el.getSecond());
|
||||
try {
|
||||
hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
|
||||
hfileOrigPath.getName());
|
||||
if(fs.rename(hfileStagingPath, hfileOrigPath)) {
|
||||
LOG.debug("Moved back file " + hfileOrigPath + " from " +
|
||||
hfileStagingPath);
|
||||
} else if(fs.exists(hfileStagingPath)){
|
||||
if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
|
||||
LOG.debug("Moved back file " + hfileOrigPath + " from " +
|
||||
hfileStagingPath);
|
||||
} else if(targetFs.exists(hfileStagingPath)){
|
||||
LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
|
||||
hfileStagingPath);
|
||||
}
|
||||
} catch(Exception ex) {
|
||||
LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
|
||||
hfileStagingPath);
|
||||
hfileStagingPath, ex);
|
||||
}
|
||||
} catch(Exception ex) {
|
||||
LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
|
||||
hfileStagingPath, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
|
@ -50,7 +51,9 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
|||
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSHDFSUtils;
|
||||
import org.apache.hadoop.hbase.util.Methods;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
@ -83,8 +86,8 @@ import java.util.List;
|
|||
* 2. A user writes out data to his secure output directory: /user/foo/data
|
||||
* 3. A call is made to hbase to create a secret staging directory
|
||||
* which globally rwx (777): /user/staging/averylongandrandomdirectoryname
|
||||
* 4. The user makes the data world readable and writable, then moves it
|
||||
* into the random staging directory, then calls bulkLoadHFiles()
|
||||
* 4. The user moves the data into the random staging directory,
|
||||
* then calls bulkLoadHFiles()
|
||||
*
|
||||
* Like delegation tokens the strength of the security lies in the length
|
||||
* and randomness of the secret directory.
|
||||
|
@ -220,6 +223,21 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
|||
}
|
||||
boolean loaded = false;
|
||||
if (!bypass) {
|
||||
// Get the target fs (HBase region server fs) delegation token
|
||||
// Since we have checked the permission via 'preBulkLoadHFile', now let's give
|
||||
// the 'request user' necessary token to operate on the target fs.
|
||||
// After this point the 'doAs' user will hold two tokens, one for the source fs
|
||||
// ('request user'), another for the target fs (HBase region server principal).
|
||||
FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
|
||||
try {
|
||||
targetfsDelegationToken.acquireDelegationToken(fs);
|
||||
} catch (IOException e) {
|
||||
ResponseConverter.setControllerException(controller, e);
|
||||
done.run(null);
|
||||
return;
|
||||
}
|
||||
ugi.addToken(targetfsDelegationToken.getUserToken());
|
||||
|
||||
loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
|
||||
@Override
|
||||
public Boolean run() {
|
||||
|
@ -228,9 +246,6 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
|||
Configuration conf = env.getConfiguration();
|
||||
fs = FileSystem.get(conf);
|
||||
for(Pair<byte[], String> el: familyPaths) {
|
||||
Path p = new Path(el.getSecond());
|
||||
LOG.trace("Setting permission for: " + p);
|
||||
fs.setPermission(p, PERM_ALL_ACCESS);
|
||||
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
|
||||
if(!fs.exists(stageFamily)) {
|
||||
fs.mkdirs(stageFamily);
|
||||
|
@ -240,7 +255,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
|||
//We call bulkLoadHFiles as requesting user
|
||||
//To enable access prior to staging
|
||||
return env.getRegion().bulkLoadHFiles(familyPaths, true,
|
||||
new SecureBulkLoadListener(fs, bulkToken));
|
||||
new SecureBulkLoadListener(fs, bulkToken, conf));
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to complete bulk load", e);
|
||||
}
|
||||
|
@ -303,26 +318,42 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
|||
}
|
||||
|
||||
private static class SecureBulkLoadListener implements HRegion.BulkLoadListener {
|
||||
// Target filesystem
|
||||
private FileSystem fs;
|
||||
private String stagingDir;
|
||||
private Configuration conf;
|
||||
// Source filesystem
|
||||
private FileSystem srcFs = null;
|
||||
|
||||
public SecureBulkLoadListener(FileSystem fs, String stagingDir) {
|
||||
public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
|
||||
this.fs = fs;
|
||||
this.stagingDir = stagingDir;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
|
||||
Path p = new Path(srcPath);
|
||||
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
|
||||
if (srcFs == null) {
|
||||
srcFs = FileSystem.get(p.toUri(), conf);
|
||||
}
|
||||
|
||||
if(!isFile(p)) {
|
||||
throw new IOException("Path does not reference a file: " + p);
|
||||
}
|
||||
|
||||
LOG.debug("Moving " + p + " to " + stageP);
|
||||
if(!fs.rename(p, stageP)) {
|
||||
throw new IOException("Failed to move HFile: " + p + " to " + stageP);
|
||||
// Check to see if the source and target filesystems are the same
|
||||
if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
|
||||
LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
|
||||
"the destination filesystem. Copying file over to destination staging dir.");
|
||||
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
|
||||
}
|
||||
else {
|
||||
LOG.debug("Moving " + p + " to " + stageP);
|
||||
if(!fs.rename(p, stageP)) {
|
||||
throw new IOException("Failed to move HFile: " + p + " to " + stageP);
|
||||
}
|
||||
}
|
||||
return stageP.toString();
|
||||
}
|
||||
|
@ -350,7 +381,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
|||
* @throws IOException
|
||||
*/
|
||||
private boolean isFile(Path p) throws IOException {
|
||||
FileStatus status = fs.getFileStatus(p);
|
||||
FileStatus status = srcFs.getFileStatus(p);
|
||||
boolean isFile = !status.isDir();
|
||||
try {
|
||||
isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);
|
||||
|
|
Loading…
Reference in New Issue