HBASE-23330: Fix delegation token fetch with MasterRegistry
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
3e1450d8b3
commit
55cae10beb
|
@ -166,6 +166,11 @@ public interface Connection extends Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
Admin getAdmin() throws IOException;
|
Admin getAdmin() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the cluster ID unique to this HBase cluster.
|
||||||
|
*/
|
||||||
|
String getClusterId() throws IOException;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException;
|
public void close() throws IOException;
|
||||||
|
|
||||||
|
|
|
@ -501,4 +501,9 @@ abstract class ConnectionAdapter implements ClusterConnection {
|
||||||
public RpcControllerFactory getRpcControllerFactory() {
|
public RpcControllerFactory getRpcControllerFactory() {
|
||||||
return wrappedConnection.getRpcControllerFactory();
|
return wrappedConnection.getRpcControllerFactory();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getClusterId() throws IOException {
|
||||||
|
return wrappedConnection.getClusterId();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -824,6 +824,11 @@ class ConnectionManager {
|
||||||
return new HBaseAdmin(this);
|
return new HBaseAdmin(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getClusterId() throws IOException {
|
||||||
|
return registry.getClusterId();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MetricsConnection getConnectionMetrics() {
|
public MetricsConnection getConnectionMetrics() {
|
||||||
return this.metrics;
|
return this.metrics;
|
||||||
|
|
|
@ -305,7 +305,7 @@ public class TokenUtil {
|
||||||
public static void addTokenForJob(final Connection conn, final JobConf job, User user)
|
public static void addTokenForJob(final Connection conn, final JobConf job, User user)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
|
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn, user);
|
||||||
if (token == null) {
|
if (token == null) {
|
||||||
token = obtainToken(conn, user);
|
token = obtainToken(conn, user);
|
||||||
}
|
}
|
||||||
|
@ -324,7 +324,7 @@ public class TokenUtil {
|
||||||
*/
|
*/
|
||||||
public static void addTokenForJob(final Connection conn, User user, Job job)
|
public static void addTokenForJob(final Connection conn, User user, Job job)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
|
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn, user);
|
||||||
if (token == null) {
|
if (token == null) {
|
||||||
token = obtainToken(conn, user);
|
token = obtainToken(conn, user);
|
||||||
}
|
}
|
||||||
|
@ -343,7 +343,7 @@ public class TokenUtil {
|
||||||
*/
|
*/
|
||||||
public static boolean addTokenIfMissing(Connection conn, User user)
|
public static boolean addTokenIfMissing(Connection conn, User user)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
|
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn, user);
|
||||||
if (token == null) {
|
if (token == null) {
|
||||||
token = obtainToken(conn, user);
|
token = obtainToken(conn, user);
|
||||||
user.getUGI().addToken(token.getService(), token);
|
user.getUGI().addToken(token.getService(), token);
|
||||||
|
@ -356,19 +356,9 @@ public class TokenUtil {
|
||||||
* Get the authentication token of the user for the cluster specified in the configuration
|
* Get the authentication token of the user for the cluster specified in the configuration
|
||||||
* @return null if the user does not have the token, otherwise the auth token for the cluster.
|
* @return null if the user does not have the token, otherwise the auth token for the cluster.
|
||||||
*/
|
*/
|
||||||
private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
|
private static Token<AuthenticationTokenIdentifier> getAuthToken(Connection conn, User user)
|
||||||
throws IOException, InterruptedException {
|
throws IOException {
|
||||||
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
|
String clusterId = conn.getClusterId();
|
||||||
try {
|
return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
|
||||||
String clusterId = ZKClusterId.readClusterIdZNode(zkw);
|
|
||||||
if (clusterId == null) {
|
|
||||||
throw new IOException("Failed to get cluster ID");
|
|
||||||
}
|
|
||||||
return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
} finally {
|
|
||||||
zkw.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,6 +81,11 @@ public class TestMasterAddressRefresher {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getClusterId() throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue