HBASE-21516 Use AsyncConnection instead of Connection in SecureBulkLoadManager
This commit is contained in:
parent
30c131b5f7
commit
2446f0026b
|
@ -261,13 +261,12 @@ public final class ProtobufUtil {
|
|||
* just {@link ServiceException}. Prefer this method to
|
||||
* {@link #getRemoteException(ServiceException)} because trying to
|
||||
* contain direct protobuf references.
|
||||
* @param e
|
||||
*/
|
||||
public static IOException handleRemoteException(Exception e) {
|
||||
public static IOException handleRemoteException(Throwable e) {
|
||||
return makeIOExceptionOfException(e);
|
||||
}
|
||||
|
||||
private static IOException makeIOExceptionOfException(Exception e) {
|
||||
private static IOException makeIOExceptionOfException(Throwable e) {
|
||||
Throwable t = e;
|
||||
if (e instanceof ServiceException ||
|
||||
e instanceof org.apache.hbase.thirdparty.com.google.protobuf.ServiceException) {
|
||||
|
|
|
@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.function.Function;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
|
||||
|
@ -124,6 +123,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
|
|||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
|
||||
|
@ -345,13 +345,12 @@ public final class ProtobufUtil {
|
|||
* just {@link ServiceException}. Prefer this method to
|
||||
* {@link #getRemoteException(ServiceException)} because trying to
|
||||
* contain direct protobuf references.
|
||||
* @param e
|
||||
*/
|
||||
public static IOException handleRemoteException(Exception e) {
|
||||
public static IOException handleRemoteException(Throwable e) {
|
||||
return makeIOExceptionOfException(e);
|
||||
}
|
||||
|
||||
private static IOException makeIOExceptionOfException(Exception e) {
|
||||
private static IOException makeIOExceptionOfException(Throwable e) {
|
||||
Throwable t = e;
|
||||
if (e instanceof ServiceException) {
|
||||
t = e.getCause();
|
||||
|
|
|
@ -1948,7 +1948,7 @@ public class HRegionServer extends HasThread implements
|
|||
if (!isStopped() && !isAborted()) {
|
||||
initializeThreads();
|
||||
}
|
||||
this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
|
||||
this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
|
||||
this.secureBulkLoadManager.start();
|
||||
|
||||
// Health checker thread.
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -27,7 +27,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.commons.lang3.mutable.MutableInt;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -38,11 +37,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
|
||||
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
|
||||
import org.apache.hadoop.hbase.security.token.TokenUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -56,7 +56,9 @@ import org.apache.hadoop.security.token.Token;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
|
||||
|
@ -111,9 +113,9 @@ public class SecureBulkLoadManager {
|
|||
|
||||
private UserProvider userProvider;
|
||||
private ConcurrentHashMap<UserGroupInformation, MutableInt> ugiReferenceCounter;
|
||||
private Connection conn;
|
||||
private AsyncConnection conn;
|
||||
|
||||
SecureBulkLoadManager(Configuration conf, Connection conn) {
|
||||
SecureBulkLoadManager(Configuration conf, AsyncConnection conn) {
|
||||
this.conf = conf;
|
||||
this.conn = conn;
|
||||
}
|
||||
|
@ -218,23 +220,23 @@ public class SecureBulkLoadManager {
|
|||
familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
|
||||
}
|
||||
|
||||
Token userToken = null;
|
||||
Token<AuthenticationTokenIdentifier> userToken = null;
|
||||
if (userProvider.isHadoopSecurityEnabled()) {
|
||||
userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
|
||||
.getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
|
||||
request.getFsToken().getService()));
|
||||
userToken = new Token<>(request.getFsToken().getIdentifier().toByteArray(),
|
||||
request.getFsToken().getPassword().toByteArray(), new Text(request.getFsToken().getKind()),
|
||||
new Text(request.getFsToken().getService()));
|
||||
}
|
||||
final String bulkToken = request.getBulkToken();
|
||||
User user = getActiveUser();
|
||||
final UserGroupInformation ugi = user.getUGI();
|
||||
if (userProvider.isHadoopSecurityEnabled()) {
|
||||
try {
|
||||
Token tok = TokenUtil.obtainToken(conn);
|
||||
Token<AuthenticationTokenIdentifier> tok = TokenUtil.obtainToken(conn).get();
|
||||
if (tok != null) {
|
||||
boolean b = ugi.addToken(tok);
|
||||
LOG.debug("token added " + tok + " for user " + ugi + " return=" + b);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
} catch (Exception ioe) {
|
||||
LOG.warn("unable to add token", ioe);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -15,27 +15,29 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.security.token;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.AsyncTable;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.AuthenticationService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenResponse;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
|
@ -45,6 +47,8 @@ import org.apache.zookeeper.KeeperException;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
||||
/**
|
||||
* Utility methods for obtaining authentication tokens.
|
||||
*/
|
||||
|
@ -62,14 +66,41 @@ public class TokenUtil {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain and return an authentication token for the current user.
|
||||
* @param conn The async HBase cluster connection
|
||||
* @return the authentication token instance, wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
public static CompletableFuture<Token<AuthenticationTokenIdentifier>> obtainToken(
|
||||
AsyncConnection conn) {
|
||||
CompletableFuture<Token<AuthenticationTokenIdentifier>> future = new CompletableFuture<>();
|
||||
if (injectedException != null) {
|
||||
future.completeExceptionally(injectedException);
|
||||
return future;
|
||||
}
|
||||
AsyncTable<?> table = conn.getTable(TableName.META_TABLE_NAME);
|
||||
table.<AuthenticationService.Interface, GetAuthenticationTokenResponse> coprocessorService(
|
||||
AuthenticationProtos.AuthenticationService::newStub,
|
||||
(s, c, r) -> s.getAuthenticationToken(c,
|
||||
AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance(), r),
|
||||
HConstants.EMPTY_START_ROW).whenComplete((resp, error) -> {
|
||||
if (error != null) {
|
||||
future.completeExceptionally(ProtobufUtil.handleRemoteException(error));
|
||||
} else {
|
||||
future.complete(toToken(resp.getToken()));
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain and return an authentication token for the current user.
|
||||
* @param conn The HBase cluster connection
|
||||
* @throws IOException if a remote error or serialization problem occurs.
|
||||
* @return the authentication token instance
|
||||
*/
|
||||
public static Token<AuthenticationTokenIdentifier> obtainToken(
|
||||
Connection conn) throws IOException {
|
||||
public static Token<AuthenticationTokenIdentifier> obtainToken(Connection conn)
|
||||
throws IOException {
|
||||
Table meta = null;
|
||||
try {
|
||||
injectFault();
|
||||
|
@ -77,9 +108,9 @@ public class TokenUtil {
|
|||
meta = conn.getTable(TableName.META_TABLE_NAME);
|
||||
CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
|
||||
AuthenticationProtos.AuthenticationService.BlockingInterface service =
|
||||
AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
|
||||
AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
|
||||
AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
|
||||
AuthenticationService.newBlockingStub(rpcChannel);
|
||||
GetAuthenticationTokenResponse response =
|
||||
service.getAuthenticationToken(null, GetAuthenticationTokenRequest.getDefaultInstance());
|
||||
|
||||
return toToken(response.getToken());
|
||||
} catch (ServiceException se) {
|
||||
|
|
|
@ -18,35 +18,53 @@
|
|||
package org.apache.hadoop.hbase.security.token;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestTokenUtil {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestTokenUtil.class);
|
||||
HBaseClassTestRule.forClass(TestTokenUtil.class);
|
||||
|
||||
private URLClassLoader cl;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
URL urlPU = ProtobufUtil.class.getProtectionDomain().getCodeSource().getLocation();
|
||||
URL urlTU = TokenUtil.class.getProtectionDomain().getCodeSource().getLocation();
|
||||
cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
Closeables.close(cl, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testObtainToken() throws Exception {
|
||||
URL urlPU = ProtobufUtil.class.getProtectionDomain().getCodeSource().getLocation();
|
||||
URL urlTU = TokenUtil.class.getProtectionDomain().getCodeSource().getLocation();
|
||||
|
||||
ClassLoader cl = new URLClassLoader(new URL[] { urlPU, urlTU }, getClass().getClassLoader());
|
||||
|
||||
Throwable injected = new com.google.protobuf.ServiceException("injected");
|
||||
|
||||
Class<?> tokenUtil = cl.loadClass(TokenUtil.class.getCanonicalName());
|
||||
|
@ -55,8 +73,7 @@ public class TestTokenUtil {
|
|||
shouldInjectFault.set(null, injected);
|
||||
|
||||
try {
|
||||
tokenUtil.getMethod("obtainToken", Connection.class)
|
||||
.invoke(null, new Object[] { null });
|
||||
tokenUtil.getMethod("obtainToken", Connection.class).invoke(null, new Object[] { null });
|
||||
fail("Should have injected exception.");
|
||||
} catch (InvocationTargetException e) {
|
||||
Throwable t = e;
|
||||
|
@ -72,9 +89,16 @@ public class TestTokenUtil {
|
|||
}
|
||||
}
|
||||
|
||||
CompletableFuture<?> future = (CompletableFuture<?>) tokenUtil
|
||||
.getMethod("obtainToken", AsyncConnection.class).invoke(null, new Object[] { null });
|
||||
try {
|
||||
future.get();
|
||||
fail("Should have injected exception.");
|
||||
} catch (ExecutionException e) {
|
||||
assertSame(injected, e.getCause());
|
||||
}
|
||||
Boolean loaded = (Boolean) cl.loadClass(ProtobufUtil.class.getCanonicalName())
|
||||
.getDeclaredMethod("isClassLoaderLoaded")
|
||||
.invoke(null);
|
||||
.getDeclaredMethod("isClassLoaderLoaded").invoke(null);
|
||||
assertFalse("Should not have loaded DynamicClassLoader", loaded);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue