HBASE-17435 Call to preCommitStoreFile() hook encounters SaslException in secure deployment
This commit is contained in:
parent
f65a439f01
commit
9cbeba6c3d
|
@ -503,7 +503,7 @@ public class HRegionServer extends HasThread implements
|
|||
|
||||
private volatile ThroughputController flushThroughputController;
|
||||
|
||||
protected final SecureBulkLoadManager secureBulkLoadManager;
|
||||
protected SecureBulkLoadManager secureBulkLoadManager;
|
||||
|
||||
/**
|
||||
* Starts a HRegionServer at the default location.
|
||||
|
@ -615,9 +615,6 @@ public class HRegionServer extends HasThread implements
|
|||
}
|
||||
this.configurationManager = new ConfigurationManager();
|
||||
|
||||
this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf);
|
||||
this.secureBulkLoadManager.start();
|
||||
|
||||
rpcServices.start();
|
||||
putUpWebUI();
|
||||
this.walRoller = new LogRoller(this, this);
|
||||
|
@ -785,6 +782,9 @@ public class HRegionServer extends HasThread implements
|
|||
try {
|
||||
setupClusterConnection();
|
||||
|
||||
this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
|
||||
this.secureBulkLoadManager.start();
|
||||
|
||||
// Health checker thread.
|
||||
if (isHealthCheckerConfigured()) {
|
||||
int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
|
@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener;
|
|||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
|
||||
import org.apache.hadoop.hbase.security.token.TokenUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSHDFSUtils;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
@ -108,9 +110,11 @@ public class SecureBulkLoadManager {
|
|||
private Path baseStagingDir;
|
||||
|
||||
private UserProvider userProvider;
|
||||
private Connection conn;
|
||||
|
||||
SecureBulkLoadManager(Configuration conf) {
|
||||
SecureBulkLoadManager(Configuration conf, Connection conn) {
|
||||
this.conf = conf;
|
||||
this.conn = conn;
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
|
@ -187,7 +191,18 @@ public class SecureBulkLoadManager {
|
|||
final String bulkToken = request.getBulkToken();
|
||||
User user = getActiveUser();
|
||||
final UserGroupInformation ugi = user.getUGI();
|
||||
if(userToken != null) {
|
||||
if (userProvider.isHadoopSecurityEnabled()) {
|
||||
try {
|
||||
Token tok = TokenUtil.obtainToken(conn);
|
||||
if (tok != null) {
|
||||
boolean b = ugi.addToken(tok);
|
||||
LOG.debug("token added " + tok + " for user " + ugi + " return=" + b);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("unable to add token", ioe);
|
||||
}
|
||||
}
|
||||
if (userToken != null) {
|
||||
ugi.addToken(userToken);
|
||||
} else if (userProvider.isHadoopSecurityEnabled()) {
|
||||
//we allow this to pass through in "simple" security mode
|
||||
|
|
Loading…
Reference in New Issue