From 3c1c948f45fefb2f6dcb14efdf9cc76ddac85d6d Mon Sep 17 00:00:00 2001 From: Jean-Daniel Cryans Date: Wed, 27 Oct 2010 23:17:30 +0000 Subject: [PATCH] HBASE-3012 TOF doesn't take zk client port for remote clusters git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1028132 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../hadoop/hbase/mapreduce/CopyTable.java | 4 +-- .../hbase/mapreduce/TableMapReduceUtil.java | 17 +++++---- .../hbase/mapreduce/TableOutputFormat.java | 6 ++-- .../replication/ReplicationZookeeper.java | 20 +++++------ .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 35 +++++++++++++++++++ .../apache/hadoop/hbase/TestZooKeeper.java | 28 +++++++++++++++ 7 files changed, 84 insertions(+), 27 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 67a3a27f960..5a3fb778ada 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -619,6 +619,7 @@ Release 0.21.0 - Unreleased (Nicolas Spiegelberg via Stack) HBASE-3155 HFile.appendMetaBlock() uses wrong comparator (Nicolas Spiegelberg via Stack) + HBASE-3012 TOF doesn't take zk client port for remote clusters IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index 5e7ce1b7f53..339651f4086 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -100,7 +100,7 @@ public class CopyTable { System.err.println(" endtime end of the time range"); System.err.println(" new.name new table's name"); System.err.println(" peer.adr Address of the peer cluster given in the format"); - System.err.println(" hbase.zookeeer.quorum:zookeeper.znode.parent"); + System.err.println(" hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent"); System.err.println(" families comma-seperated list of families to copy"); System.err.println(); System.err.println("Args:"); @@ -111,7 +111,7 @@ public class CopyTable { System.err.println(" $ bin/hbase " + "org.apache.hadoop.hbase.mapreduce.CopyTable --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface " + "--rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer --starttime=1265875194289 --endtime=1265878794289 " + - "--peer.adr=server1,server2,server3:/hbase TestTable "); + "--peer.adr=server1,server2,server3:2181:/hbase TestTable "); } private static boolean doCommandLine(final String[] args) { diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 4ca8053e044..daa90f36b19 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; @@ -185,7 +186,8 @@ public class TableMapReduceUtil { * default; e.g. copying tables between clusters, the source would be * designated by hbase-site.xml and this param would have the * ensemble address of the remote cluster. The format to pass is particular. - * Pass <hbase.zookeeper.quorum> ':' <ZOOKEEPER_ZNODE_PARENT>. + * Pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent> + * such as server,server2,server3:2181:/hbase. * @param serverClass redefined hbase.regionserver.class * @param serverImpl redefined hbase.regionserver.impl * @throws IOException When determining the region count fails. @@ -215,7 +217,8 @@ public class TableMapReduceUtil { * default; e.g. copying tables between clusters, the source would be * designated by hbase-site.xml and this param would have the * ensemble address of the remote cluster. The format to pass is particular. - * Pass <hbase.zookeeper.quorum> ':' <ZOOKEEPER_ZNODE_PARENT>. + * Pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent> + * such as server,server2,server3:2181:/hbase. * @param serverClass redefined hbase.regionserver.class * @param serverImpl redefined hbase.regionserver.impl * @param addDependencyJars upload HBase jars and jars for any of the configured @@ -233,13 +236,9 @@ public class TableMapReduceUtil { conf.set(TableOutputFormat.OUTPUT_TABLE, table); // If passed a quorum/ensemble address, pass it on to TableOutputFormat. if (quorumAddress != null) { - if (quorumAddress.split(":").length == 2) { - conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress); - } else { - // Not in expected format. - throw new IOException("Please specify the peer cluster using the format of " + - HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_ZNODE_PARENT); - } + // Calling this will validate the format + ZKUtil.transformClusterKey(quorumAddress); + conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress); } if (serverClass != null && serverImpl != null) { conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass); diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index d6dbc395df2..d0f0e8d1b7a 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -186,10 +187,7 @@ implements Configurable { String serverImpl = conf.get(REGION_SERVER_IMPL); try { if (address != null) { - // Check is done in TMRU - String[] parts = address.split(":"); - conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]); - conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]); + ZKUtil.applyClusterKeyToConf(conf, address); } if (serverClass != null) { conf.set(HConstants.REGION_SERVER_CLASS, serverClass); diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java index a62d43e4f4c..15a200452b9 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java @@ -157,9 +157,7 @@ public class ReplicationZookeeper { conf.get("zookeeper.znode.replication.clusterId", "clusterId"); String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); - this.ourClusterKey = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + - this.conf.get("hbase.zookeeper.property.clientPort") + ":" + - this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT); + this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf); this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); @@ -275,17 +273,15 @@ public class ReplicationZookeeper { LOG.debug("Not connecting to " + peerId + " because it's us"); return null; } - String[] ensemble = otherClusterKey.split(":"); - if (ensemble.length != 3) { - LOG.warn("Wrong format of cluster address: " + - Bytes.toStringBinary(data)); - return null; - } // Construct the connection to the new peer Configuration otherConf = new Configuration(this.conf); - otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]); - otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]); - otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]); + try { + ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey); + } catch (IOException e) { + LOG.error("Can't get peer because:", e); + return null; + } + ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf, "connection to cluster: " + peerId, this.abortable); return new ReplicationPeer(otherConf, peerId, diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 21a4256271a..565d9d3f574 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -169,6 +169,8 @@ public class ZKUtil { "[\\t\\n\\x0B\\f\\r]", "")); StringBuilder builder = new StringBuilder(ensemble); builder.append(":"); + builder.append(conf.get("hbase.zookeeper.property.clientPort")); + builder.append(":"); builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); if (name != null && !name.isEmpty()) { builder.append(","); @@ -177,6 +179,39 @@ public class ZKUtil { return builder.toString(); } + /** + * Apply the settings in the given key to the given configuration, this is + * used to communicate with distant clusters + * @param conf configuration object to configure + * @param key string that contains the 3 required configuratins + * @throws IOException + */ + public static void applyClusterKeyToConf(Configuration conf, String key) + throws IOException{ + String[] parts = transformClusterKey(key); + conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]); + conf.set("hbase.zookeeper.property.clientPort", parts[1]); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]); + } + + /** + * Separate the given key into the three configurations it should contain: + * hbase.zookeeper.quorum, hbase.zookeeper.client.port + * and zookeeper.znode.parent + * @param key + * @return the three configuration in the described order + * @throws IOException + */ + public static String[] transformClusterKey(String key) throws IOException { + String[] parts = key.split(":"); + if (parts.length != 3) { + throw new IOException("Cluster key invalid, the format should be:" + + HConstants.ZOOKEEPER_QUORUM + ":hbase.zookeeper.client.port:" + + HConstants.ZOOKEEPER_ZNODE_PARENT); + } + return parts; + } + // // Existence checks and watches // diff --git a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index 7874907c858..b9dd937bc6d 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; @@ -195,4 +196,31 @@ public class TestZooKeeper { ZKUtil.deleteNode(zkw, "/l1"); assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null)); } + + @Test + public void testClusterKey() throws Exception { + testKey("server", "2181", "hbase"); + testKey("server1,server2,server3", "2181", "hbase"); + try { + ZKUtil.transformClusterKey("2181:hbase"); + } catch (IOException ex) { + // OK + } + } + + private void testKey(String ensemble, String port, String znode) + throws IOException { + Configuration conf = new Configuration(); + String key = ensemble+":"+port+":"+znode; + String[] parts = ZKUtil.transformClusterKey(key); + assertEquals(ensemble, parts[0]); + assertEquals(port, parts[1]); + assertEquals(znode, parts[2]); + ZKUtil.applyClusterKeyToConf(conf, key); + assertEquals(parts[0], conf.get(HConstants.ZOOKEEPER_QUORUM)); + assertEquals(parts[1], conf.get("hbase.zookeeper.property.clientPort")); + assertEquals(parts[2], conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf); + assertEquals(key, reconstructedKey); + } } \ No newline at end of file