HBASE-3012 TOF doesn't take zk client port for remote clusters
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1028132 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d95e8f964d
commit
3c1c948f45
|
@ -619,6 +619,7 @@ Release 0.21.0 - Unreleased
|
||||||
(Nicolas Spiegelberg via Stack)
|
(Nicolas Spiegelberg via Stack)
|
||||||
HBASE-3155 HFile.appendMetaBlock() uses wrong comparator
|
HBASE-3155 HFile.appendMetaBlock() uses wrong comparator
|
||||||
(Nicolas Spiegelberg via Stack)
|
(Nicolas Spiegelberg via Stack)
|
||||||
|
HBASE-3012 TOF doesn't take zk client port for remote clusters
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-1760 Cleanup TODOs in HTable
|
HBASE-1760 Cleanup TODOs in HTable
|
||||||
|
|
|
@ -100,7 +100,7 @@ public class CopyTable {
|
||||||
System.err.println(" endtime end of the time range");
|
System.err.println(" endtime end of the time range");
|
||||||
System.err.println(" new.name new table's name");
|
System.err.println(" new.name new table's name");
|
||||||
System.err.println(" peer.adr Address of the peer cluster given in the format");
|
System.err.println(" peer.adr Address of the peer cluster given in the format");
|
||||||
System.err.println(" hbase.zookeeer.quorum:zookeeper.znode.parent");
|
System.err.println(" hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
|
||||||
System.err.println(" families comma-seperated list of families to copy");
|
System.err.println(" families comma-seperated list of families to copy");
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println("Args:");
|
System.err.println("Args:");
|
||||||
|
@ -111,7 +111,7 @@ public class CopyTable {
|
||||||
System.err.println(" $ bin/hbase " +
|
System.err.println(" $ bin/hbase " +
|
||||||
"org.apache.hadoop.hbase.mapreduce.CopyTable --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface " +
|
"org.apache.hadoop.hbase.mapreduce.CopyTable --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface " +
|
||||||
"--rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer --starttime=1265875194289 --endtime=1265878794289 " +
|
"--rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer --starttime=1265875194289 --endtime=1265878794289 " +
|
||||||
"--peer.adr=server1,server2,server3:/hbase TestTable ");
|
"--peer.adr=server1,server2,server3:2181:/hbase TestTable ");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean doCommandLine(final String[] args) {
|
private static boolean doCommandLine(final String[] args) {
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.util.Base64;
|
import org.apache.hadoop.hbase.util.Base64;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.WritableComparable;
|
import org.apache.hadoop.io.WritableComparable;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
@ -185,7 +186,8 @@ public class TableMapReduceUtil {
|
||||||
* default; e.g. copying tables between clusters, the source would be
|
* default; e.g. copying tables between clusters, the source would be
|
||||||
* designated by <code>hbase-site.xml</code> and this param would have the
|
* designated by <code>hbase-site.xml</code> and this param would have the
|
||||||
* ensemble address of the remote cluster. The format to pass is particular.
|
* ensemble address of the remote cluster. The format to pass is particular.
|
||||||
* Pass <code> <hbase.zookeeper.quorum> ':' <ZOOKEEPER_ZNODE_PARENT></code>.
|
* Pass <code> <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>
|
||||||
|
* </code> such as <code>server,server2,server3:2181:/hbase</code>.
|
||||||
* @param serverClass redefined hbase.regionserver.class
|
* @param serverClass redefined hbase.regionserver.class
|
||||||
* @param serverImpl redefined hbase.regionserver.impl
|
* @param serverImpl redefined hbase.regionserver.impl
|
||||||
* @throws IOException When determining the region count fails.
|
* @throws IOException When determining the region count fails.
|
||||||
|
@ -215,7 +217,8 @@ public class TableMapReduceUtil {
|
||||||
* default; e.g. copying tables between clusters, the source would be
|
* default; e.g. copying tables between clusters, the source would be
|
||||||
* designated by <code>hbase-site.xml</code> and this param would have the
|
* designated by <code>hbase-site.xml</code> and this param would have the
|
||||||
* ensemble address of the remote cluster. The format to pass is particular.
|
* ensemble address of the remote cluster. The format to pass is particular.
|
||||||
* Pass <code> <hbase.zookeeper.quorum> ':' <ZOOKEEPER_ZNODE_PARENT></code>.
|
* Pass <code> <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>
|
||||||
|
* </code> such as <code>server,server2,server3:2181:/hbase</code>.
|
||||||
* @param serverClass redefined hbase.regionserver.class
|
* @param serverClass redefined hbase.regionserver.class
|
||||||
* @param serverImpl redefined hbase.regionserver.impl
|
* @param serverImpl redefined hbase.regionserver.impl
|
||||||
* @param addDependencyJars upload HBase jars and jars for any of the configured
|
* @param addDependencyJars upload HBase jars and jars for any of the configured
|
||||||
|
@ -233,13 +236,9 @@ public class TableMapReduceUtil {
|
||||||
conf.set(TableOutputFormat.OUTPUT_TABLE, table);
|
conf.set(TableOutputFormat.OUTPUT_TABLE, table);
|
||||||
// If passed a quorum/ensemble address, pass it on to TableOutputFormat.
|
// If passed a quorum/ensemble address, pass it on to TableOutputFormat.
|
||||||
if (quorumAddress != null) {
|
if (quorumAddress != null) {
|
||||||
if (quorumAddress.split(":").length == 2) {
|
// Calling this will validate the format
|
||||||
conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
|
ZKUtil.transformClusterKey(quorumAddress);
|
||||||
} else {
|
conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
|
||||||
// Not in expected format.
|
|
||||||
throw new IOException("Please specify the peer cluster using the format of " +
|
|
||||||
HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_ZNODE_PARENT);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (serverClass != null && serverImpl != null) {
|
if (serverClass != null && serverImpl != null) {
|
||||||
conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
|
conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.mapreduce.JobContext;
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||||
|
@ -186,10 +187,7 @@ implements Configurable {
|
||||||
String serverImpl = conf.get(REGION_SERVER_IMPL);
|
String serverImpl = conf.get(REGION_SERVER_IMPL);
|
||||||
try {
|
try {
|
||||||
if (address != null) {
|
if (address != null) {
|
||||||
// Check is done in TMRU
|
ZKUtil.applyClusterKeyToConf(conf, address);
|
||||||
String[] parts = address.split(":");
|
|
||||||
conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
|
|
||||||
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[1]);
|
|
||||||
}
|
}
|
||||||
if (serverClass != null) {
|
if (serverClass != null) {
|
||||||
conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
|
conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
|
||||||
|
|
|
@ -157,9 +157,7 @@ public class ReplicationZookeeper {
|
||||||
conf.get("zookeeper.znode.replication.clusterId", "clusterId");
|
conf.get("zookeeper.znode.replication.clusterId", "clusterId");
|
||||||
String rsZNodeName =
|
String rsZNodeName =
|
||||||
conf.get("zookeeper.znode.replication.rs", "rs");
|
conf.get("zookeeper.znode.replication.rs", "rs");
|
||||||
this.ourClusterKey = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
|
this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
|
||||||
this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
|
|
||||||
this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
|
|
||||||
this.replicationZNode =
|
this.replicationZNode =
|
||||||
ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
|
ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
|
||||||
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
|
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
|
||||||
|
@ -275,17 +273,15 @@ public class ReplicationZookeeper {
|
||||||
LOG.debug("Not connecting to " + peerId + " because it's us");
|
LOG.debug("Not connecting to " + peerId + " because it's us");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
String[] ensemble = otherClusterKey.split(":");
|
|
||||||
if (ensemble.length != 3) {
|
|
||||||
LOG.warn("Wrong format of cluster address: " +
|
|
||||||
Bytes.toStringBinary(data));
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
// Construct the connection to the new peer
|
// Construct the connection to the new peer
|
||||||
Configuration otherConf = new Configuration(this.conf);
|
Configuration otherConf = new Configuration(this.conf);
|
||||||
otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
|
try {
|
||||||
otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
|
ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
|
||||||
otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
|
} catch (IOException e) {
|
||||||
|
LOG.error("Can't get peer because:", e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
|
ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
|
||||||
"connection to cluster: " + peerId, this.abortable);
|
"connection to cluster: " + peerId, this.abortable);
|
||||||
return new ReplicationPeer(otherConf, peerId,
|
return new ReplicationPeer(otherConf, peerId,
|
||||||
|
|
|
@ -169,6 +169,8 @@ public class ZKUtil {
|
||||||
"[\\t\\n\\x0B\\f\\r]", ""));
|
"[\\t\\n\\x0B\\f\\r]", ""));
|
||||||
StringBuilder builder = new StringBuilder(ensemble);
|
StringBuilder builder = new StringBuilder(ensemble);
|
||||||
builder.append(":");
|
builder.append(":");
|
||||||
|
builder.append(conf.get("hbase.zookeeper.property.clientPort"));
|
||||||
|
builder.append(":");
|
||||||
builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
|
builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
|
||||||
if (name != null && !name.isEmpty()) {
|
if (name != null && !name.isEmpty()) {
|
||||||
builder.append(",");
|
builder.append(",");
|
||||||
|
@ -177,6 +179,39 @@ public class ZKUtil {
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Apply the settings in the given key to the given configuration, this is
|
||||||
|
* used to communicate with distant clusters
|
||||||
|
* @param conf configuration object to configure
|
||||||
|
* @param key string that contains the 3 required configuratins
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void applyClusterKeyToConf(Configuration conf, String key)
|
||||||
|
throws IOException{
|
||||||
|
String[] parts = transformClusterKey(key);
|
||||||
|
conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
|
||||||
|
conf.set("hbase.zookeeper.property.clientPort", parts[1]);
|
||||||
|
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Separate the given key into the three configurations it should contain:
|
||||||
|
* hbase.zookeeper.quorum, hbase.zookeeper.client.port
|
||||||
|
* and zookeeper.znode.parent
|
||||||
|
* @param key
|
||||||
|
* @return the three configuration in the described order
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static String[] transformClusterKey(String key) throws IOException {
|
||||||
|
String[] parts = key.split(":");
|
||||||
|
if (parts.length != 3) {
|
||||||
|
throw new IOException("Cluster key invalid, the format should be:" +
|
||||||
|
HConstants.ZOOKEEPER_QUORUM + ":hbase.zookeeper.client.port:"
|
||||||
|
+ HConstants.ZOOKEEPER_ZNODE_PARENT);
|
||||||
|
}
|
||||||
|
return parts;
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Existence checks and watches
|
// Existence checks and watches
|
||||||
//
|
//
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
@ -195,4 +196,31 @@ public class TestZooKeeper {
|
||||||
ZKUtil.deleteNode(zkw, "/l1");
|
ZKUtil.deleteNode(zkw, "/l1");
|
||||||
assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null));
|
assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClusterKey() throws Exception {
|
||||||
|
testKey("server", "2181", "hbase");
|
||||||
|
testKey("server1,server2,server3", "2181", "hbase");
|
||||||
|
try {
|
||||||
|
ZKUtil.transformClusterKey("2181:hbase");
|
||||||
|
} catch (IOException ex) {
|
||||||
|
// OK
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testKey(String ensemble, String port, String znode)
|
||||||
|
throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
String key = ensemble+":"+port+":"+znode;
|
||||||
|
String[] parts = ZKUtil.transformClusterKey(key);
|
||||||
|
assertEquals(ensemble, parts[0]);
|
||||||
|
assertEquals(port, parts[1]);
|
||||||
|
assertEquals(znode, parts[2]);
|
||||||
|
ZKUtil.applyClusterKeyToConf(conf, key);
|
||||||
|
assertEquals(parts[0], conf.get(HConstants.ZOOKEEPER_QUORUM));
|
||||||
|
assertEquals(parts[1], conf.get("hbase.zookeeper.property.clientPort"));
|
||||||
|
assertEquals(parts[2], conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
|
||||||
|
String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf);
|
||||||
|
assertEquals(key, reconstructedKey);
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue