diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 39197350f87..612b87ef0fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.HalfStoreFileReader; import org.apache.hadoop.hbase.io.Reference; @@ -288,10 +289,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool { //If using secure bulk load, get source delegation token, and //prepare staging directory and token - if (userProvider.isHBaseSecurityEnabled()) { - // fs is the source filesystem - fsDelegationToken.acquireDelegationToken(fs); - + // fs is the source filesystem + fsDelegationToken.acquireDelegationToken(fs); + if(isSecureBulkLoadEndpointAvailable()) { bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName()); } @@ -329,12 +329,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } } finally { - if (userProvider.isHBaseSecurityEnabled()) { - fsDelegationToken.releaseDelegationToken(); - - if(bulkToken != null) { - new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken); - } + fsDelegationToken.releaseDelegationToken(); + if(bulkToken != null) { + new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken); } pool.shutdown(); if (queue != null && !queue.isEmpty()) { @@ -644,7 +641,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths); byte[] regionName = getLocation().getRegionInfo().getRegionName(); - if(!userProvider.isHBaseSecurityEnabled()) { + if (!isSecureBulkLoadEndpointAvailable()) { success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds); } else { Table table = new HTable(conn.getConfiguration(), getTableName()); @@ -707,6 +704,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool { throw e; } } + + private boolean isSecureBulkLoadEndpointAvailable() { + String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); + return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); + } /** * Split a storefile into a top and bottom half, maintaining diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java index b7b8e7ba6ab..2f04799afd9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -156,7 +156,9 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService PrepareBulkLoadRequest request, RpcCallback done){ try { - getAccessController().prePrepareBulkLoad(env); + if(userProvider.isHBaseSecurityEnabled()) { + getAccessController().prePrepareBulkLoad(env); + } String bulkToken = createStagingDir(baseStagingDir, getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString(); done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build()); @@ -171,7 +173,9 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService CleanupBulkLoadRequest request, RpcCallback done) { try { - getAccessController().preCleanupBulkLoad(env); + if (userProvider.isHBaseSecurityEnabled()) { + getAccessController().preCleanupBulkLoad(env); + } fs.delete(createStagingDir(baseStagingDir, getActiveUser(), new Path(request.getBulkToken()).getName()), @@ -191,11 +195,13 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) { familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath())); } - final Token userToken = - new Token(request.getFsToken().getIdentifier().toByteArray(), - request.getFsToken().getPassword().toByteArray(), - new Text(request.getFsToken().getKind()), - new Text(request.getFsToken().getService())); + + Token userToken = null; + if (request.getFsToken().hasIdentifier() && request.getFsToken().hasPassword()) { + userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken() + .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text( + request.getFsToken().getService())); + } final String bulkToken = request.getBulkToken(); User user = getActiveUser(); final UserGroupInformation ugi = user.getUGI(); @@ -227,18 +233,20 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService // the 'request user' necessary token to operate on the target fs. // After this point the 'doAs' user will hold two tokens, one for the source fs // ('request user'), another for the target fs (HBase region server principal). - FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer"); - try { - targetfsDelegationToken.acquireDelegationToken(fs); - } catch (IOException e) { - ResponseConverter.setControllerException(controller, e); - done.run(null); - return; - } - Token targetFsToken = targetfsDelegationToken.getUserToken(); - if (targetFsToken != null && (userToken == null - || !targetFsToken.getService().equals(userToken.getService()))) { - ugi.addToken(targetFsToken); + if (userProvider.isHadoopSecurityEnabled()) { + FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer"); + try { + targetfsDelegationToken.acquireDelegationToken(fs); + } catch (IOException e) { + ResponseConverter.setControllerException(controller, e); + done.run(null); + return; + } + Token targetFsToken = targetfsDelegationToken.getUserToken(); + if (targetFsToken != null + && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) { + ugi.addToken(targetFsToken); + } } loaded = ugi.doAs(new PrivilegedAction() { @@ -252,7 +260,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService Path p = new Path(el.getSecond()); LOG.trace("Setting permission for: " + p); fs.setPermission(p, PERM_ALL_ACCESS); - + Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); if(!fs.exists(stageFamily)) { fs.mkdirs(stageFamily); @@ -312,7 +320,8 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService } //this is for testing - if("simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) { + if (userProvider.isHadoopSecurityEnabled() + && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) { return User.createUserForTesting(conf, user.getShortName(), new String[]{}); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java index 5faf91f25ce..6ac1437c4ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java @@ -85,7 +85,7 @@ public class FsDelegationToken { * Releases a previously acquired delegation token. */ public void releaseDelegationToken() { - if (userProvider.isHBaseSecurityEnabled()) { + if (userProvider.isHadoopSecurityEnabled()) { if (userToken != null && !hasForwardedToken) { try { userToken.cancel(this.fs.getConf()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java new file mode 100644 index 00000000000..6a916f6daf5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestLoadIncrementalHFilesUseSecurityEndPoint extends TestLoadIncrementalHFiles { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, + MAX_FILES_PER_REGION_PER_FAMILY); + util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); + util.startMiniCluster(); + setupNamespace(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index d22292d728e..f94e6a33611 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -169,6 +170,9 @@ public class TestAccessController extends SecureTestUtil { "org.apache.hadoop.hbase.master.snapshot.SnapshotLogCleaner"); // Enable security enableSecurity(conf); + // In this particular test case, we can't use SecureBulkLoadEndpoint because its doAs will fail + // to move a file for a random user + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName()); // Verify enableSecurity sets up what we require verifyConfiguration(conf);