diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java index e4aedc49235..15752c29d3d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java @@ -33,6 +33,11 @@ import org.apache.hadoop.hbase.HConstants; /** * Utility methods for reading, and building the ZooKeeper configuration. + * + * The order and priority for reading the config are as follows: + * (1). zoo.cfg if ""hbase.config.read.zookeeper.config" is true + * (2). Property with "hbase.zookeeper.property." prefix from HBase XML + * (3). other zookeeper related properties in HBASE XML */ @InterfaceAudience.Private public class ZKConfig { @@ -51,6 +56,24 @@ public class ZKConfig { * @return Properties holding mappings representing ZooKeeper config file. */ public static Properties makeZKProps(Configuration conf) { + Properties zkProperties = makeZKPropsFromZooCfg(conf); + + if (zkProperties == null) { + // Otherwise, use the configuration options from HBase's XML files. + zkProperties = makeZKPropsFromHbaseConfig(conf); + } + return zkProperties; + } + + /** + * Parses the corresponding config options from the zoo.cfg file + * and make a Properties object holding the Zookeeper config. + * + * @param conf Configuration to read from. + * @return Properties holding mappings representing the ZooKeeper config file or null if + * the HBASE_CONFIG_READ_ZOOKEEPER_CONFIG is false or the file does not exist. + */ + private static Properties makeZKPropsFromZooCfg(Configuration conf) { if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, false)) { LOG.warn( "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME + @@ -80,7 +103,18 @@ public class ZKConfig { } } - // Otherwise, use the configuration options from HBase's XML files. + return null; + } + + /** + * Make a Properties object holding ZooKeeper config. + * Parses the corresponding config options from the HBase XML configs + * and generates the appropriate ZooKeeper properties. + * + * @param conf Configuration to read from. + * @return Properties holding mappings representing ZooKeeper config file. + */ + private static Properties makeZKPropsFromHbaseConfig(Configuration conf) { Properties zkProperties = new Properties(); // Directly map all of the hbase.zookeeper.property.KEY properties. @@ -112,10 +146,17 @@ public class ZKConfig { final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST); + String serverHost; + String address; + String key; for (int i = 0; i < serverHosts.length; ++i) { - String serverHost = serverHosts[i]; - String address = serverHost + ":" + peerPort + ":" + leaderPort; - String key = "server." + i; + if (serverHosts[i].contains(":")) { + serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':')); + } else { + serverHost = serverHosts[i]; + } + address = serverHost + ":" + peerPort + ":" + leaderPort; + key = "server." + i; zkProperties.put(key, address); } @@ -177,7 +218,8 @@ public class ZKConfig { } // Special case for 'hbase.cluster.distributed' property being 'true' if (key.startsWith("server.")) { - boolean mode = conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); + boolean mode = + conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); if (mode == HConstants.CLUSTER_IS_DISTRIBUTED && value.startsWith(HConstants.LOCALHOST)) { String msg = "The server in zoo.cfg cannot be set to localhost " + "in a fully-distributed setup because it won't be reachable. " + @@ -198,7 +240,7 @@ public class ZKConfig { * @param properties * @return Quorum servers String */ - public static String getZKQuorumServersString(Properties properties) { + private static String getZKQuorumServersString(Properties properties) { String clientPort = null; List servers = new ArrayList(); @@ -250,12 +292,59 @@ public class ZKConfig { return hostPortBuilder.toString(); } + /** + * Return the ZK Quorum servers string given the specified configuration + * + * @param conf + * @return Quorum servers String + */ + private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) { + String defaultClientPort = Integer.toString( + conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT)); + + // Build the ZK quorum server string with "server:clientport" list, separated by ',' + final String[] serverHosts = + conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST); + return buildQuorumServerString(serverHosts, defaultClientPort); + } + + /** + * Build the ZK quorum server string with "server:clientport" list, separated by ',' + * + * @param serverHosts a list of servers for ZK quorum + * @param clientPort the default client port + * @return the string for a list of "server:port" separated by "," + */ + public static String buildQuorumServerString(String[] serverHosts, String clientPort) { + StringBuilder quorumStringBuilder = new StringBuilder(); + String serverHost; + for (int i = 0; i < serverHosts.length; ++i) { + if (serverHosts[i].contains(":")) { + serverHost = serverHosts[i]; // just use the port specified from the input + } else { + serverHost = serverHosts[i] + ":" + clientPort; + } + if (i > 0) { + quorumStringBuilder.append(','); + } + quorumStringBuilder.append(serverHost); + } + return quorumStringBuilder.toString(); + } + /** * Return the ZK Quorum servers string given the specified configuration. * @param conf * @return Quorum servers */ public static String getZKQuorumServersString(Configuration conf) { - return getZKQuorumServersString(makeZKProps(conf)); + // First try zoo.cfg; if not applicable, then try config XML. + Properties zkProperties = makeZKPropsFromZooCfg(conf); + + if (zkProperties != null) { + return getZKQuorumServersString(zkProperties); + } + + return getZKQuorumServersStringFromHbaseConfig(conf); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index ce30e05b5e6..33a3c08ba7b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -31,7 +31,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Properties; import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; @@ -74,6 +73,7 @@ import org.apache.zookeeper.proto.DeleteRequest; import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.server.ZooKeeperSaslServer; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.InvalidProtocolBufferException; /** @@ -93,6 +93,25 @@ public class ZKUtil { public static final char ZNODE_PATH_SEPARATOR = '/'; private static int zkDumpConnectionTimeOut; + // The Quorum for the ZK cluster can have one the following format (see examples below): + // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort) + // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server, + // in this case, the clientPort would be ignored) + // (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use + // the clientPort; otherwise, it would use the specified port) + @VisibleForTesting + public static class ZKClusterKey { + public String quorumString; + public int clientPort; + public String znodeParent; + + ZKClusterKey(String quorumString, int clientPort, String znodeParent) { + this.quorumString = quorumString; + this.clientPort = clientPort; + this.znodeParent = znodeParent; + } + } + /** * Creates a new connection to ZooKeeper, pulling settings and ensemble config * from the specified configuration object using methods from {@link ZKConfig}. @@ -106,8 +125,7 @@ public class ZKUtil { */ public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) throws IOException { - Properties properties = ZKConfig.makeZKProps(conf); - String ensemble = ZKConfig.getZKQuorumServersString(properties); + String ensemble = ZKConfig.getZKQuorumServersString(conf); return connect(conf, ensemble, watcher); } @@ -385,10 +403,10 @@ public class ZKUtil { */ public static void applyClusterKeyToConf(Configuration conf, String key) throws IOException{ - String[] parts = transformClusterKey(key); - conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]); - conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]); - conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]); + ZKClusterKey zkClusterKey = transformClusterKey(key); + conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.quorumString); + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.clientPort); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.znodeParent); } /** @@ -399,14 +417,53 @@ public class ZKUtil { * @return the three configuration in the described order * @throws IOException */ - public static String[] transformClusterKey(String key) throws IOException { + public static ZKClusterKey transformClusterKey(String key) throws IOException { String[] parts = key.split(":"); - if (parts.length != 3) { - throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" + - HConstants.ZOOKEEPER_QUORUM + ":hbase.zookeeper.client.port:" - + HConstants.ZOOKEEPER_ZNODE_PARENT); + + if (parts.length == 3) { + return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]); } - return parts; + + if (parts.length > 3) { + // The quorum could contain client port in server:clientport format, try to transform more. + String zNodeParent = parts [parts.length - 1]; + String clientPort = parts [parts.length - 2]; + + // The first part length is the total length minus the lengths of other parts and minus 2 ":" + int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2; + String quorumStringInput = key.substring(0, endQuorumIndex); + String[] serverHosts = quorumStringInput.split(","); + + // The common case is that every server has its own client port specified - this means + // that (total parts - the ZNodeParent part - the ClientPort part) is equal to + // (the number of "," + 1) - "+ 1" because the last server has no ",". + if ((parts.length - 2) == (serverHosts.length + 1)) { + return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent); + } + + // For the uncommon case that some servers has no port specified, we need to build the + // server:clientport list using default client port for servers without specified port. + return new ZKClusterKey( + ZKConfig.buildQuorumServerString(serverHosts, clientPort), + Integer.parseInt(clientPort), + zNodeParent); + } + + throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" + + HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":" + + HConstants.ZOOKEEPER_ZNODE_PARENT); + } + + /** + * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ',' + * @param quorumStringInput a string contains a list of servers for ZK quorum + * @param clientPort the default client port + * @return the string for a list of "server:port" separated by "," + */ + @VisibleForTesting + public static String standardizeQuorumServerString(String quorumStringInput, String clientPort) { + String[] serverHosts = quorumStringInput.split(","); + return ZKConfig.buildQuorumServerString(serverHosts, clientPort); } // @@ -941,7 +998,8 @@ public class ZKUtil { // Detection for embedded HBase client with jaas configuration // defined for third party programs. try { - javax.security.auth.login.Configuration testConfig = javax.security.auth.login.Configuration.getConfiguration(); + javax.security.auth.login.Configuration testConfig = + javax.security.auth.login.Configuration.getConfiguration(); if(testConfig.getAppConfigurationEntry("Client") == null) { return false; } @@ -1210,7 +1268,6 @@ public class ZKUtil { } catch (InterruptedException ie) { zkw.interruptedException(ie); } - } catch(InterruptedException ie) { zkw.interruptedException(ie); } @@ -1337,8 +1394,9 @@ public class ZKUtil { deleteNodeRecursively(zkw, joinZNode(node, child)); } } - //Zookeeper Watches are one time triggers; When children of parent nodes are deleted recursively. - //Must set another watch, get notified of delete node + + //Zookeeper Watches are one time triggers; When children of parent nodes are deleted + //recursively, must set another watch, get notified of delete node if (zkw.getRecoverableZooKeeper().exists(node, zkw) != null){ zkw.getRecoverableZooKeeper().delete(node, -1); } @@ -2026,7 +2084,8 @@ public class ZKUtil { * @see #logZKTree(ZooKeeperWatcher, String) * @throws KeeperException if an unexpected exception occurs */ - protected static void logZKTree(ZooKeeperWatcher zkw, String root, String prefix) throws KeeperException { + protected static void logZKTree(ZooKeeperWatcher zkw, String root, String prefix) + throws KeeperException { List children = ZKUtil.listChildrenNoWatch(zkw, root); if (children == null) return; for (String child : children) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java index f1606f11ef6..2a3ef089862 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java @@ -156,10 +156,39 @@ public class HMasterCommandLine extends ServerCommandLine { DefaultMetricsSystem.setMiniClusterMode(true); final MiniZooKeeperCluster zooKeeperCluster = new MiniZooKeeperCluster(conf); File zkDataPath = new File(conf.get(HConstants.ZOOKEEPER_DATA_DIR)); - int zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0); + + // find out the default client port + int zkClientPort = 0; + + // If the zookeeper client port is specified in server quorum, use it. + String zkserver = conf.get(HConstants.ZOOKEEPER_QUORUM); + if (zkserver != null) { + String[] zkservers = zkserver.split(","); + + if (zkservers.length > 1) { + // In local mode deployment, we have the master + a region server and zookeeper server + // started in the same process. Therefore, we only support one zookeeper server. + String errorMsg = "Could not start ZK with " + zkservers.length + + " ZK servers in local mode deployment. Aborting as clients (e.g. shell) will not " + + "be able to find this ZK quorum."; + System.err.println(errorMsg); + throw new IOException(errorMsg); + } + + String[] parts = zkservers[0].split(":"); + + if (parts.length == 2) { + // the second part is the client port + zkClientPort = Integer.parseInt(parts [1]); + } + } + // If the client port could not be find in server quorum conf, try another conf if (zkClientPort == 0) { - throw new IOException("No config value for " - + HConstants.ZOOKEEPER_CLIENT_PORT); + zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0); + // The client port has to be set by now; if not, throw exception. + if (zkClientPort == 0) { + throw new IOException("No config value for " + HConstants.ZOOKEEPER_CLIENT_PORT); + } } zooKeeperCluster.setDefaultClientPort(zkClientPort); @@ -180,6 +209,7 @@ public class HMasterCommandLine extends ServerCommandLine { throw new IOException(errorMsg); } conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort)); + // Need to have the zk cluster shutdown when master is shutdown. // Run a subclass that does the zk cluster shutdown on its way out. int mastersCount = conf.getInt("hbase.masters", 1); @@ -254,7 +284,8 @@ public class HMasterCommandLine extends ServerCommandLine { } } - private static void closeAllRegionServerThreads(List regionservers) { + private static void closeAllRegionServerThreads( + List regionservers) { for(JVMClusterUtil.RegionServerThread t : regionservers){ t.getRegionServer().stop("HMaster Aborted; Bringing down regions servers"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java index 926cd10f403..fe8a17e6959 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java @@ -42,6 +42,8 @@ import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnLog; +import com.google.common.annotations.VisibleForTesting; + /** * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead * of redoing it, we should contribute updates to their code which let us more @@ -82,6 +84,33 @@ public class MiniZooKeeperCluster { standaloneServerFactoryList = new ArrayList(); } + /** + * Add a client port to the list. + * + * @param clientPort the specified port + */ + public void addClientPort(int clientPort) { + clientPortList.add(clientPort); + } + + /** + * Get the list of client ports. + * @return clientPortList the client port list + */ + @VisibleForTesting + public List getClientPortList() { + return clientPortList; + } + + /** + * Check whether the client port in a specific position of the client port list is valid. + * + * @param index the specified position + */ + private boolean hasValidClientPortInList(int index) { + return (clientPortList.size() > index && clientPortList.get(index) > 0); + } + public void setDefaultClientPort(int clientPort) { if (clientPort <= 0) { throw new IllegalArgumentException("Invalid default ZK client port: " @@ -91,16 +120,39 @@ public class MiniZooKeeperCluster { } /** - * Selects a ZK client port. Returns the default port if specified. - * Otherwise, returns a random port. The random port is selected from the - * range between 49152 to 65535. These ports cannot be registered with IANA - * and are intended for dynamic allocation (see http://bit.ly/dynports). + * Selects a ZK client port. + * + * @param seedPort the seed port to start with; -1 means first time. + * @Returns a valid and unused client port */ - private int selectClientPort() { - if (defaultClientPort > 0) { - return defaultClientPort; + private int selectClientPort(int seedPort) { + int i; + int returnClientPort = seedPort + 1; + if (returnClientPort == 0) { + // If the new port is invalid, find one - starting with the default client port. + // If the default client port is not specified, starting with a random port. + // The random port is selected from the range between 49152 to 65535. These ports cannot be + // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports). + if (defaultClientPort > 0) { + returnClientPort = defaultClientPort; + } else { + returnClientPort = 0xc000 + new Random().nextInt(0x3f00); + } } - return 0xc000 + new Random().nextInt(0x3f00); + // Make sure that the port is unused. + while (true) { + for (i = 0; i < clientPortList.size(); i++) { + if (returnClientPort == clientPortList.get(i)) { + // Already used. Update the port and retry. + returnClientPort++; + break; + } + } + if (i == clientPortList.size()) { + break; // found a unused port, exit + } + } + return returnClientPort; } public void setTickTime(int tickTime) { @@ -126,7 +178,11 @@ public class MiniZooKeeperCluster { } public int startup(File baseDir) throws IOException, InterruptedException { - return startup(baseDir,1); + int numZooKeeperServers = clientPortList.size(); + if (numZooKeeperServers == 0) { + numZooKeeperServers = 1; // need at least 1 ZK server for testing + } + return startup(baseDir, numZooKeeperServers); } /** @@ -145,7 +201,8 @@ public class MiniZooKeeperCluster { setupTestEnv(); shutdown(); - int tentativePort = selectClientPort(); + int tentativePort = -1; // the seed port + int currentClientPort; // running all the ZK servers for (int i = 0; i < numZooKeeperServers; i++) { @@ -157,21 +214,33 @@ public class MiniZooKeeperCluster { } else { tickTimeToUse = TICK_TIME; } + + // Set up client port - if we have already had a list of valid ports, use it. + if (hasValidClientPortInList(i)) { + currentClientPort = clientPortList.get(i); + } else { + tentativePort = selectClientPort(tentativePort); // update the seed + currentClientPort = tentativePort; + } + ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse); NIOServerCnxnFactory standaloneServerFactory; while (true) { try { standaloneServerFactory = new NIOServerCnxnFactory(); standaloneServerFactory.configure( - new InetSocketAddress(tentativePort), + new InetSocketAddress(currentClientPort), configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 1000)); } catch (BindException e) { LOG.debug("Failed binding ZK Server to client port: " + - tentativePort, e); + currentClientPort, e); // We're told to use some port but it's occupied, fail - if (defaultClientPort > 0) return -1; + if (hasValidClientPortInList(i)) { + return -1; + } // This port is already in use, try to use another. - tentativePort = selectClientPort(); + tentativePort = selectClientPort(tentativePort); + currentClientPort = tentativePort; continue; } break; @@ -180,15 +249,21 @@ public class MiniZooKeeperCluster { // Start up this ZK server standaloneServerFactory.startup(server); // Runs a 'stat' against the servers. - if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) { + if (!waitForServerUp(currentClientPort, CONNECTION_TIMEOUT)) { throw new IOException("Waiting for startup of standalone server"); } - // We have selected this port as a client port. - clientPortList.add(tentativePort); + // We have selected a port as a client port. Update clientPortList if necessary. + if (clientPortList.size() <= i) { // it is not in the list, add the port + clientPortList.add(currentClientPort); + } + else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port + clientPortList.remove(i); + clientPortList.add(i, currentClientPort); + } + standaloneServerFactoryList.add(standaloneServerFactory); zooKeeperServers.add(server); - tentativePort++; //for the next server } // set the first one to be active ZK; Others are backups @@ -251,7 +326,7 @@ public class MiniZooKeeperCluster { */ public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedException { - if (!started || activeZKServerIndex < 0 ) { + if (!started || activeZKServerIndex < 0) { return -1; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperMainServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperMainServer.java index 86348a3e103..e81da596a20 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperMainServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperMainServer.java @@ -20,10 +20,6 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -41,31 +37,7 @@ public class ZooKeeperMainServer { private static final String SERVER_ARG = "-server"; public String parse(final Configuration c) { - // Note that we do not simply grab the property - // HConstants.ZOOKEEPER_QUORUM from the HBaseConfiguration because the - // user may be using a zoo.cfg file. - Properties zkProps = ZKConfig.makeZKProps(c); - String clientPort = null; - List hosts = new ArrayList(); - for (Entry entry: zkProps.entrySet()) { - String key = entry.getKey().toString().trim(); - String value = entry.getValue().toString().trim(); - if (key.startsWith("server.")) { - String[] parts = value.split(":"); - hosts.add(parts[0]); - } else if (key.endsWith("clientPort")) { - clientPort = value; - } - } - if (hosts.isEmpty() || clientPort == null) return null; - StringBuilder host = new StringBuilder(); - for (int i = 0; i < hosts.size(); i++) { - if (i > 0) host.append("," + hosts.get(i)); - else host.append(hosts.get(i)); - host.append(":"); - host.append(clientPort); - } - return host.toString(); + return ZKConfig.getZKQuorumServersString(c); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index e58f6a23c9b..04f3676235b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -697,15 +697,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @see #shutdownMiniZKCluster() * @return zk cluster started. */ - public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum) + public MiniZooKeeperCluster startMiniZKCluster( + final int zooKeeperServerNum, + final int ... clientPortList) throws Exception { setupClusterTestDir(); - return startMiniZKCluster(clusterTestDir, zooKeeperServerNum); + return startMiniZKCluster(clusterTestDir, zooKeeperServerNum, clientPortList); } private MiniZooKeeperCluster startMiniZKCluster(final File dir) throws Exception { - return startMiniZKCluster(dir,1); + return startMiniZKCluster(dir, 1, null); } /** @@ -713,7 +715,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * the port mentionned is used as the default port for ZooKeeper. */ private MiniZooKeeperCluster startMiniZKCluster(final File dir, - int zooKeeperServerNum) + final int zooKeeperServerNum, + final int [] clientPortList) throws Exception { if (this.zkCluster != null) { throw new IOException("Cluster already running at " + dir); @@ -725,6 +728,15 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { // If there is a port in the config file, we use it. this.zkCluster.setDefaultClientPort(defPort); } + + if (clientPortList != null) { + // Ignore extra client ports + int clientPortListSize = (clientPortList.length <= zooKeeperServerNum) ? + clientPortList.length : zooKeeperServerNum; + for (int i=0; i < clientPortListSize; i++) { + this.zkCluster.addClientPort(clientPortList[i]); + } + } int clientPort = this.zkCluster.startup(dir,zooKeeperServerNum); this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java index 84ee9636b1d..67e5b51cde1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java @@ -39,7 +39,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil; import org.junit.Test; import org.junit.experimental.categories.Category; + import java.io.File; +import java.util.List; /** * Test our testing utility class @@ -185,8 +187,8 @@ public class TestHBaseTestingUtility { htu1.shutdownMiniCluster(); } - - @Test public void testMiniZooKeeper() throws Exception { + @Test + public void testMiniZooKeeperWithOneServer() throws Exception { HBaseTestingUtility hbt = new HBaseTestingUtility(); MiniZooKeeperCluster cluster1 = hbt.startMiniZKCluster(); try { @@ -195,7 +197,11 @@ public class TestHBaseTestingUtility { } finally { hbt.shutdownMiniZKCluster(); } + } + @Test + public void testMiniZooKeeperWithMultipleServers() throws Exception { + HBaseTestingUtility hbt = new HBaseTestingUtility(); // set up zookeeper cluster with 5 zk servers MiniZooKeeperCluster cluster2 = hbt.startMiniZKCluster(5); int defaultClientPort = 21818; @@ -235,6 +241,111 @@ public class TestHBaseTestingUtility { } } + @Test + public void testMiniZooKeeperWithMultipleClientPorts() throws Exception { + int defaultClientPort = 8888; + int i, j; + HBaseTestingUtility hbt = new HBaseTestingUtility(); + + // Test 1 - set up zookeeper cluster with same number of ZK servers and specified client ports + int [] clientPortList1 = {1111, 1112, 1113}; + MiniZooKeeperCluster cluster1 = hbt.startMiniZKCluster(clientPortList1.length, clientPortList1); + try { + List clientPortListInCluster = cluster1.getClientPortList(); + + for (i = 0; i < clientPortListInCluster.size(); i++) { + assertEquals(clientPortListInCluster.get(i).intValue(), clientPortList1[i]); + } + } finally { + hbt.shutdownMiniZKCluster(); + } + + // Test 2 - set up zookeeper cluster with more ZK servers than specified client ports + hbt.getConfiguration().setInt("test.hbase.zookeeper.property.clientPort", defaultClientPort); + int [] clientPortList2 = {2222, 2223}; + MiniZooKeeperCluster cluster2 = + hbt.startMiniZKCluster(clientPortList2.length + 2, clientPortList2); + + try { + List clientPortListInCluster = cluster2.getClientPortList(); + + for (i = 0, j = 0; i < clientPortListInCluster.size(); i++) { + if (i < clientPortList2.length) { + assertEquals(clientPortListInCluster.get(i).intValue(), clientPortList2[i]); + } else { + // servers with no specified client port will use defaultClientPort or some other ports + // based on defaultClientPort + assertEquals(clientPortListInCluster.get(i).intValue(), defaultClientPort + j); + j++; + } + } + } finally { + hbt.shutdownMiniZKCluster(); + } + + // Test 3 - set up zookeeper cluster with invalid client ports + hbt.getConfiguration().setInt("test.hbase.zookeeper.property.clientPort", defaultClientPort); + int [] clientPortList3 = {3333, -3334, 3335, 0}; + MiniZooKeeperCluster cluster3 = + hbt.startMiniZKCluster(clientPortList3.length + 1, clientPortList3); + + try { + List clientPortListInCluster = cluster3.getClientPortList(); + + for (i = 0, j = 0; i < clientPortListInCluster.size(); i++) { + // Servers will only use valid client ports; if ports are not specified or invalid, + // the default port or a port based on default port will be used. + if (i < clientPortList3.length && clientPortList3[i] > 0) { + assertEquals(clientPortListInCluster.get(i).intValue(), clientPortList3[i]); + } else { + assertEquals(clientPortListInCluster.get(i).intValue(), defaultClientPort + j); + j++; + } + } + } finally { + hbt.shutdownMiniZKCluster(); + } + + // Test 4 - set up zookeeper cluster with default port and some other ports used + // This test tests that the defaultClientPort and defaultClientPort+2 are used, so + // the algorithm should choice defaultClientPort+1 and defaultClientPort+3 to fill + // out the ports for servers without ports specified. + hbt.getConfiguration().setInt("test.hbase.zookeeper.property.clientPort", defaultClientPort); + int [] clientPortList4 = {-4444, defaultClientPort+2, 4446, defaultClientPort}; + MiniZooKeeperCluster cluster4 = + hbt.startMiniZKCluster(clientPortList4.length + 1, clientPortList4); + + try { + List clientPortListInCluster = cluster4.getClientPortList(); + + for (i = 0, j = 1; i < clientPortListInCluster.size(); i++) { + // Servers will only use valid client ports; if ports are not specified or invalid, + // the default port or a port based on default port will be used. + if (i < clientPortList4.length && clientPortList4[i] > 0) { + assertEquals(clientPortListInCluster.get(i).intValue(), clientPortList4[i]); + } else { + assertEquals(clientPortListInCluster.get(i).intValue(), defaultClientPort + j); + j +=2; + } + } + } finally { + hbt.shutdownMiniZKCluster(); + } + + // Test 5 - set up zookeeper cluster with same ports specified - fail is expected. + int [] clientPortList5 = {5555, 5556, 5556}; + + try { + MiniZooKeeperCluster cluster5 = + hbt.startMiniZKCluster(clientPortList5.length, clientPortList5); + assertTrue(cluster5.getClientPort() == -1); // expected failure + } catch (Exception e) { + // exception is acceptable + } finally { + hbt.shutdownMiniZKCluster(); + } + } + @Test public void testMiniDFSCluster() throws Exception { HBaseTestingUtility hbt = new HBaseTestingUtility(); MiniDFSCluster cluster = hbt.startMiniDFSCluster(null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index cdee735bad2..3065812c2c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -347,8 +347,8 @@ public class TestZooKeeper { @Test public void testClusterKey() throws Exception { - testKey("server", "2181", "hbase"); - testKey("server1,server2,server3", "2181", "hbase"); + testKey("server", 2181, "hbase"); + testKey("server1,server2,server3", 2181, "hbase"); try { ZKUtil.transformClusterKey("2181:hbase"); } catch (IOException ex) { @@ -356,20 +356,58 @@ public class TestZooKeeper { } } - private void testKey(String ensemble, String port, String znode) + @Test + public void testClusterKeyWithMultiplePorts() throws Exception { + // server has different port than the default port + testKey("server1:2182", 2181, "hbase", true); + // multiple servers have their own port + testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true); + // one server has no specified port, should use default port + testKey("server1:2182,server2,server3:2184", 2181, "hbase", true); + // the last server has no specified port, should use default port + testKey("server1:2182,server2:2183,server3", 2181, "hbase", true); + // multiple servers have no specified port, should use default port for those servers + testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true); + // same server, different ports + testKey("server1:2182,server1:2183,server1", 2181, "hbase", true); + // mix of same server/different port and different server + testKey("server1:2182,server2:2183,server1", 2181, "hbase", true); + } + + private void testKey(String ensemble, int port, String znode) + throws IOException { + testKey(ensemble, port, znode, false); // not support multiple client ports + } + + private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport) throws IOException { Configuration conf = new Configuration(); String key = ensemble+":"+port+":"+znode; - String[] parts = ZKUtil.transformClusterKey(key); - assertEquals(ensemble, parts[0]); - assertEquals(port, parts[1]); - assertEquals(znode, parts[2]); + String ensemble2 = null; + ZKUtil.ZKClusterKey zkClusterKey = ZKUtil.transformClusterKey(key); + if (multiplePortSupport) { + ensemble2 = ZKUtil.standardizeQuorumServerString(ensemble, Integer.toString(port)); + assertEquals(ensemble2, zkClusterKey.quorumString); + } + else { + assertEquals(ensemble, zkClusterKey.quorumString); + } + assertEquals(port, zkClusterKey.clientPort); + assertEquals(znode, zkClusterKey.znodeParent); + ZKUtil.applyClusterKeyToConf(conf, key); - assertEquals(parts[0], conf.get(HConstants.ZOOKEEPER_QUORUM)); - assertEquals(parts[1], conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)); - assertEquals(parts[2], conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + assertEquals(zkClusterKey.quorumString, conf.get(HConstants.ZOOKEEPER_QUORUM)); + assertEquals(zkClusterKey.clientPort, conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1)); + assertEquals(zkClusterKey.znodeParent, conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf); - assertEquals(key, reconstructedKey); + if (multiplePortSupport) { + String key2 = ensemble2 + ":" + port + ":" + znode; + assertEquals(key2, reconstructedKey); + } + else { + assertEquals(key, reconstructedKey); + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java index cb959caf57a..976565b0d4a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java @@ -140,5 +140,14 @@ public class TestHQuorumPeer { assertEquals(2181, p.get("clientPort")); } + @Test + public void testGetZKQuorumServersString() { + Configuration config = new Configuration(TEST_UTIL.getConfiguration()); + config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 8888); + config.set(HConstants.ZOOKEEPER_QUORUM, "foo:1234,bar:5678,baz,qux:9012"); + + String s = ZKConfig.getZKQuorumServersString(config); + assertEquals("foo:1234,bar:5678,baz:8888,qux:9012", s); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java index e7900135532..dd4a6dc0287 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java @@ -71,10 +71,9 @@ public class TestRecoverableZooKeeper { public void testSetDataVersionMismatchInLoop() throws Exception { String znode = "/hbase/splitWAL/9af7cfc9b15910a0b3d714bf40a3248f"; Configuration conf = TEST_UTIL.getConfiguration(); - Properties properties = ZKConfig.makeZKProps(conf); ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testSetDataVersionMismatchInLoop", abortable, true); - String ensemble = ZKConfig.getZKQuorumServersString(properties); + String ensemble = ZKConfig.getZKQuorumServersString(conf); RecoverableZooKeeper rzk = ZKUtil.connect(conf, ensemble, zkw); rzk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); rzk.setData(znode, "OPENING".getBytes(), 0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperMainServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperMainServer.java index b121ff499b6..844791813bd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperMainServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperMainServer.java @@ -103,5 +103,15 @@ public class TestZooKeeperMainServer { c.set("hbase.zookeeper.quorum", "example1.com,example2.com,example3.com"); String ensemble = parser.parse(c); assertTrue(port, ensemble.matches("(example[1-3]\\.com:1234,){2}example[1-3]\\.com:" + port)); + + // multiple servers with its own port + c.set("hbase.zookeeper.quorum", "example1.com:5678,example2.com:9012,example3.com:3456"); + ensemble = parser.parse(c); + assertEquals(ensemble, "example1.com:5678,example2.com:9012,example3.com:3456"); + + // some servers without its own port, which will be assigned the default client port + c.set("hbase.zookeeper.quorum", "example1.com:5678,example2.com:9012,example3.com"); + ensemble = parser.parse(c); + assertEquals(ensemble, "example1.com:5678,example2.com:9012,example3.com:" + port); } } \ No newline at end of file