HBASE-12706 Support multiple port numbers in ZK quorum string (Stephen Yuan Jiang)

This commit is contained in:
Enis Soztutar 2015-03-06 16:47:44 -08:00
parent 880215b728
commit 5619f20e1a
11 changed files with 501 additions and 96 deletions

View File

@ -33,6 +33,11 @@ import org.apache.hadoop.hbase.HConstants;
/** /**
* Utility methods for reading, and building the ZooKeeper configuration. * Utility methods for reading, and building the ZooKeeper configuration.
*
* The order and priority for reading the config are as follows:
* (1). zoo.cfg if ""hbase.config.read.zookeeper.config" is true
* (2). Property with "hbase.zookeeper.property." prefix from HBase XML
* (3). other zookeeper related properties in HBASE XML
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ZKConfig { public class ZKConfig {
@ -51,6 +56,24 @@ public class ZKConfig {
* @return Properties holding mappings representing ZooKeeper config file. * @return Properties holding mappings representing ZooKeeper config file.
*/ */
public static Properties makeZKProps(Configuration conf) { public static Properties makeZKProps(Configuration conf) {
Properties zkProperties = makeZKPropsFromZooCfg(conf);
if (zkProperties == null) {
// Otherwise, use the configuration options from HBase's XML files.
zkProperties = makeZKPropsFromHbaseConfig(conf);
}
return zkProperties;
}
/**
* Parses the corresponding config options from the zoo.cfg file
* and make a Properties object holding the Zookeeper config.
*
* @param conf Configuration to read from.
* @return Properties holding mappings representing the ZooKeeper config file or null if
* the HBASE_CONFIG_READ_ZOOKEEPER_CONFIG is false or the file does not exist.
*/
private static Properties makeZKPropsFromZooCfg(Configuration conf) {
if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, false)) { if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, false)) {
LOG.warn( LOG.warn(
"Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME + "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME +
@ -80,7 +103,18 @@ public class ZKConfig {
} }
} }
// Otherwise, use the configuration options from HBase's XML files. return null;
}
/**
* Make a Properties object holding ZooKeeper config.
* Parses the corresponding config options from the HBase XML configs
* and generates the appropriate ZooKeeper properties.
*
* @param conf Configuration to read from.
* @return Properties holding mappings representing ZooKeeper config file.
*/
private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
Properties zkProperties = new Properties(); Properties zkProperties = new Properties();
// Directly map all of the hbase.zookeeper.property.KEY properties. // Directly map all of the hbase.zookeeper.property.KEY properties.
@ -112,10 +146,17 @@ public class ZKConfig {
final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
HConstants.LOCALHOST); HConstants.LOCALHOST);
String serverHost;
String address;
String key;
for (int i = 0; i < serverHosts.length; ++i) { for (int i = 0; i < serverHosts.length; ++i) {
String serverHost = serverHosts[i]; if (serverHosts[i].contains(":")) {
String address = serverHost + ":" + peerPort + ":" + leaderPort; serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':'));
String key = "server." + i; } else {
serverHost = serverHosts[i];
}
address = serverHost + ":" + peerPort + ":" + leaderPort;
key = "server." + i;
zkProperties.put(key, address); zkProperties.put(key, address);
} }
@ -177,7 +218,8 @@ public class ZKConfig {
} }
// Special case for 'hbase.cluster.distributed' property being 'true' // Special case for 'hbase.cluster.distributed' property being 'true'
if (key.startsWith("server.")) { if (key.startsWith("server.")) {
boolean mode = conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); boolean mode =
conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
if (mode == HConstants.CLUSTER_IS_DISTRIBUTED && value.startsWith(HConstants.LOCALHOST)) { if (mode == HConstants.CLUSTER_IS_DISTRIBUTED && value.startsWith(HConstants.LOCALHOST)) {
String msg = "The server in zoo.cfg cannot be set to localhost " + String msg = "The server in zoo.cfg cannot be set to localhost " +
"in a fully-distributed setup because it won't be reachable. " + "in a fully-distributed setup because it won't be reachable. " +
@ -198,7 +240,7 @@ public class ZKConfig {
* @param properties * @param properties
* @return Quorum servers String * @return Quorum servers String
*/ */
public static String getZKQuorumServersString(Properties properties) { private static String getZKQuorumServersString(Properties properties) {
String clientPort = null; String clientPort = null;
List<String> servers = new ArrayList<String>(); List<String> servers = new ArrayList<String>();
@ -250,12 +292,59 @@ public class ZKConfig {
return hostPortBuilder.toString(); return hostPortBuilder.toString();
} }
/**
* Return the ZK Quorum servers string given the specified configuration
*
* @param conf
* @return Quorum servers String
*/
private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
String defaultClientPort = Integer.toString(
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
// Build the ZK quorum server string with "server:clientport" list, separated by ','
final String[] serverHosts =
conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
return buildQuorumServerString(serverHosts, defaultClientPort);
}
/**
* Build the ZK quorum server string with "server:clientport" list, separated by ','
*
* @param serverHosts a list of servers for ZK quorum
* @param clientPort the default client port
* @return the string for a list of "server:port" separated by ","
*/
public static String buildQuorumServerString(String[] serverHosts, String clientPort) {
StringBuilder quorumStringBuilder = new StringBuilder();
String serverHost;
for (int i = 0; i < serverHosts.length; ++i) {
if (serverHosts[i].contains(":")) {
serverHost = serverHosts[i]; // just use the port specified from the input
} else {
serverHost = serverHosts[i] + ":" + clientPort;
}
if (i > 0) {
quorumStringBuilder.append(',');
}
quorumStringBuilder.append(serverHost);
}
return quorumStringBuilder.toString();
}
/** /**
* Return the ZK Quorum servers string given the specified configuration. * Return the ZK Quorum servers string given the specified configuration.
* @param conf * @param conf
* @return Quorum servers * @return Quorum servers
*/ */
public static String getZKQuorumServersString(Configuration conf) { public static String getZKQuorumServersString(Configuration conf) {
return getZKQuorumServersString(makeZKProps(conf)); // First try zoo.cfg; if not applicable, then try config XML.
Properties zkProperties = makeZKPropsFromZooCfg(conf);
if (zkProperties != null) {
return getZKQuorumServersString(zkProperties);
}
return getZKQuorumServersStringFromHbaseConfig(conf);
} }
} }

View File

@ -31,7 +31,6 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
@ -74,6 +73,7 @@ import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.ZooKeeperSaslServer; import org.apache.zookeeper.server.ZooKeeperSaslServer;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
/** /**
@ -93,6 +93,25 @@ public class ZKUtil {
public static final char ZNODE_PATH_SEPARATOR = '/'; public static final char ZNODE_PATH_SEPARATOR = '/';
private static int zkDumpConnectionTimeOut; private static int zkDumpConnectionTimeOut;
// The Quorum for the ZK cluster can have one the following format (see examples below):
// (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort)
// (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
// in this case, the clientPort would be ignored)
// (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
// the clientPort; otherwise, it would use the specified port)
@VisibleForTesting
public static class ZKClusterKey {
public String quorumString;
public int clientPort;
public String znodeParent;
ZKClusterKey(String quorumString, int clientPort, String znodeParent) {
this.quorumString = quorumString;
this.clientPort = clientPort;
this.znodeParent = znodeParent;
}
}
/** /**
* Creates a new connection to ZooKeeper, pulling settings and ensemble config * Creates a new connection to ZooKeeper, pulling settings and ensemble config
* from the specified configuration object using methods from {@link ZKConfig}. * from the specified configuration object using methods from {@link ZKConfig}.
@ -106,8 +125,7 @@ public class ZKUtil {
*/ */
public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
throws IOException { throws IOException {
Properties properties = ZKConfig.makeZKProps(conf); String ensemble = ZKConfig.getZKQuorumServersString(conf);
String ensemble = ZKConfig.getZKQuorumServersString(properties);
return connect(conf, ensemble, watcher); return connect(conf, ensemble, watcher);
} }
@ -385,10 +403,10 @@ public class ZKUtil {
*/ */
public static void applyClusterKeyToConf(Configuration conf, String key) public static void applyClusterKeyToConf(Configuration conf, String key)
throws IOException{ throws IOException{
String[] parts = transformClusterKey(key); ZKClusterKey zkClusterKey = transformClusterKey(key);
conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]); conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.quorumString);
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]); conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.clientPort);
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]); conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.znodeParent);
} }
/** /**
@ -399,14 +417,53 @@ public class ZKUtil {
* @return the three configuration in the described order * @return the three configuration in the described order
* @throws IOException * @throws IOException
*/ */
public static String[] transformClusterKey(String key) throws IOException { public static ZKClusterKey transformClusterKey(String key) throws IOException {
String[] parts = key.split(":"); String[] parts = key.split(":");
if (parts.length != 3) {
throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" + if (parts.length == 3) {
HConstants.ZOOKEEPER_QUORUM + ":hbase.zookeeper.client.port:" return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]);
+ HConstants.ZOOKEEPER_ZNODE_PARENT);
} }
return parts;
if (parts.length > 3) {
// The quorum could contain client port in server:clientport format, try to transform more.
String zNodeParent = parts [parts.length - 1];
String clientPort = parts [parts.length - 2];
// The first part length is the total length minus the lengths of other parts and minus 2 ":"
int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2;
String quorumStringInput = key.substring(0, endQuorumIndex);
String[] serverHosts = quorumStringInput.split(",");
// The common case is that every server has its own client port specified - this means
// that (total parts - the ZNodeParent part - the ClientPort part) is equal to
// (the number of "," + 1) - "+ 1" because the last server has no ",".
if ((parts.length - 2) == (serverHosts.length + 1)) {
return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent);
}
// For the uncommon case that some servers has no port specified, we need to build the
// server:clientport list using default client port for servers without specified port.
return new ZKClusterKey(
ZKConfig.buildQuorumServerString(serverHosts, clientPort),
Integer.parseInt(clientPort),
zNodeParent);
}
throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
+ HConstants.ZOOKEEPER_ZNODE_PARENT);
}
/**
* Standardize the ZK quorum string: make it a "server:clientport" list, separated by ','
* @param quorumStringInput a string contains a list of servers for ZK quorum
* @param clientPort the default client port
* @return the string for a list of "server:port" separated by ","
*/
@VisibleForTesting
public static String standardizeQuorumServerString(String quorumStringInput, String clientPort) {
String[] serverHosts = quorumStringInput.split(",");
return ZKConfig.buildQuorumServerString(serverHosts, clientPort);
} }
// //
@ -941,7 +998,8 @@ public class ZKUtil {
// Detection for embedded HBase client with jaas configuration // Detection for embedded HBase client with jaas configuration
// defined for third party programs. // defined for third party programs.
try { try {
javax.security.auth.login.Configuration testConfig = javax.security.auth.login.Configuration.getConfiguration(); javax.security.auth.login.Configuration testConfig =
javax.security.auth.login.Configuration.getConfiguration();
if(testConfig.getAppConfigurationEntry("Client") == null) { if(testConfig.getAppConfigurationEntry("Client") == null) {
return false; return false;
} }
@ -1210,7 +1268,6 @@ public class ZKUtil {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
zkw.interruptedException(ie); zkw.interruptedException(ie);
} }
} catch(InterruptedException ie) { } catch(InterruptedException ie) {
zkw.interruptedException(ie); zkw.interruptedException(ie);
} }
@ -1337,8 +1394,9 @@ public class ZKUtil {
deleteNodeRecursively(zkw, joinZNode(node, child)); deleteNodeRecursively(zkw, joinZNode(node, child));
} }
} }
//Zookeeper Watches are one time triggers; When children of parent nodes are deleted recursively.
//Must set another watch, get notified of delete node //Zookeeper Watches are one time triggers; When children of parent nodes are deleted
//recursively, must set another watch, get notified of delete node
if (zkw.getRecoverableZooKeeper().exists(node, zkw) != null){ if (zkw.getRecoverableZooKeeper().exists(node, zkw) != null){
zkw.getRecoverableZooKeeper().delete(node, -1); zkw.getRecoverableZooKeeper().delete(node, -1);
} }
@ -2026,7 +2084,8 @@ public class ZKUtil {
* @see #logZKTree(ZooKeeperWatcher, String) * @see #logZKTree(ZooKeeperWatcher, String)
* @throws KeeperException if an unexpected exception occurs * @throws KeeperException if an unexpected exception occurs
*/ */
protected static void logZKTree(ZooKeeperWatcher zkw, String root, String prefix) throws KeeperException { protected static void logZKTree(ZooKeeperWatcher zkw, String root, String prefix)
throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zkw, root); List<String> children = ZKUtil.listChildrenNoWatch(zkw, root);
if (children == null) return; if (children == null) return;
for (String child : children) { for (String child : children) {

View File

@ -156,10 +156,39 @@ public class HMasterCommandLine extends ServerCommandLine {
DefaultMetricsSystem.setMiniClusterMode(true); DefaultMetricsSystem.setMiniClusterMode(true);
final MiniZooKeeperCluster zooKeeperCluster = new MiniZooKeeperCluster(conf); final MiniZooKeeperCluster zooKeeperCluster = new MiniZooKeeperCluster(conf);
File zkDataPath = new File(conf.get(HConstants.ZOOKEEPER_DATA_DIR)); File zkDataPath = new File(conf.get(HConstants.ZOOKEEPER_DATA_DIR));
int zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0);
// find out the default client port
int zkClientPort = 0;
// If the zookeeper client port is specified in server quorum, use it.
String zkserver = conf.get(HConstants.ZOOKEEPER_QUORUM);
if (zkserver != null) {
String[] zkservers = zkserver.split(",");
if (zkservers.length > 1) {
// In local mode deployment, we have the master + a region server and zookeeper server
// started in the same process. Therefore, we only support one zookeeper server.
String errorMsg = "Could not start ZK with " + zkservers.length +
" ZK servers in local mode deployment. Aborting as clients (e.g. shell) will not "
+ "be able to find this ZK quorum.";
System.err.println(errorMsg);
throw new IOException(errorMsg);
}
String[] parts = zkservers[0].split(":");
if (parts.length == 2) {
// the second part is the client port
zkClientPort = Integer.parseInt(parts [1]);
}
}
// If the client port could not be find in server quorum conf, try another conf
if (zkClientPort == 0) { if (zkClientPort == 0) {
throw new IOException("No config value for " zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0);
+ HConstants.ZOOKEEPER_CLIENT_PORT); // The client port has to be set by now; if not, throw exception.
if (zkClientPort == 0) {
throw new IOException("No config value for " + HConstants.ZOOKEEPER_CLIENT_PORT);
}
} }
zooKeeperCluster.setDefaultClientPort(zkClientPort); zooKeeperCluster.setDefaultClientPort(zkClientPort);
@ -180,6 +209,7 @@ public class HMasterCommandLine extends ServerCommandLine {
throw new IOException(errorMsg); throw new IOException(errorMsg);
} }
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort)); conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));
// Need to have the zk cluster shutdown when master is shutdown. // Need to have the zk cluster shutdown when master is shutdown.
// Run a subclass that does the zk cluster shutdown on its way out. // Run a subclass that does the zk cluster shutdown on its way out.
int mastersCount = conf.getInt("hbase.masters", 1); int mastersCount = conf.getInt("hbase.masters", 1);
@ -254,7 +284,8 @@ public class HMasterCommandLine extends ServerCommandLine {
} }
} }
private static void closeAllRegionServerThreads(List<JVMClusterUtil.RegionServerThread> regionservers) { private static void closeAllRegionServerThreads(
List<JVMClusterUtil.RegionServerThread> regionservers) {
for(JVMClusterUtil.RegionServerThread t : regionservers){ for(JVMClusterUtil.RegionServerThread t : regionservers){
t.getRegionServer().stop("HMaster Aborted; Bringing down regions servers"); t.getRegionServer().stop("HMaster Aborted; Bringing down regions servers");
} }

View File

@ -42,6 +42,8 @@ import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog; import org.apache.zookeeper.server.persistence.FileTxnLog;
import com.google.common.annotations.VisibleForTesting;
/** /**
* TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
* of redoing it, we should contribute updates to their code which let us more * of redoing it, we should contribute updates to their code which let us more
@ -82,6 +84,33 @@ public class MiniZooKeeperCluster {
standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>(); standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
} }
/**
* Add a client port to the list.
*
* @param clientPort the specified port
*/
public void addClientPort(int clientPort) {
clientPortList.add(clientPort);
}
/**
* Get the list of client ports.
* @return clientPortList the client port list
*/
@VisibleForTesting
public List<Integer> getClientPortList() {
return clientPortList;
}
/**
* Check whether the client port in a specific position of the client port list is valid.
*
* @param index the specified position
*/
private boolean hasValidClientPortInList(int index) {
return (clientPortList.size() > index && clientPortList.get(index) > 0);
}
public void setDefaultClientPort(int clientPort) { public void setDefaultClientPort(int clientPort) {
if (clientPort <= 0) { if (clientPort <= 0) {
throw new IllegalArgumentException("Invalid default ZK client port: " throw new IllegalArgumentException("Invalid default ZK client port: "
@ -91,16 +120,39 @@ public class MiniZooKeeperCluster {
} }
/** /**
* Selects a ZK client port. Returns the default port if specified. * Selects a ZK client port.
* Otherwise, returns a random port. The random port is selected from the *
* range between 49152 to 65535. These ports cannot be registered with IANA * @param seedPort the seed port to start with; -1 means first time.
* and are intended for dynamic allocation (see http://bit.ly/dynports). * @Returns a valid and unused client port
*/ */
private int selectClientPort() { private int selectClientPort(int seedPort) {
if (defaultClientPort > 0) { int i;
return defaultClientPort; int returnClientPort = seedPort + 1;
if (returnClientPort == 0) {
// If the new port is invalid, find one - starting with the default client port.
// If the default client port is not specified, starting with a random port.
// The random port is selected from the range between 49152 to 65535. These ports cannot be
// registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports).
if (defaultClientPort > 0) {
returnClientPort = defaultClientPort;
} else {
returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
}
} }
return 0xc000 + new Random().nextInt(0x3f00); // Make sure that the port is unused.
while (true) {
for (i = 0; i < clientPortList.size(); i++) {
if (returnClientPort == clientPortList.get(i)) {
// Already used. Update the port and retry.
returnClientPort++;
break;
}
}
if (i == clientPortList.size()) {
break; // found a unused port, exit
}
}
return returnClientPort;
} }
public void setTickTime(int tickTime) { public void setTickTime(int tickTime) {
@ -126,7 +178,11 @@ public class MiniZooKeeperCluster {
} }
public int startup(File baseDir) throws IOException, InterruptedException { public int startup(File baseDir) throws IOException, InterruptedException {
return startup(baseDir,1); int numZooKeeperServers = clientPortList.size();
if (numZooKeeperServers == 0) {
numZooKeeperServers = 1; // need at least 1 ZK server for testing
}
return startup(baseDir, numZooKeeperServers);
} }
/** /**
@ -145,7 +201,8 @@ public class MiniZooKeeperCluster {
setupTestEnv(); setupTestEnv();
shutdown(); shutdown();
int tentativePort = selectClientPort(); int tentativePort = -1; // the seed port
int currentClientPort;
// running all the ZK servers // running all the ZK servers
for (int i = 0; i < numZooKeeperServers; i++) { for (int i = 0; i < numZooKeeperServers; i++) {
@ -157,21 +214,33 @@ public class MiniZooKeeperCluster {
} else { } else {
tickTimeToUse = TICK_TIME; tickTimeToUse = TICK_TIME;
} }
// Set up client port - if we have already had a list of valid ports, use it.
if (hasValidClientPortInList(i)) {
currentClientPort = clientPortList.get(i);
} else {
tentativePort = selectClientPort(tentativePort); // update the seed
currentClientPort = tentativePort;
}
ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse); ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
NIOServerCnxnFactory standaloneServerFactory; NIOServerCnxnFactory standaloneServerFactory;
while (true) { while (true) {
try { try {
standaloneServerFactory = new NIOServerCnxnFactory(); standaloneServerFactory = new NIOServerCnxnFactory();
standaloneServerFactory.configure( standaloneServerFactory.configure(
new InetSocketAddress(tentativePort), new InetSocketAddress(currentClientPort),
configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 1000)); configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 1000));
} catch (BindException e) { } catch (BindException e) {
LOG.debug("Failed binding ZK Server to client port: " + LOG.debug("Failed binding ZK Server to client port: " +
tentativePort, e); currentClientPort, e);
// We're told to use some port but it's occupied, fail // We're told to use some port but it's occupied, fail
if (defaultClientPort > 0) return -1; if (hasValidClientPortInList(i)) {
return -1;
}
// This port is already in use, try to use another. // This port is already in use, try to use another.
tentativePort = selectClientPort(); tentativePort = selectClientPort(tentativePort);
currentClientPort = tentativePort;
continue; continue;
} }
break; break;
@ -180,15 +249,21 @@ public class MiniZooKeeperCluster {
// Start up this ZK server // Start up this ZK server
standaloneServerFactory.startup(server); standaloneServerFactory.startup(server);
// Runs a 'stat' against the servers. // Runs a 'stat' against the servers.
if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) { if (!waitForServerUp(currentClientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for startup of standalone server"); throw new IOException("Waiting for startup of standalone server");
} }
// We have selected this port as a client port. // We have selected a port as a client port. Update clientPortList if necessary.
clientPortList.add(tentativePort); if (clientPortList.size() <= i) { // it is not in the list, add the port
clientPortList.add(currentClientPort);
}
else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
clientPortList.remove(i);
clientPortList.add(i, currentClientPort);
}
standaloneServerFactoryList.add(standaloneServerFactory); standaloneServerFactoryList.add(standaloneServerFactory);
zooKeeperServers.add(server); zooKeeperServers.add(server);
tentativePort++; //for the next server
} }
// set the first one to be active ZK; Others are backups // set the first one to be active ZK; Others are backups
@ -251,7 +326,7 @@ public class MiniZooKeeperCluster {
*/ */
public int killCurrentActiveZooKeeperServer() throws IOException, public int killCurrentActiveZooKeeperServer() throws IOException,
InterruptedException { InterruptedException {
if (!started || activeZKServerIndex < 0 ) { if (!started || activeZKServerIndex < 0) {
return -1; return -1;
} }

View File

@ -20,10 +20,6 @@
package org.apache.hadoop.hbase.zookeeper; package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
@ -41,31 +37,7 @@ public class ZooKeeperMainServer {
private static final String SERVER_ARG = "-server"; private static final String SERVER_ARG = "-server";
public String parse(final Configuration c) { public String parse(final Configuration c) {
// Note that we do not simply grab the property return ZKConfig.getZKQuorumServersString(c);
// HConstants.ZOOKEEPER_QUORUM from the HBaseConfiguration because the
// user may be using a zoo.cfg file.
Properties zkProps = ZKConfig.makeZKProps(c);
String clientPort = null;
List<String> hosts = new ArrayList<String>();
for (Entry<Object, Object> entry: zkProps.entrySet()) {
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
if (key.startsWith("server.")) {
String[] parts = value.split(":");
hosts.add(parts[0]);
} else if (key.endsWith("clientPort")) {
clientPort = value;
}
}
if (hosts.isEmpty() || clientPort == null) return null;
StringBuilder host = new StringBuilder();
for (int i = 0; i < hosts.size(); i++) {
if (i > 0) host.append("," + hosts.get(i));
else host.append(hosts.get(i));
host.append(":");
host.append(clientPort);
}
return host.toString();
} }
/** /**

View File

@ -697,15 +697,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* @see #shutdownMiniZKCluster() * @see #shutdownMiniZKCluster()
* @return zk cluster started. * @return zk cluster started.
*/ */
public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum) public MiniZooKeeperCluster startMiniZKCluster(
final int zooKeeperServerNum,
final int ... clientPortList)
throws Exception { throws Exception {
setupClusterTestDir(); setupClusterTestDir();
return startMiniZKCluster(clusterTestDir, zooKeeperServerNum); return startMiniZKCluster(clusterTestDir, zooKeeperServerNum, clientPortList);
} }
private MiniZooKeeperCluster startMiniZKCluster(final File dir) private MiniZooKeeperCluster startMiniZKCluster(final File dir)
throws Exception { throws Exception {
return startMiniZKCluster(dir,1); return startMiniZKCluster(dir, 1, null);
} }
/** /**
@ -713,7 +715,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* the port mentionned is used as the default port for ZooKeeper. * the port mentionned is used as the default port for ZooKeeper.
*/ */
private MiniZooKeeperCluster startMiniZKCluster(final File dir, private MiniZooKeeperCluster startMiniZKCluster(final File dir,
int zooKeeperServerNum) final int zooKeeperServerNum,
final int [] clientPortList)
throws Exception { throws Exception {
if (this.zkCluster != null) { if (this.zkCluster != null) {
throw new IOException("Cluster already running at " + dir); throw new IOException("Cluster already running at " + dir);
@ -725,6 +728,15 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
// If there is a port in the config file, we use it. // If there is a port in the config file, we use it.
this.zkCluster.setDefaultClientPort(defPort); this.zkCluster.setDefaultClientPort(defPort);
} }
if (clientPortList != null) {
// Ignore extra client ports
int clientPortListSize = (clientPortList.length <= zooKeeperServerNum) ?
clientPortList.length : zooKeeperServerNum;
for (int i=0; i < clientPortListSize; i++) {
this.zkCluster.addClientPort(clientPortList[i]);
}
}
int clientPort = this.zkCluster.startup(dir,zooKeeperServerNum); int clientPort = this.zkCluster.startup(dir,zooKeeperServerNum);
this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,
Integer.toString(clientPort)); Integer.toString(clientPort));

View File

@ -39,7 +39,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil; import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import java.io.File; import java.io.File;
import java.util.List;
/** /**
* Test our testing utility class * Test our testing utility class
@ -185,8 +187,8 @@ public class TestHBaseTestingUtility {
htu1.shutdownMiniCluster(); htu1.shutdownMiniCluster();
} }
@Test
@Test public void testMiniZooKeeper() throws Exception { public void testMiniZooKeeperWithOneServer() throws Exception {
HBaseTestingUtility hbt = new HBaseTestingUtility(); HBaseTestingUtility hbt = new HBaseTestingUtility();
MiniZooKeeperCluster cluster1 = hbt.startMiniZKCluster(); MiniZooKeeperCluster cluster1 = hbt.startMiniZKCluster();
try { try {
@ -195,7 +197,11 @@ public class TestHBaseTestingUtility {
} finally { } finally {
hbt.shutdownMiniZKCluster(); hbt.shutdownMiniZKCluster();
} }
}
@Test
public void testMiniZooKeeperWithMultipleServers() throws Exception {
HBaseTestingUtility hbt = new HBaseTestingUtility();
// set up zookeeper cluster with 5 zk servers // set up zookeeper cluster with 5 zk servers
MiniZooKeeperCluster cluster2 = hbt.startMiniZKCluster(5); MiniZooKeeperCluster cluster2 = hbt.startMiniZKCluster(5);
int defaultClientPort = 21818; int defaultClientPort = 21818;
@ -235,6 +241,111 @@ public class TestHBaseTestingUtility {
} }
} }
@Test
public void testMiniZooKeeperWithMultipleClientPorts() throws Exception {
int defaultClientPort = 8888;
int i, j;
HBaseTestingUtility hbt = new HBaseTestingUtility();
// Test 1 - set up zookeeper cluster with same number of ZK servers and specified client ports
int [] clientPortList1 = {1111, 1112, 1113};
MiniZooKeeperCluster cluster1 = hbt.startMiniZKCluster(clientPortList1.length, clientPortList1);
try {
List<Integer> clientPortListInCluster = cluster1.getClientPortList();
for (i = 0; i < clientPortListInCluster.size(); i++) {
assertEquals(clientPortListInCluster.get(i).intValue(), clientPortList1[i]);
}
} finally {
hbt.shutdownMiniZKCluster();
}
// Test 2 - set up zookeeper cluster with more ZK servers than specified client ports
hbt.getConfiguration().setInt("test.hbase.zookeeper.property.clientPort", defaultClientPort);
int [] clientPortList2 = {2222, 2223};
MiniZooKeeperCluster cluster2 =
hbt.startMiniZKCluster(clientPortList2.length + 2, clientPortList2);
try {
List<Integer> clientPortListInCluster = cluster2.getClientPortList();
for (i = 0, j = 0; i < clientPortListInCluster.size(); i++) {
if (i < clientPortList2.length) {
assertEquals(clientPortListInCluster.get(i).intValue(), clientPortList2[i]);
} else {
// servers with no specified client port will use defaultClientPort or some other ports
// based on defaultClientPort
assertEquals(clientPortListInCluster.get(i).intValue(), defaultClientPort + j);
j++;
}
}
} finally {
hbt.shutdownMiniZKCluster();
}
// Test 3 - set up zookeeper cluster with invalid client ports
hbt.getConfiguration().setInt("test.hbase.zookeeper.property.clientPort", defaultClientPort);
int [] clientPortList3 = {3333, -3334, 3335, 0};
MiniZooKeeperCluster cluster3 =
hbt.startMiniZKCluster(clientPortList3.length + 1, clientPortList3);
try {
List<Integer> clientPortListInCluster = cluster3.getClientPortList();
for (i = 0, j = 0; i < clientPortListInCluster.size(); i++) {
// Servers will only use valid client ports; if ports are not specified or invalid,
// the default port or a port based on default port will be used.
if (i < clientPortList3.length && clientPortList3[i] > 0) {
assertEquals(clientPortListInCluster.get(i).intValue(), clientPortList3[i]);
} else {
assertEquals(clientPortListInCluster.get(i).intValue(), defaultClientPort + j);
j++;
}
}
} finally {
hbt.shutdownMiniZKCluster();
}
// Test 4 - set up zookeeper cluster with default port and some other ports used
// This test tests that the defaultClientPort and defaultClientPort+2 are used, so
// the algorithm should choice defaultClientPort+1 and defaultClientPort+3 to fill
// out the ports for servers without ports specified.
hbt.getConfiguration().setInt("test.hbase.zookeeper.property.clientPort", defaultClientPort);
int [] clientPortList4 = {-4444, defaultClientPort+2, 4446, defaultClientPort};
MiniZooKeeperCluster cluster4 =
hbt.startMiniZKCluster(clientPortList4.length + 1, clientPortList4);
try {
List<Integer> clientPortListInCluster = cluster4.getClientPortList();
for (i = 0, j = 1; i < clientPortListInCluster.size(); i++) {
// Servers will only use valid client ports; if ports are not specified or invalid,
// the default port or a port based on default port will be used.
if (i < clientPortList4.length && clientPortList4[i] > 0) {
assertEquals(clientPortListInCluster.get(i).intValue(), clientPortList4[i]);
} else {
assertEquals(clientPortListInCluster.get(i).intValue(), defaultClientPort + j);
j +=2;
}
}
} finally {
hbt.shutdownMiniZKCluster();
}
// Test 5 - set up zookeeper cluster with same ports specified - fail is expected.
int [] clientPortList5 = {5555, 5556, 5556};
try {
MiniZooKeeperCluster cluster5 =
hbt.startMiniZKCluster(clientPortList5.length, clientPortList5);
assertTrue(cluster5.getClientPort() == -1); // expected failure
} catch (Exception e) {
// exception is acceptable
} finally {
hbt.shutdownMiniZKCluster();
}
}
@Test public void testMiniDFSCluster() throws Exception { @Test public void testMiniDFSCluster() throws Exception {
HBaseTestingUtility hbt = new HBaseTestingUtility(); HBaseTestingUtility hbt = new HBaseTestingUtility();
MiniDFSCluster cluster = hbt.startMiniDFSCluster(null); MiniDFSCluster cluster = hbt.startMiniDFSCluster(null);

View File

@ -347,8 +347,8 @@ public class TestZooKeeper {
@Test @Test
public void testClusterKey() throws Exception { public void testClusterKey() throws Exception {
testKey("server", "2181", "hbase"); testKey("server", 2181, "hbase");
testKey("server1,server2,server3", "2181", "hbase"); testKey("server1,server2,server3", 2181, "hbase");
try { try {
ZKUtil.transformClusterKey("2181:hbase"); ZKUtil.transformClusterKey("2181:hbase");
} catch (IOException ex) { } catch (IOException ex) {
@ -356,20 +356,58 @@ public class TestZooKeeper {
} }
} }
private void testKey(String ensemble, String port, String znode) @Test
public void testClusterKeyWithMultiplePorts() throws Exception {
// server has different port than the default port
testKey("server1:2182", 2181, "hbase", true);
// multiple servers have their own port
testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
// one server has no specified port, should use default port
testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
// the last server has no specified port, should use default port
testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
// multiple servers have no specified port, should use default port for those servers
testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
// same server, different ports
testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
// mix of same server/different port and different server
testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
}
private void testKey(String ensemble, int port, String znode)
throws IOException {
testKey(ensemble, port, znode, false); // not support multiple client ports
}
private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport)
throws IOException { throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
String key = ensemble+":"+port+":"+znode; String key = ensemble+":"+port+":"+znode;
String[] parts = ZKUtil.transformClusterKey(key); String ensemble2 = null;
assertEquals(ensemble, parts[0]); ZKUtil.ZKClusterKey zkClusterKey = ZKUtil.transformClusterKey(key);
assertEquals(port, parts[1]); if (multiplePortSupport) {
assertEquals(znode, parts[2]); ensemble2 = ZKUtil.standardizeQuorumServerString(ensemble, Integer.toString(port));
assertEquals(ensemble2, zkClusterKey.quorumString);
}
else {
assertEquals(ensemble, zkClusterKey.quorumString);
}
assertEquals(port, zkClusterKey.clientPort);
assertEquals(znode, zkClusterKey.znodeParent);
ZKUtil.applyClusterKeyToConf(conf, key); ZKUtil.applyClusterKeyToConf(conf, key);
assertEquals(parts[0], conf.get(HConstants.ZOOKEEPER_QUORUM)); assertEquals(zkClusterKey.quorumString, conf.get(HConstants.ZOOKEEPER_QUORUM));
assertEquals(parts[1], conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)); assertEquals(zkClusterKey.clientPort, conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1));
assertEquals(parts[2], conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); assertEquals(zkClusterKey.znodeParent, conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf); String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf);
assertEquals(key, reconstructedKey); if (multiplePortSupport) {
String key2 = ensemble2 + ":" + port + ":" + znode;
assertEquals(key2, reconstructedKey);
}
else {
assertEquals(key, reconstructedKey);
}
} }
/** /**

View File

@ -140,5 +140,14 @@ public class TestHQuorumPeer {
assertEquals(2181, p.get("clientPort")); assertEquals(2181, p.get("clientPort"));
} }
@Test
public void testGetZKQuorumServersString() {
Configuration config = new Configuration(TEST_UTIL.getConfiguration());
config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 8888);
config.set(HConstants.ZOOKEEPER_QUORUM, "foo:1234,bar:5678,baz,qux:9012");
String s = ZKConfig.getZKQuorumServersString(config);
assertEquals("foo:1234,bar:5678,baz:8888,qux:9012", s);
}
} }

View File

@ -71,10 +71,9 @@ public class TestRecoverableZooKeeper {
public void testSetDataVersionMismatchInLoop() throws Exception { public void testSetDataVersionMismatchInLoop() throws Exception {
String znode = "/hbase/splitWAL/9af7cfc9b15910a0b3d714bf40a3248f"; String znode = "/hbase/splitWAL/9af7cfc9b15910a0b3d714bf40a3248f";
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
Properties properties = ZKConfig.makeZKProps(conf);
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testSetDataVersionMismatchInLoop", ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testSetDataVersionMismatchInLoop",
abortable, true); abortable, true);
String ensemble = ZKConfig.getZKQuorumServersString(properties); String ensemble = ZKConfig.getZKQuorumServersString(conf);
RecoverableZooKeeper rzk = ZKUtil.connect(conf, ensemble, zkw); RecoverableZooKeeper rzk = ZKUtil.connect(conf, ensemble, zkw);
rzk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); rzk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
rzk.setData(znode, "OPENING".getBytes(), 0); rzk.setData(znode, "OPENING".getBytes(), 0);

View File

@ -103,5 +103,15 @@ public class TestZooKeeperMainServer {
c.set("hbase.zookeeper.quorum", "example1.com,example2.com,example3.com"); c.set("hbase.zookeeper.quorum", "example1.com,example2.com,example3.com");
String ensemble = parser.parse(c); String ensemble = parser.parse(c);
assertTrue(port, ensemble.matches("(example[1-3]\\.com:1234,){2}example[1-3]\\.com:" + port)); assertTrue(port, ensemble.matches("(example[1-3]\\.com:1234,){2}example[1-3]\\.com:" + port));
// multiple servers with its own port
c.set("hbase.zookeeper.quorum", "example1.com:5678,example2.com:9012,example3.com:3456");
ensemble = parser.parse(c);
assertEquals(ensemble, "example1.com:5678,example2.com:9012,example3.com:3456");
// some servers without its own port, which will be assigned the default client port
c.set("hbase.zookeeper.quorum", "example1.com:5678,example2.com:9012,example3.com");
ensemble = parser.parse(c);
assertEquals(ensemble, "example1.com:5678,example2.com:9012,example3.com:" + port);
} }
} }