HBASE-12052: BulkLoad Failed due to no write permission on input files
commit 8ee39f1971
parent 3beb168b4f
@@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
@@ -288,10 +289,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {

     //If using secure bulk load, get source delegation token, and
     //prepare staging directory and token
-    if (userProvider.isHBaseSecurityEnabled()) {
     // fs is the source filesystem
     fsDelegationToken.acquireDelegationToken(fs);
+    if(isSecureBulkLoadEndpointAvailable()) {
       bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
     }

@@ -329,13 +329,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }

     } finally {
-      if (userProvider.isHBaseSecurityEnabled()) {
       fsDelegationToken.releaseDelegationToken();

       if(bulkToken != null) {
         new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
       }
-      }
       pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
         StringBuilder err = new StringBuilder();
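For orientation, here is a minimal client-side sketch of the prepare/cleanup pairing that the two hunks above move out of the isHBaseSecurityEnabled() guard. The API names are taken from the patch itself; table setup and IOException handling are omitted, and the 'table' handle is assumed to exist.

// Hedged sketch: stage a bulk load through the endpoint and always release the
// staging token, mirroring the acquire / cleanup-in-finally flow shown above.
SecureBulkLoadClient client = new SecureBulkLoadClient(table);   // 'table' assumed
String bulkToken = client.prepareBulkLoad(table.getName());
try {
  // ... hand the HFile family paths to the region servers ...
} finally {
  client.cleanupBulkLoad(bulkToken);
}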
@@ -644,7 +641,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
               + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          if(!userProvider.isHBaseSecurityEnabled()) {
+          if (!isSecureBulkLoadEndpointAvailable()) {
             success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
           } else {
             Table table = new HTable(conn.getConfiguration(), getTableName());
@@ -708,6 +705,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }

+  private boolean isSecureBulkLoadEndpointAvailable() {
+    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+  }
+
   /**
    * Split a storefile into a top and bottom half, maintaining
    * the metadata, recreating bloom filters, etc.
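The new helper only inspects the client's copy of the region coprocessor list, so it assumes the client and the region servers share that setting. A minimal sketch of enabling the endpoint, using the same property and class name the new test class below sets:

// Hedged sketch: register SecureBulkLoadEndpoint so the check above can find it.
// CoprocessorHost.REGION_COPROCESSOR_CONF_KEY is "hbase.coprocessor.region.classes".
Configuration conf = HBaseConfiguration.create();
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
    "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");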
@@ -156,7 +156,9 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
                              PrepareBulkLoadRequest request,
                              RpcCallback<PrepareBulkLoadResponse> done){
     try {
+      if(userProvider.isHBaseSecurityEnabled()) {
         getAccessController().prePrepareBulkLoad(env);
+      }
       String bulkToken = createStagingDir(baseStagingDir,
           getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString();
       done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
@@ -171,7 +173,9 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
                              CleanupBulkLoadRequest request,
                              RpcCallback<CleanupBulkLoadResponse> done) {
     try {
+      if (userProvider.isHBaseSecurityEnabled()) {
         getAccessController().preCleanupBulkLoad(env);
+      }
       fs.delete(createStagingDir(baseStagingDir,
           getActiveUser(),
           new Path(request.getBulkToken()).getName()),
@@ -191,11 +195,13 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
       familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath()));
     }
-    final Token userToken =
-        new Token(request.getFsToken().getIdentifier().toByteArray(),
-                  request.getFsToken().getPassword().toByteArray(),
-                  new Text(request.getFsToken().getKind()),
-                  new Text(request.getFsToken().getService()));
+    Token userToken = null;
+    if (request.getFsToken().hasIdentifier() && request.getFsToken().hasPassword()) {
+      userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
+          .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
+          request.getFsToken().getService()));
+    }
     final String bulkToken = request.getBulkToken();
     User user = getActiveUser();
     final UserGroupInformation ugi = user.getUGI();
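Why the new null guard: on a source cluster without Kerberos the client never obtains a filesystem delegation token, so the fs-token field of the request arrives empty. A minimal client-side sketch of that situation; the names come from the client hunks above and the log line is illustrative only.

// Hedged sketch: without Hadoop security there may be no source-FS token to forward,
// so the endpoint's hasIdentifier()/hasPassword() checks must be allowed to fail.
Token<?> sourceFsToken = fsDelegationToken.getUserToken();   // can be null here
if (sourceFsToken == null) {
  LOG.debug("No source filesystem delegation token; bulk load proceeds without one");
}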
@@ -227,6 +233,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     // the 'request user' necessary token to operate on the target fs.
     // After this point the 'doAs' user will hold two tokens, one for the source fs
     // ('request user'), another for the target fs (HBase region server principal).
+    if (userProvider.isHadoopSecurityEnabled()) {
       FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
       try {
         targetfsDelegationToken.acquireDelegationToken(fs);
@@ -236,10 +243,11 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
         return;
       }
       Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
-      if (targetFsToken != null && (userToken == null
-          || !targetFsToken.getService().equals(userToken.getService()))) {
+      if (targetFsToken != null
+          && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
         ugi.addToken(targetFsToken);
       }
+    }

     loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
       @Override
@@ -312,7 +320,8 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     }

     //this is for testing
-    if("simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
+    if (userProvider.isHadoopSecurityEnabled()
+        && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
       return User.createUserForTesting(conf, user.getShortName(), new String[]{});
     }

@@ -85,7 +85,7 @@ public class FsDelegationToken {
    * Releases a previously acquired delegation token.
    */
   public void releaseDelegationToken() {
-    if (userProvider.isHBaseSecurityEnabled()) {
+    if (userProvider.isHadoopSecurityEnabled()) {
       if (userToken != null && !hasForwardedToken) {
         try {
           userToken.cancel(this.fs.getConf());
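The switch from the HBase-level to the Hadoop-level check is the point of this hunk: delegation tokens are issued by the underlying filesystem, not by HBase RPC authentication, so they can exist or be absent independently of hbase.security.authentication. A small sketch of the distinction using the same UserProvider API; 'conf' is assumed to be an existing Configuration.

// Hedged sketch: the two checks answer different questions.
UserProvider provider = UserProvider.instantiate(conf);
boolean hbaseSecure  = provider.isHBaseSecurityEnabled();   // hbase.security.authentication == "kerberos"
boolean hadoopSecure = provider.isHadoopSecurityEnabled();  // Hadoop/UGI-level security is on
// Filesystem delegation tokens only exist in the second case, so token release is
// now gated on hadoopSecure rather than hbaseSecure.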
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestLoadIncrementalHFilesUseSecurityEndPoint extends TestLoadIncrementalHFiles {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
+    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+    util.startMiniCluster();
+    setupNamespace();
+  }
+}
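Usage note: the new subclass reruns the whole TestLoadIncrementalHFiles suite with SecureBulkLoadEndpoint registered, so running it on its own (for example, mvn test -Dtest=TestLoadIncrementalHFilesUseSecurityEndPoint from the hbase-server module) exercises the endpoint-on-a-non-secure-cluster path this commit fixes.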
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -169,6 +170,9 @@ public class TestAccessController extends SecureTestUtil {
       "org.apache.hadoop.hbase.master.snapshot.SnapshotLogCleaner");
     // Enable security
     enableSecurity(conf);
+    // In this particular test case, we can't use SecureBulkLoadEndpoint because its doAs will fail
+    // to move a file for a random user
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName());
     // Verify enableSecurity sets up what we require
     verifyConfiguration(conf);
