From 49a3f5e4d10f9dc8ab5369e8279e06b34beb0195 Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Fri, 11 Apr 2014 16:12:22 +0000 Subject: [PATCH] HBASE-10902 Make Secure Bulk Load work across remote secure clusters (Jerry He) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1586709 13f79535-47bb-0310-9956-ffa450edef68 --- .../mapreduce/LoadIncrementalHFiles.java | 46 +++++++++------- .../access/SecureBulkLoadEndpoint.java | 53 +++++++++++++++---- 2 files changed, 69 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 0c99451f61e..4fb84208a01 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.FsDelegationToken; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -113,6 +114,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private int maxFilesPerRegionPerFamily; private boolean assignSeqIds; + // Source filesystem + private FileSystem fs; + // Source delegation token private FsDelegationToken fsDelegationToken; private String bulkToken; private UserProvider userProvider; @@ -164,7 +168,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { */ private void discoverLoadQueue(Deque ret, Path hfofDir) throws IOException { - FileSystem fs = hfofDir.getFileSystem(getConf()); + fs = hfofDir.getFileSystem(getConf()); if (!fs.exists(hfofDir)) { throw new FileNotFoundException("HFileOutputFormat dir " + @@ -270,10 +274,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { return; } - //If using secure bulk load + //If using secure bulk load, get source delegation token, and //prepare staging directory and token if (userProvider.isHBaseSecurityEnabled()) { - FileSystem fs = FileSystem.get(getConf()); + // fs is the source filesystem fsDelegationToken.acquireDelegationToken(fs); bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName()); @@ -332,7 +336,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.error(err); } } - + if (queue != null && !queue.isEmpty()) { throw new RuntimeException("Bulk load aborted with some files not yet loaded." + "Please check log for more details."); @@ -510,7 +514,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool { final Pair startEndKeys) throws IOException { final Path hfilePath = item.hfilePath; - final FileSystem fs = hfilePath.getFileSystem(getConf()); HFile.Reader hfr = HFile.createReader(fs, hfilePath, new CacheConfig(getConf()), getConf()); final byte[] first, last; @@ -640,23 +643,28 @@ public class LoadIncrementalHFiles extends Configured implements Tool { //from the staging directory back to original location //in user directory if(secureClient != null && !success) { - FileSystem fs = FileSystem.get(getConf()); - for(Pair el : famPaths) { - Path hfileStagingPath = null; - Path hfileOrigPath = new Path(el.getSecond()); - try { - hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()), + FileSystem targetFs = FileSystem.get(getConf()); + // Check to see if the source and target filesystems are the same + // If they are the same filesystem, we will try move the files back + // because previously we moved them to the staging directory. + if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) { + for(Pair el : famPaths) { + Path hfileStagingPath = null; + Path hfileOrigPath = new Path(el.getSecond()); + try { + hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()), hfileOrigPath.getName()); - if(fs.rename(hfileStagingPath, hfileOrigPath)) { - LOG.debug("Moved back file " + hfileOrigPath + " from " + - hfileStagingPath); - } else if(fs.exists(hfileStagingPath)){ + if(targetFs.rename(hfileStagingPath, hfileOrigPath)) { + LOG.debug("Moved back file " + hfileOrigPath + " from " + + hfileStagingPath); + } else if(targetFs.exists(hfileStagingPath)){ + LOG.debug("Unable to move back file " + hfileOrigPath + " from " + + hfileStagingPath); + } + } catch(Exception ex) { LOG.debug("Unable to move back file " + hfileOrigPath + " from " + - hfileStagingPath); + hfileStagingPath, ex); } - } catch(Exception ex) { - LOG.debug("Unable to move back file " + hfileOrigPath + " from " + - hfileStagingPath, ex); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java index 8b8c9d74615..7f845cc8ac9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.Coprocessor; @@ -50,7 +51,9 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.FsDelegationToken; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.Methods; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.Text; @@ -83,8 +86,8 @@ import java.util.List; * 2. A user writes out data to his secure output directory: /user/foo/data * 3. A call is made to hbase to create a secret staging directory * which globally rwx (777): /user/staging/averylongandrandomdirectoryname - * 4. The user makes the data world readable and writable, then moves it - * into the random staging directory, then calls bulkLoadHFiles() + * 4. The user moves the data into the random staging directory, + * then calls bulkLoadHFiles() * * Like delegation tokens the strength of the security lies in the length * and randomness of the secret directory. @@ -220,6 +223,21 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService } boolean loaded = false; if (!bypass) { + // Get the target fs (HBase region server fs) delegation token + // Since we have checked the permission via 'preBulkLoadHFile', now let's give + // the 'request user' necessary token to operate on the target fs. + // After this point the 'doAs' user will hold two tokens, one for the source fs + // ('request user'), another for the target fs (HBase region server principal). + FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer"); + try { + targetfsDelegationToken.acquireDelegationToken(fs); + } catch (IOException e) { + ResponseConverter.setControllerException(controller, e); + done.run(null); + return; + } + ugi.addToken(targetfsDelegationToken.getUserToken()); + loaded = ugi.doAs(new PrivilegedAction() { @Override public Boolean run() { @@ -228,9 +246,6 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService Configuration conf = env.getConfiguration(); fs = FileSystem.get(conf); for(Pair el: familyPaths) { - Path p = new Path(el.getSecond()); - LOG.trace("Setting permission for: " + p); - fs.setPermission(p, PERM_ALL_ACCESS); Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); if(!fs.exists(stageFamily)) { fs.mkdirs(stageFamily); @@ -240,7 +255,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService //We call bulkLoadHFiles as requesting user //To enable access prior to staging return env.getRegion().bulkLoadHFiles(familyPaths, true, - new SecureBulkLoadListener(fs, bulkToken)); + new SecureBulkLoadListener(fs, bulkToken, conf)); } catch (Exception e) { LOG.error("Failed to complete bulk load", e); } @@ -303,26 +318,42 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService } private static class SecureBulkLoadListener implements HRegion.BulkLoadListener { + // Target filesystem private FileSystem fs; private String stagingDir; + private Configuration conf; + // Source filesystem + private FileSystem srcFs = null; - public SecureBulkLoadListener(FileSystem fs, String stagingDir) { + public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) { this.fs = fs; this.stagingDir = stagingDir; + this.conf = conf; } @Override public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException { Path p = new Path(srcPath); Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); + if (srcFs == null) { + srcFs = FileSystem.get(p.toUri(), conf); + } if(!isFile(p)) { throw new IOException("Path does not reference a file: " + p); } - LOG.debug("Moving " + p + " to " + stageP); - if(!fs.rename(p, stageP)) { - throw new IOException("Failed to move HFile: " + p + " to " + stageP); + // Check to see if the source and target filesystems are the same + if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { + LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " + + "the destination filesystem. Copying file over to destination staging dir."); + FileUtil.copy(srcFs, p, fs, stageP, false, conf); + } + else { + LOG.debug("Moving " + p + " to " + stageP); + if(!fs.rename(p, stageP)) { + throw new IOException("Failed to move HFile: " + p + " to " + stageP); + } } return stageP.toString(); } @@ -350,7 +381,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService * @throws IOException */ private boolean isFile(Path p) throws IOException { - FileStatus status = fs.getFileStatus(p); + FileStatus status = srcFs.getFileStatus(p); boolean isFile = !status.isDir(); try { isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);