diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 49083b7c53b..51f52393d65 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -83,7 +83,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   @Override
   public void init() throws ReplicationException {
     try {
-      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+      if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
+        ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Could not initialize replication peers", e);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index cc66882c03a..e0b42ae2c6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -395,6 +395,32 @@ public class TableMapReduceUtil {
     }
   }
 
+  /**
+   * Obtain an authentication token, for the specified cluster, on behalf of the current user
+   * and add it to the credentials for the given map reduce job.
+   *
+   * The quorumAddress is the key to the ZK ensemble, which contains:
+   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent
+   *
+   * @param job The job that requires the permission.
+   * @param quorumAddress string that contains the 3 required configurations
+   * @throws IOException When the authentication token cannot be obtained.
+   */
+  public static void initCredentialsForCluster(Job job, String quorumAddress)
+      throws IOException {
+    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+    if (userProvider.isHBaseSecurityEnabled()) {
+      try {
+        Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
+        ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
+        obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent());
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted obtaining user authentication token");
+        Thread.interrupted();
+      }
+    }
+  }
+
   private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
       throws IOException, InterruptedException {
     Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 0c84c6a8048..9694aab66b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 
@@ -106,35 +107,13 @@ public class VerifyReplication {
       HConnectionManager.execute(new HConnectable<Void>(conf) {
         @Override
         public Void connect(HConnection conn) throws IOException {
-          ZooKeeperWatcher localZKW = null;
-          ReplicationPeer peer = null;
-          try {
-            localZKW = new ZooKeeperWatcher(
-              conf, "VerifyReplication", new Abortable() {
-                @Override public void abort(String why, Throwable e) {}
-                @Override public boolean isAborted() {return false;}
-              });
-            ReplicationPeers rp =
-              ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
-            rp.init();
-            Configuration peerConf = rp.getPeerConf(peerId);
-            if (peerConf == null) {
-              throw new IOException("Couldn't get peer conf!");
-            }
-            HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
-            scan.setStartRow(value.getRow());
-            replicatedScanner = replicatedTable.getScanner(scan);
-          } catch (ReplicationException e) {
-            throw new IOException(
-              "An error occured while trying to connect to the remove peer cluster", e);
-          } finally {
-            if (peer != null) {
-              peer.close();
-            }
-            if (localZKW != null) {
-              localZKW.close();
-            }
-          }
+          String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
+          Configuration peerConf = HBaseConfiguration.create(conf);
+          ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
+
+          HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
+          scan.setStartRow(value.getRow());
+          replicatedScanner = replicatedTable.getScanner(scan);
           return null;
         }
       });
@@ -157,6 +136,38 @@ public class VerifyReplication {
     }
   }
 
+  private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
+    ZooKeeperWatcher localZKW = null;
+    ReplicationPeer peer = null;
+    try {
+      localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
+          new Abortable() {
+            @Override public void abort(String why, Throwable e) {}
+            @Override public boolean isAborted() {return false;}
+          });
+
+      ReplicationPeers rp =
+        ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
+      rp.init();
+
+      Configuration peerConf = rp.getPeerConf(peerId);
+      if (peerConf == null) {
+        throw new IOException("Couldn't get peer conf!");
+      }
+
+      return ZKUtil.getZooKeeperClusterKey(peerConf);
+    } catch (ReplicationException e) {
+      throw new IOException(
+          "An error occurred while trying to connect to the remote peer cluster", e);
+    } finally {
+      if (peer != null) {
+        peer.close();
+      }
+      if (localZKW != null) {
+        localZKW.close();
+      }
+    }
+  }
+
   /**
    * Sets up the actual job.
    *
@@ -181,6 +192,11 @@
 
     if (families != null) {
       conf.set(NAME+".families", families);
     }
+
+    String peerQuorumAddress = getPeerQuorumAddress(conf);
+    conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+    LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+
     Job job = new Job(conf, NAME + "_" + tableName);
     job.setJarByClass(VerifyReplication.class);
@@ -194,6 +210,10 @@
     }
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Verifier.class, null, null, job);
+
+    // Obtain the auth token from peer cluster
+    TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
+
     job.setOutputFormatClass(NullOutputFormat.class);
     job.setNumReduceTasks(0);
     return job;
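
For context on how the new TableMapReduceUtil.initCredentialsForCluster() entry point is meant to be consumed outside of VerifyReplication, here is a minimal, hypothetical driver sketch; it is not part of the patch. The PeerCredentialsExample class, job name and ZooKeeper host names are made up. The cluster key string follows the quorum:port:znodeParent form produced by ZKUtil.getZooKeeperClusterKey(), i.e. it bundles hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent as described in the method's javadoc.

// Hypothetical usage sketch, not part of the patch. Names and hosts are illustrative only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class PeerCredentialsExample {
  public static Job createJob(Configuration conf) throws Exception {
    Job job = new Job(HBaseConfiguration.create(conf), "peer-credentials-example");

    // Delegation token for the local (source) cluster that the mappers scan.
    TableMapReduceUtil.initCredentials(job);

    // Delegation token for the remote peer cluster. The key bundles
    // hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent,
    // e.g. the value returned by ZKUtil.getZooKeeperClusterKey(peerConf).
    String peerClusterKey = "zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase";
    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterKey);

    return job;
  }
}

This mirrors what the patch does inside VerifyReplication.createSubmittableJob(): the peer cluster key is resolved once on the client via getPeerQuorumAddress(), stored in the job configuration, and used both to obtain the peer cluster's auth token up front and to reconnect to the peer from within the mapper.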