HBASE-7482 Port HBASE-7442 HBase remote CopyTable not working when security enabled to trunk

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1455764 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Helmling 2013-03-13 00:38:19 +00:00
parent 39a02611b5
commit d45cbcac1a
14 changed files with 67 additions and 52 deletions

View File

@ -615,7 +615,7 @@ public class HConnectionManager {
// ProtobufRpcClientEngine is the main RpcClientEngine implementation, // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
// but we maintain access through an interface to allow overriding for tests // but we maintain access through an interface to allow overriding for tests
// RPC engine setup must follow obtaining the cluster ID for token authentication to work // RPC engine setup must follow obtaining the cluster ID for token authentication to work
this.rpcEngine = new ProtobufRpcClientEngine(this.conf); this.rpcEngine = new ProtobufRpcClientEngine(this.conf, this.clusterId);
} }
/** /**
@ -627,41 +627,35 @@ public class HConnectionManager {
} }
private String clusterId = null; private String clusterId = null;
public final void retrieveClusterId(){ public final void retrieveClusterId(){
if (conf.get(HConstants.CLUSTER_ID) != null){ if (clusterId != null) {
return; return;
} }
// No synchronized here, worse case we will retrieve it twice, that's // No synchronized here, worse case we will retrieve it twice, that's
// not an issue. // not an issue.
if (this.clusterId == null){ ZooKeeperKeepAliveConnection zkw = null;
this.clusterId = conf.get(HConstants.CLUSTER_ID); try {
if (this.clusterId == null) { zkw = getKeepAliveZooKeeperWatcher();
ZooKeeperKeepAliveConnection zkw = null; clusterId = ZKClusterId.readClusterIdZNode(zkw);
try { if (clusterId == null) {
zkw = getKeepAliveZooKeeperWatcher(); LOG.info("ClusterId read in ZooKeeper is null");
this.clusterId = ZKClusterId.readClusterIdZNode(zkw); }
if (clusterId == null) { } catch (KeeperException e) {
LOG.info("ClusterId read in ZooKeeper is null"); LOG.warn("Can't retrieve clusterId from Zookeeper", e);
} } catch (IOException e) {
} catch (KeeperException e) { LOG.warn("Can't retrieve clusterId from Zookeeper", e);
LOG.warn("Can't retrieve clusterId from Zookeeper", e); } finally {
} catch (IOException e) { if (zkw != null) {
LOG.warn("Can't retrieve clusterId from Zookeeper", e); zkw.close();
} finally {
if (zkw != null) {
zkw.close();
}
}
if (this.clusterId == null) {
this.clusterId = "default";
}
LOG.info("ClusterId is " + clusterId);
} }
} }
if (clusterId == null) {
clusterId = HConstants.CLUSTER_ID_DEFAULT;
}
conf.set(HConstants.CLUSTER_ID, clusterId); LOG.info("ClusterId is " + clusterId);
} }
@Override @Override

View File

@ -1156,7 +1156,7 @@ public class HBaseClient {
* @param conf configuration * @param conf configuration
* @param factory socket factory * @param factory socket factory
*/ */
public HBaseClient(Configuration conf, SocketFactory factory) { public HBaseClient(Configuration conf, String clusterId, SocketFactory factory) {
this.maxIdleTime = this.maxIdleTime =
conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
@ -1169,7 +1169,7 @@ public class HBaseClient {
} }
this.conf = conf; this.conf = conf;
this.socketFactory = factory; this.socketFactory = factory;
this.clusterId = conf.get(HConstants.CLUSTER_ID, "default"); this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
this.connections = new PoolMap<ConnectionId, Connection>( this.connections = new PoolMap<ConnectionId, Connection>(
getPoolType(conf), getPoolSize(conf)); getPoolType(conf), getPoolSize(conf));
this.failedServers = new FailedServers(conf); this.failedServers = new FailedServers(conf);
@ -1179,8 +1179,8 @@ public class HBaseClient {
* Construct an IPC client with the default SocketFactory * Construct an IPC client with the default SocketFactory
* @param conf configuration * @param conf configuration
*/ */
public HBaseClient(Configuration conf) { public HBaseClient(Configuration conf, String clusterId) {
this(conf, NetUtils.getDefaultSocketFactory(conf)); this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf));
} }
/** /**

View File

@ -44,8 +44,8 @@ public class ProtobufRpcClientEngine implements RpcClientEngine {
protected HBaseClient client; protected HBaseClient client;
public ProtobufRpcClientEngine(Configuration conf) { public ProtobufRpcClientEngine(Configuration conf, String clusterId) {
this.client = new HBaseClient(conf); this.client = new HBaseClient(conf, clusterId);
} }
@Override @Override

View File

@ -327,8 +327,8 @@ public final class HConstants {
/** name of the file for unique cluster ID */ /** name of the file for unique cluster ID */
public static final String CLUSTER_ID_FILE_NAME = "hbase.id"; public static final String CLUSTER_ID_FILE_NAME = "hbase.id";
/** Configuration key storing the cluster ID */ /** Default value for cluster ID */
public static final String CLUSTER_ID = "hbase.cluster.id"; public static final String CLUSTER_ID_DEFAULT = "default-cluster";
// Always store the location of the root table's HRegion. // Always store the location of the root table's HRegion.
// This HRegion is never split. // This HRegion is never split.

View File

@ -290,6 +290,13 @@ public class TableMapReduceUtil {
public static void initCredentials(Job job) throws IOException { public static void initCredentials(Job job) throws IOException {
if (User.isHBaseSecurityEnabled(job.getConfiguration())) { if (User.isHBaseSecurityEnabled(job.getConfiguration())) {
try { try {
// init credentials for remote cluster
String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
if (quorumAddress != null) {
Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
User.getCurrent().obtainAuthTokenForJob(peerConf, job);
}
User.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job); User.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.info("Interrupted obtaining user authentication token"); LOG.info("Interrupted obtaining user authentication token");

View File

@ -182,14 +182,17 @@ implements Configurable {
@Override @Override
public void setConf(Configuration otherConf) { public void setConf(Configuration otherConf) {
this.conf = HBaseConfiguration.create(otherConf); this.conf = HBaseConfiguration.create(otherConf);
String tableName = this.conf.get(OUTPUT_TABLE); String tableName = this.conf.get(OUTPUT_TABLE);
if(tableName == null || tableName.length() <= 0) { if(tableName == null || tableName.length() <= 0) {
throw new IllegalArgumentException("Must specify table name"); throw new IllegalArgumentException("Must specify table name");
} }
String address = this.conf.get(QUORUM_ADDRESS); String address = this.conf.get(QUORUM_ADDRESS);
int zkClientPort = conf.getInt(QUORUM_PORT, 0); int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
String serverClass = this.conf.get(REGION_SERVER_CLASS); String serverClass = this.conf.get(REGION_SERVER_CLASS);
String serverImpl = this.conf.get(REGION_SERVER_IMPL); String serverImpl = this.conf.get(REGION_SERVER_IMPL);
try { try {
if (address != null) { if (address != null) {
ZKUtil.applyClusterKeyToConf(this.conf, address); ZKUtil.applyClusterKeyToConf(this.conf, address);
@ -198,7 +201,7 @@ implements Configurable {
this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl); this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
} }
if (zkClientPort != 0) { if (zkClientPort != 0) {
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
} }
this.table = new HTable(this.conf, tableName); this.table = new HTable(this.conf, tableName);
this.table.setAutoFlush(false); this.table.setAutoFlush(false);

View File

@ -405,6 +405,11 @@ public class HRegionServer implements ClientProtocol,
*/ */
private final long startcode; private final long startcode;
/**
* Unique identifier for the cluster we are a part of.
*/
private String clusterId;
/** /**
* MX Bean for RegionServerInfo * MX Bean for RegionServerInfo
*/ */
@ -540,7 +545,7 @@ public class HRegionServer implements ClientProtocol,
} }
String getClusterId() { String getClusterId() {
return this.conf.get(HConstants.CLUSTER_ID); return this.clusterId;
} }
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@ -759,11 +764,10 @@ public class HRegionServer implements ClientProtocol,
// Since cluster status is now up // Since cluster status is now up
// ID should have already been set by HMaster // ID should have already been set by HMaster
try { try {
String clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
if (clusterId == null) { if (clusterId == null) {
this.abort("Cluster ID has not been set"); this.abort("Cluster ID has not been set");
} }
this.conf.set(HConstants.CLUSTER_ID, clusterId);
LOG.info("ClusterId : "+clusterId); LOG.info("ClusterId : "+clusterId);
} catch (KeeperException e) { } catch (KeeperException e) {
this.abort("Failed to retrieve Cluster ID",e); this.abort("Failed to retrieve Cluster ID",e);
@ -833,7 +837,7 @@ public class HRegionServer implements ClientProtocol,
movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this); movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
// Setup RPC client for master communication // Setup RPC client for master communication
rpcClientEngine = new ProtobufRpcClientEngine(conf); rpcClientEngine = new ProtobufRpcClientEngine(conf, clusterId);
} }
/** /**

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -48,7 +49,7 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine {
private static AtomicInteger invokations = new AtomicInteger(); private static AtomicInteger invokations = new AtomicInteger();
public RandomTimeoutRpcEngine(Configuration conf) { public RandomTimeoutRpcEngine(Configuration conf) {
super(conf); super(conf, HConstants.CLUSTER_ID_DEFAULT);
} }
@Override @Override

View File

@ -30,6 +30,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
@ -73,7 +74,8 @@ public class TestDelayedRpc {
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start(); rpcServer.start();
ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf); ProtobufRpcClientEngine clientEngine =
new ProtobufRpcClientEngine(conf, HConstants.CLUSTER_ID_DEFAULT);
try { try {
TestRpc client = clientEngine.getProxy(TestRpc.class, TestRpc client = clientEngine.getProxy(TestRpc.class,
rpcServer.getListenerAddress(), conf, 1000); rpcServer.getListenerAddress(), conf, 1000);
@ -142,7 +144,8 @@ public class TestDelayedRpc {
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start(); rpcServer.start();
ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf); ProtobufRpcClientEngine clientEngine =
new ProtobufRpcClientEngine(conf, HConstants.CLUSTER_ID_DEFAULT);
try { try {
TestRpc client = clientEngine.getProxy(TestRpc.class, TestRpc client = clientEngine.getProxy(TestRpc.class,
rpcServer.getListenerAddress(), conf, 1000); rpcServer.getListenerAddress(), conf, 1000);
@ -261,7 +264,8 @@ public class TestDelayedRpc {
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start(); rpcServer.start();
ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf); ProtobufRpcClientEngine clientEngine =
new ProtobufRpcClientEngine(conf, HConstants.CLUSTER_ID_DEFAULT);
try { try {
TestRpc client = clientEngine.getProxy(TestRpc.class, TestRpc client = clientEngine.getProxy(TestRpc.class,
rpcServer.getListenerAddress(), conf, 1000); rpcServer.getListenerAddress(), conf, 1000);

View File

@ -34,6 +34,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@ -83,8 +84,7 @@ public class TestIPC {
rpcServer.start(); rpcServer.start();
HBaseClient client = new HBaseClient( HBaseClient client = new HBaseClient(
conf, conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
spyFactory);
InetSocketAddress address = rpcServer.getListenerAddress(); InetSocketAddress address = rpcServer.getListenerAddress();
try { try {

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
@ -101,7 +102,8 @@ public class TestProtoBufRpc {
@Test @Test
public void testProtoBufRpc() throws Exception { public void testProtoBufRpc() throws Exception {
ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf); ProtobufRpcClientEngine clientEngine =
new ProtobufRpcClientEngine(conf, HConstants.CLUSTER_ID_DEFAULT);
try { try {
TestRpcService client = clientEngine.getProxy(TestRpcService.class, addr, conf, 10000); TestRpcService client = clientEngine.getProxy(TestRpcService.class, addr, conf, 10000);
// Test ping method // Test ping method

View File

@ -51,7 +51,8 @@ public class TestHMasterRPCException {
ServerName sm = hm.getServerName(); ServerName sm = hm.getServerName();
InetSocketAddress isa = new InetSocketAddress(sm.getHostname(), sm.getPort()); InetSocketAddress isa = new InetSocketAddress(sm.getHostname(), sm.getPort());
ProtobufRpcClientEngine engine = new ProtobufRpcClientEngine(conf); ProtobufRpcClientEngine engine =
new ProtobufRpcClientEngine(conf, HConstants.CLUSTER_ID_DEFAULT);
try { try {
int i = 0; int i = 0;
//retry the RPC a few times; we have seen SocketTimeoutExceptions if we //retry the RPC a few times; we have seen SocketTimeoutExceptions if we

View File

@ -84,7 +84,7 @@ public class TestClusterId {
String clusterId = ZKClusterId.readClusterIdZNode(TEST_UTIL.getZooKeeperWatcher()); String clusterId = ZKClusterId.readClusterIdZNode(TEST_UTIL.getZooKeeperWatcher());
assertNotNull(clusterId); assertNotNull(clusterId);
assertEquals(clusterId, rst.getRegionServer().getConfiguration().get(HConstants.CLUSTER_ID)); assertEquals(clusterId, rst.getRegionServer().getClusterId());
} }
} }

View File

@ -363,9 +363,8 @@ public class TestTokenAuthentication {
testuser.doAs(new PrivilegedExceptionAction<Object>() { testuser.doAs(new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception { public Object run() throws Exception {
Configuration c = server.getConfiguration(); Configuration c = server.getConfiguration();
c.set(HConstants.CLUSTER_ID, clusterId.toString());
ProtobufRpcClientEngine rpcClient = ProtobufRpcClientEngine rpcClient =
new ProtobufRpcClientEngine(c); new ProtobufRpcClientEngine(c, clusterId.toString());
try { try {
AuthenticationProtos.AuthenticationService.BlockingInterface proxy = AuthenticationProtos.AuthenticationService.BlockingInterface proxy =
HBaseClientRPC.waitForProxy(rpcClient, BlockingAuthenticationService.class, HBaseClientRPC.waitForProxy(rpcClient, BlockingAuthenticationService.class,