From d45cbcac1afe8b8440b1d9bf946892edb5765ef2 Mon Sep 17 00:00:00 2001 From: Gary Helmling Date: Wed, 13 Mar 2013 00:38:19 +0000 Subject: [PATCH] HBASE-7482 Port HBASE-7442 HBase remote CopyTable not working when security enabled to trunk git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1455764 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/client/HConnectionManager.java | 48 ++++++++----------- .../apache/hadoop/hbase/ipc/HBaseClient.java | 8 ++-- .../hbase/ipc/ProtobufRpcClientEngine.java | 4 +- .../org/apache/hadoop/hbase/HConstants.java | 4 +- .../hbase/mapreduce/TableMapReduceUtil.java | 7 +++ .../hbase/mapreduce/TableOutputFormat.java | 7 ++- .../hbase/regionserver/HRegionServer.java | 12 +++-- .../hbase/ipc/RandomTimeoutRpcEngine.java | 3 +- .../hadoop/hbase/ipc/TestDelayedRpc.java | 10 ++-- .../org/apache/hadoop/hbase/ipc/TestIPC.java | 4 +- .../hadoop/hbase/ipc/TestProtoBufRpc.java | 4 +- .../hbase/master/TestHMasterRPCException.java | 3 +- .../hbase/regionserver/TestClusterId.java | 2 +- .../token/TestTokenAuthentication.java | 3 +- 14 files changed, 67 insertions(+), 52 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 8ad54fc1826..17d85ce6cbb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -615,7 +615,7 @@ public class HConnectionManager { // ProtobufRpcClientEngine is the main RpcClientEngine implementation, // but we maintain access through an interface to allow overriding for tests // RPC engine setup must follow obtaining the cluster ID for token authentication to work - this.rpcEngine = new ProtobufRpcClientEngine(this.conf); + this.rpcEngine = new ProtobufRpcClientEngine(this.conf, this.clusterId); } /** @@ -627,41 +627,35 @@ public class HConnectionManager { } private String clusterId = null; + public final void retrieveClusterId(){ - if (conf.get(HConstants.CLUSTER_ID) != null){ + if (clusterId != null) { return; } // No synchronized here, worse case we will retrieve it twice, that's // not an issue. - if (this.clusterId == null){ - this.clusterId = conf.get(HConstants.CLUSTER_ID); - if (this.clusterId == null) { - ZooKeeperKeepAliveConnection zkw = null; - try { - zkw = getKeepAliveZooKeeperWatcher(); - this.clusterId = ZKClusterId.readClusterIdZNode(zkw); - if (clusterId == null) { - LOG.info("ClusterId read in ZooKeeper is null"); - } - } catch (KeeperException e) { - LOG.warn("Can't retrieve clusterId from Zookeeper", e); - } catch (IOException e) { - LOG.warn("Can't retrieve clusterId from Zookeeper", e); - } finally { - if (zkw != null) { - zkw.close(); - } - } - if (this.clusterId == null) { - this.clusterId = "default"; - } - - LOG.info("ClusterId is " + clusterId); + ZooKeeperKeepAliveConnection zkw = null; + try { + zkw = getKeepAliveZooKeeperWatcher(); + clusterId = ZKClusterId.readClusterIdZNode(zkw); + if (clusterId == null) { + LOG.info("ClusterId read in ZooKeeper is null"); + } + } catch (KeeperException e) { + LOG.warn("Can't retrieve clusterId from Zookeeper", e); + } catch (IOException e) { + LOG.warn("Can't retrieve clusterId from Zookeeper", e); + } finally { + if (zkw != null) { + zkw.close(); } } + if (clusterId == null) { + clusterId = HConstants.CLUSTER_ID_DEFAULT; + } - conf.set(HConstants.CLUSTER_ID, clusterId); + LOG.info("ClusterId is " + clusterId); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 737346b0252..717259d902c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -1156,7 +1156,7 @@ public class HBaseClient { * @param conf configuration * @param factory socket factory */ - public HBaseClient(Configuration conf, SocketFactory factory) { + public HBaseClient(Configuration conf, String clusterId, SocketFactory factory) { this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); @@ -1169,7 +1169,7 @@ public class HBaseClient { } this.conf = conf; this.socketFactory = factory; - this.clusterId = conf.get(HConstants.CLUSTER_ID, "default"); + this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; this.connections = new PoolMap( getPoolType(conf), getPoolSize(conf)); this.failedServers = new FailedServers(conf); @@ -1179,8 +1179,8 @@ public class HBaseClient { * Construct an IPC client with the default SocketFactory * @param conf configuration */ - public HBaseClient(Configuration conf) { - this(conf, NetUtils.getDefaultSocketFactory(conf)); + public HBaseClient(Configuration conf, String clusterId) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf)); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java index 3e3d8023a02..0e16448e05d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java @@ -44,8 +44,8 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { protected HBaseClient client; - public ProtobufRpcClientEngine(Configuration conf) { - this.client = new HBaseClient(conf); + public ProtobufRpcClientEngine(Configuration conf, String clusterId) { + this.client = new HBaseClient(conf, clusterId); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 44d968aeeb7..5fef3592d3f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -327,8 +327,8 @@ public final class HConstants { /** name of the file for unique cluster ID */ public static final String CLUSTER_ID_FILE_NAME = "hbase.id"; - /** Configuration key storing the cluster ID */ - public static final String CLUSTER_ID = "hbase.cluster.id"; + /** Default value for cluster ID */ + public static final String CLUSTER_ID_DEFAULT = "default-cluster"; // Always store the location of the root table's HRegion. // This HRegion is never split. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 8a2943e9b5c..21dbdc8e767 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -290,6 +290,13 @@ public class TableMapReduceUtil { public static void initCredentials(Job job) throws IOException { if (User.isHBaseSecurityEnabled(job.getConfiguration())) { try { + // init credentials for remote cluster + String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); + if (quorumAddress != null) { + Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); + ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); + User.getCurrent().obtainAuthTokenForJob(peerConf, job); + } User.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job); } catch (InterruptedException ie) { LOG.info("Interrupted obtaining user authentication token"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index dd797105df4..48cf4c7501b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -182,14 +182,17 @@ implements Configurable { @Override public void setConf(Configuration otherConf) { this.conf = HBaseConfiguration.create(otherConf); + String tableName = this.conf.get(OUTPUT_TABLE); if(tableName == null || tableName.length() <= 0) { throw new IllegalArgumentException("Must specify table name"); } + String address = this.conf.get(QUORUM_ADDRESS); - int zkClientPort = conf.getInt(QUORUM_PORT, 0); + int zkClientPort = this.conf.getInt(QUORUM_PORT, 0); String serverClass = this.conf.get(REGION_SERVER_CLASS); String serverImpl = this.conf.get(REGION_SERVER_IMPL); + try { if (address != null) { ZKUtil.applyClusterKeyToConf(this.conf, address); @@ -198,7 +201,7 @@ implements Configurable { this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl); } if (zkClientPort != 0) { - conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); + this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); } this.table = new HTable(this.conf, tableName); this.table.setAutoFlush(false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index eda519a554e..c3078cd5692 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -405,6 +405,11 @@ public class HRegionServer implements ClientProtocol, */ private final long startcode; + /** + * Unique identifier for the cluster we are a part of. + */ + private String clusterId; + /** * MX Bean for RegionServerInfo */ @@ -540,7 +545,7 @@ public class HRegionServer implements ClientProtocol, } String getClusterId() { - return this.conf.get(HConstants.CLUSTER_ID); + return this.clusterId; } @Retention(RetentionPolicy.RUNTIME) @@ -759,11 +764,10 @@ public class HRegionServer implements ClientProtocol, // Since cluster status is now up // ID should have already been set by HMaster try { - String clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); + clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper); if (clusterId == null) { this.abort("Cluster ID has not been set"); } - this.conf.set(HConstants.CLUSTER_ID, clusterId); LOG.info("ClusterId : "+clusterId); } catch (KeeperException e) { this.abort("Failed to retrieve Cluster ID",e); @@ -833,7 +837,7 @@ public class HRegionServer implements ClientProtocol, movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this); // Setup RPC client for master communication - rpcClientEngine = new ProtobufRpcClientEngine(conf); + rpcClientEngine = new ProtobufRpcClientEngine(conf, clusterId); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java index c52aa50e8db..3cf9e4fc6df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.net.SocketFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.security.User; @@ -48,7 +49,7 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine { private static AtomicInteger invokations = new AtomicInteger(); public RandomTimeoutRpcEngine(Configuration conf) { - super(conf); + super(conf, HConstants.CLUSTER_ID_DEFAULT); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index 25f7d8c4a1f..38033536160 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -30,6 +30,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg; @@ -73,7 +74,8 @@ public class TestDelayedRpc { isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); rpcServer.start(); - ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf); + ProtobufRpcClientEngine clientEngine = + new ProtobufRpcClientEngine(conf, HConstants.CLUSTER_ID_DEFAULT); try { TestRpc client = clientEngine.getProxy(TestRpc.class, rpcServer.getListenerAddress(), conf, 1000); @@ -142,7 +144,8 @@ public class TestDelayedRpc { isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); rpcServer.start(); - ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf); + ProtobufRpcClientEngine clientEngine = + new ProtobufRpcClientEngine(conf, HConstants.CLUSTER_ID_DEFAULT); try { TestRpc client = clientEngine.getProxy(TestRpc.class, rpcServer.getListenerAddress(), conf, 1000); @@ -261,7 +264,8 @@ public class TestDelayedRpc { isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); rpcServer.start(); - ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf); + ProtobufRpcClientEngine clientEngine = + new ProtobufRpcClientEngine(conf, HConstants.CLUSTER_ID_DEFAULT); try { TestRpc client = clientEngine.getProxy(TestRpc.class, rpcServer.getListenerAddress(), conf, 1000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index f54350d4e01..a120cd51d0e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -34,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; @@ -83,8 +84,7 @@ public class TestIPC { rpcServer.start(); HBaseClient client = new HBaseClient( - conf, - spyFactory); + conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory); InetSocketAddress address = rpcServer.getListenerAddress(); try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index c2866e53ffd..cdde6e14807 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; @@ -101,7 +102,8 @@ public class TestProtoBufRpc { @Test public void testProtoBufRpc() throws Exception { - ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf); + ProtobufRpcClientEngine clientEngine = + new ProtobufRpcClientEngine(conf, HConstants.CLUSTER_ID_DEFAULT); try { TestRpcService client = clientEngine.getProxy(TestRpcService.class, addr, conf, 10000); // Test ping method diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java index 41bda109f05..f347d53e288 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java @@ -51,7 +51,8 @@ public class TestHMasterRPCException { ServerName sm = hm.getServerName(); InetSocketAddress isa = new InetSocketAddress(sm.getHostname(), sm.getPort()); - ProtobufRpcClientEngine engine = new ProtobufRpcClientEngine(conf); + ProtobufRpcClientEngine engine = + new ProtobufRpcClientEngine(conf, HConstants.CLUSTER_ID_DEFAULT); try { int i = 0; //retry the RPC a few times; we have seen SocketTimeoutExceptions if we diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java index 3749f74431e..0347d241751 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java @@ -84,7 +84,7 @@ public class TestClusterId { String clusterId = ZKClusterId.readClusterIdZNode(TEST_UTIL.getZooKeeperWatcher()); assertNotNull(clusterId); - assertEquals(clusterId, rst.getRegionServer().getConfiguration().get(HConstants.CLUSTER_ID)); + assertEquals(clusterId, rst.getRegionServer().getClusterId()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index dd18ae72e8a..2ccbe39a0fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -363,9 +363,8 @@ public class TestTokenAuthentication { testuser.doAs(new PrivilegedExceptionAction() { public Object run() throws Exception { Configuration c = server.getConfiguration(); - c.set(HConstants.CLUSTER_ID, clusterId.toString()); ProtobufRpcClientEngine rpcClient = - new ProtobufRpcClientEngine(c); + new ProtobufRpcClientEngine(c, clusterId.toString()); try { AuthenticationProtos.AuthenticationService.BlockingInterface proxy = HBaseClientRPC.waitForProxy(rpcClient, BlockingAuthenticationService.class,