diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3c9d54f5676..525996138c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -503,7 +503,7 @@ public class HRegionServer extends HasThread implements private volatile ThroughputController flushThroughputController; - protected final SecureBulkLoadManager secureBulkLoadManager; + protected SecureBulkLoadManager secureBulkLoadManager; /** * Starts a HRegionServer at the default location. @@ -615,9 +615,6 @@ public class HRegionServer extends HasThread implements } this.configurationManager = new ConfigurationManager(); - this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf); - this.secureBulkLoadManager.start(); - rpcServices.start(); putUpWebUI(); this.walRoller = new LogRoller(this, this); @@ -785,6 +782,9 @@ public class HRegionServer extends HasThread implements try { setupClusterConnection(); + this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); + this.secureBulkLoadManager.start(); + // Health checker thread. if (isHealthCheckerConfigured()) { int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index 7a18cbb7edb..1accae187bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.FsDelegationToken; +import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSUtils; @@ -108,9 +110,11 @@ public class SecureBulkLoadManager { private Path baseStagingDir; private UserProvider userProvider; + private Connection conn; - SecureBulkLoadManager(Configuration conf) { + SecureBulkLoadManager(Configuration conf, Connection conn) { this.conf = conf; + this.conn = conn; } public void start() throws IOException { @@ -187,7 +191,18 @@ public class SecureBulkLoadManager { final String bulkToken = request.getBulkToken(); User user = getActiveUser(); final UserGroupInformation ugi = user.getUGI(); - if(userToken != null) { + if (userProvider.isHadoopSecurityEnabled()) { + try { + Token tok = TokenUtil.obtainToken(conn); + if (tok != null) { + boolean b = ugi.addToken(tok); + LOG.debug("token added " + tok + " for user " + ugi + " return=" + b); + } + } catch (IOException ioe) { + LOG.warn("unable to add token", ioe); + } + } + if (userToken != null) { ugi.addToken(userToken); } else if (userProvider.isHadoopSecurityEnabled()) { //we allow this to pass through in "simple" security mode