HBASE-15769 Perform validation on cluster key for add_peer (Matt Warhaftig)
This commit is contained in:
parent
c12eaa5cc4
commit
838c306c0a
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
@ -116,6 +117,14 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
|||
throw new IllegalArgumentException("Found invalid peer name:" + id);
|
||||
}
|
||||
|
||||
if (peerConfig.getClusterKey() != null) {
|
||||
try {
|
||||
ZKConfig.validateClusterKey(peerConfig.getClusterKey());
|
||||
} catch (IOException ioe) {
|
||||
throw new IllegalArgumentException(ioe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
checkQueuesDeleted(id);
|
||||
|
||||
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
||||
|
|
|
@ -190,12 +190,23 @@ public final class ZKConfig {
|
|||
String[] parts = key.split(":");
|
||||
|
||||
if (parts.length == 3) {
|
||||
if (!parts[2].matches("/.*[^/]")) {
|
||||
throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
|
||||
HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
|
||||
+ HConstants.ZOOKEEPER_ZNODE_PARENT);
|
||||
}
|
||||
return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]);
|
||||
}
|
||||
|
||||
if (parts.length > 3) {
|
||||
// The quorum could contain client port in server:clientport format, try to transform more.
|
||||
String zNodeParent = parts [parts.length - 1];
|
||||
if (!zNodeParent.matches("/.*[^/]")) {
|
||||
throw new IOException("Cluster key passed " + key + " is invalid, the format should be:"
|
||||
+ HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
|
||||
+ HConstants.ZOOKEEPER_ZNODE_PARENT);
|
||||
}
|
||||
|
||||
String clientPort = parts [parts.length - 2];
|
||||
|
||||
// The first part length is the total length minus the lengths of other parts and minus 2 ":"
|
||||
|
|
|
@ -59,10 +59,10 @@ public class TestZKConfig {
|
|||
|
||||
@Test
|
||||
public void testClusterKey() throws Exception {
|
||||
testKey("server", 2181, "hbase");
|
||||
testKey("server1,server2,server3", 2181, "hbase");
|
||||
testKey("server", 2181, "/hbase");
|
||||
testKey("server1,server2,server3", 2181, "/hbase");
|
||||
try {
|
||||
ZKConfig.validateClusterKey("2181:hbase");
|
||||
ZKConfig.validateClusterKey("2181:/hbase");
|
||||
} catch (IOException ex) {
|
||||
// OK
|
||||
}
|
||||
|
@ -71,19 +71,19 @@ public class TestZKConfig {
|
|||
@Test
|
||||
public void testClusterKeyWithMultiplePorts() throws Exception {
|
||||
// server has different port than the default port
|
||||
testKey("server1:2182", 2181, "hbase", true);
|
||||
testKey("server1:2182", 2181, "/hbase", true);
|
||||
// multiple servers have their own port
|
||||
testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
|
||||
testKey("server1:2182,server2:2183,server3:2184", 2181, "/hbase", true);
|
||||
// one server has no specified port, should use default port
|
||||
testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
|
||||
testKey("server1:2182,server2,server3:2184", 2181, "/hbase", true);
|
||||
// the last server has no specified port, should use default port
|
||||
testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
|
||||
testKey("server1:2182,server2:2183,server3", 2181, "/hbase", true);
|
||||
// multiple servers have no specified port, should use default port for those servers
|
||||
testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
|
||||
testKey("server1:2182,server2,server3:2184,server4", 2181, "/hbase", true);
|
||||
// same server, different ports
|
||||
testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
|
||||
testKey("server1:2182,server1:2183,server1", 2181, "/hbase", true);
|
||||
// mix of same server/different port and different server
|
||||
testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
|
||||
testKey("server1:2182,server2:2183,server1", 2181, "/hbase", true);
|
||||
}
|
||||
|
||||
private void testKey(String ensemble, int port, String znode)
|
||||
|
|
|
@ -160,6 +160,37 @@ public abstract class TestReplicationStateBasic {
|
|||
assertEquals(0, rq2.getListOfReplicators().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClusterKeys() throws ReplicationException, KeeperException {
|
||||
rp.init();
|
||||
|
||||
try {
|
||||
rp.addPeer(ID_ONE,
|
||||
new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
|
||||
fail("Should throw an IllegalArgumentException because "
|
||||
+ "zookeeper.znode.parent is missing leading '/'.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Expected.
|
||||
}
|
||||
|
||||
try {
|
||||
rp.addPeer(ID_ONE,
|
||||
new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
|
||||
fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Expected.
|
||||
}
|
||||
|
||||
try {
|
||||
rp.addPeer(ID_ONE,
|
||||
new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
|
||||
fail("Should throw an IllegalArgumentException because "
|
||||
+ "hbase.zookeeper.property.clientPort is missing.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Expected.
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
|
||||
rp.init();
|
||||
|
@ -325,7 +356,7 @@ public abstract class TestReplicationStateBasic {
|
|||
rq3.addLog("qId" + i, "filename" + j);
|
||||
}
|
||||
//Add peers for the corresponding queues so they are not orphans
|
||||
rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i));
|
||||
rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,11 +31,8 @@ This gives a full path for HBase to connect to another HBase cluster. An optiona
|
|||
table column families identifies which column families will be replicated to the peer cluster.
|
||||
Examples:
|
||||
|
||||
hbase> add_peer '1', "server1.cie.com:2181:/hbase"
|
||||
hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod"
|
||||
hbase> add_peer '3', "zk4,zk5,zk6:11000:/hbase-test", "table1; table2:cf1; table3:cf1,cf2"
|
||||
hbase> add_peer '4', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
|
||||
hbase> add_peer '5', CLUSTER_KEY => "server1.cie.com:2181:/hbase",
|
||||
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
|
||||
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
|
||||
TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
|
||||
|
||||
For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments
|
||||
|
|
Loading…
Reference in New Issue