diff --git a/CHANGES.txt b/CHANGES.txt
index 67a3a27f960..5a3fb778ada 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -619,6 +619,7 @@ Release 0.21.0 - Unreleased
(Nicolas Spiegelberg via Stack)
HBASE-3155 HFile.appendMetaBlock() uses wrong comparator
(Nicolas Spiegelberg via Stack)
+ HBASE-3012 TOF doesn't take zk client port for remote clusters
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index 5e7ce1b7f53..339651f4086 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -100,7 +100,7 @@ public class CopyTable {
System.err.println(" endtime end of the time range");
System.err.println(" new.name new table's name");
System.err.println(" peer.adr Address of the peer cluster given in the format");
- System.err.println(" hbase.zookeeer.quorum:zookeeper.znode.parent");
+ System.err.println(" hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
System.err.println(" families comma-seperated list of families to copy");
System.err.println();
System.err.println("Args:");
@@ -111,7 +111,7 @@ public class CopyTable {
System.err.println(" $ bin/hbase " +
"org.apache.hadoop.hbase.mapreduce.CopyTable --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface " +
"--rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer --starttime=1265875194289 --endtime=1265878794289 " +
- "--peer.adr=server1,server2,server3:/hbase TestTable ");
+ "--peer.adr=server1,server2,server3:2181:/hbase TestTable ");
}
private static boolean doCommandLine(final String[] args) {
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 4ca8053e044..daa90f36b19 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
@@ -185,7 +186,8 @@ public class TableMapReduceUtil {
* default; e.g. copying tables between clusters, the source would be
* designated by hbase-site.xml
and this param would have the
* ensemble address of the remote cluster. The format to pass is particular.
- * Pass <hbase.zookeeper.quorum> ':' <ZOOKEEPER_ZNODE_PARENT>
.
+ * Pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>
+ *
such as server,server2,server3:2181:/hbase
.
* @param serverClass redefined hbase.regionserver.class
* @param serverImpl redefined hbase.regionserver.impl
* @throws IOException When determining the region count fails.
@@ -215,7 +217,8 @@ public class TableMapReduceUtil {
* default; e.g. copying tables between clusters, the source would be
* designated by hbase-site.xml
and this param would have the
* ensemble address of the remote cluster. The format to pass is particular.
- * Pass <hbase.zookeeper.quorum> ':' <ZOOKEEPER_ZNODE_PARENT>
.
+ * Pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>
+ *
such as server,server2,server3:2181:/hbase
.
* @param serverClass redefined hbase.regionserver.class
* @param serverImpl redefined hbase.regionserver.impl
* @param addDependencyJars upload HBase jars and jars for any of the configured
@@ -233,13 +236,9 @@ public class TableMapReduceUtil {
conf.set(TableOutputFormat.OUTPUT_TABLE, table);
// If passed a quorum/ensemble address, pass it on to TableOutputFormat.
if (quorumAddress != null) {
- if (quorumAddress.split(":").length == 2) {
- conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
- } else {
- // Not in expected format.
- throw new IOException("Please specify the peer cluster using the format of " +
- HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_ZNODE_PARENT);
- }
+ // Calling this will validate the format
+ ZKUtil.transformClusterKey(quorumAddress);
+ conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
}
if (serverClass != null && serverImpl != null) {
conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index d6dbc395df2..d0f0e8d1b7a 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -186,10 +187,7 @@ implements Configurable {
String serverImpl = conf.get(REGION_SERVER_IMPL);
try {
if (address != null) {
- // Check is done in TMRU
- String[] parts = address.split(":");
- conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
- conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
+ ZKUtil.applyClusterKeyToConf(conf, address);
}
if (serverClass != null) {
conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
index a62d43e4f4c..15a200452b9 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
@@ -157,9 +157,7 @@ public class ReplicationZookeeper {
conf.get("zookeeper.znode.replication.clusterId", "clusterId");
String rsZNodeName =
conf.get("zookeeper.znode.replication.rs", "rs");
- this.ourClusterKey = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
- this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
- this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+ this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
this.replicationZNode =
ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
@@ -275,17 +273,15 @@ public class ReplicationZookeeper {
LOG.debug("Not connecting to " + peerId + " because it's us");
return null;
}
- String[] ensemble = otherClusterKey.split(":");
- if (ensemble.length != 3) {
- LOG.warn("Wrong format of cluster address: " +
- Bytes.toStringBinary(data));
- return null;
- }
// Construct the connection to the new peer
Configuration otherConf = new Configuration(this.conf);
- otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
- otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
- otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
+ try {
+ ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
+ } catch (IOException e) {
+ LOG.error("Can't get peer because:", e);
+ return null;
+ }
+
ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
"connection to cluster: " + peerId, this.abortable);
return new ReplicationPeer(otherConf, peerId,
diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 21a4256271a..565d9d3f574 100644
--- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -169,6 +169,8 @@ public class ZKUtil {
"[\\t\\n\\x0B\\f\\r]", ""));
StringBuilder builder = new StringBuilder(ensemble);
builder.append(":");
+ builder.append(conf.get("hbase.zookeeper.property.clientPort"));
+ builder.append(":");
builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
if (name != null && !name.isEmpty()) {
builder.append(",");
@@ -177,6 +179,39 @@ public class ZKUtil {
return builder.toString();
}
+ /**
+ * Apply the settings in the given key to the given configuration, this is
+ * used to communicate with distant clusters
+ * @param conf configuration object to configure
+ * @param key string that contains the 3 required configuratins
+ * @throws IOException
+ */
+ public static void applyClusterKeyToConf(Configuration conf, String key)
+ throws IOException{
+ String[] parts = transformClusterKey(key);
+ conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+ conf.set("hbase.zookeeper.property.clientPort", parts[1]);
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
+ }
+
+ /**
+ * Separate the given key into the three configurations it should contain:
+ * hbase.zookeeper.quorum, hbase.zookeeper.client.port
+ * and zookeeper.znode.parent
+ * @param key
+ * @return the three configuration in the described order
+ * @throws IOException
+ */
+ public static String[] transformClusterKey(String key) throws IOException {
+ String[] parts = key.split(":");
+ if (parts.length != 3) {
+ throw new IOException("Cluster key invalid, the format should be:" +
+ HConstants.ZOOKEEPER_QUORUM + ":hbase.zookeeper.client.port:"
+ + HConstants.ZOOKEEPER_ZNODE_PARENT);
+ }
+ return parts;
+ }
+
//
// Existence checks and watches
//
diff --git a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
index 7874907c858..b9dd937bc6d 100644
--- a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
+++ b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@@ -195,4 +196,31 @@ public class TestZooKeeper {
ZKUtil.deleteNode(zkw, "/l1");
assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null));
}
+
+ @Test
+ public void testClusterKey() throws Exception {
+ testKey("server", "2181", "hbase");
+ testKey("server1,server2,server3", "2181", "hbase");
+ try {
+ ZKUtil.transformClusterKey("2181:hbase");
+ } catch (IOException ex) {
+ // OK
+ }
+ }
+
+ private void testKey(String ensemble, String port, String znode)
+ throws IOException {
+ Configuration conf = new Configuration();
+ String key = ensemble+":"+port+":"+znode;
+ String[] parts = ZKUtil.transformClusterKey(key);
+ assertEquals(ensemble, parts[0]);
+ assertEquals(port, parts[1]);
+ assertEquals(znode, parts[2]);
+ ZKUtil.applyClusterKeyToConf(conf, key);
+ assertEquals(parts[0], conf.get(HConstants.ZOOKEEPER_QUORUM));
+ assertEquals(parts[1], conf.get("hbase.zookeeper.property.clientPort"));
+ assertEquals(parts[2], conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+ String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf);
+ assertEquals(key, reconstructedKey);
+ }
}
\ No newline at end of file