HBASE-14866 VerifyReplication and ReplicationAdmin should use full peer configuration for peer connection

This commit is contained in:
Gary Helmling 2015-12-09 15:52:27 -08:00
parent ba3aa9a9b1
commit c1e0fcc26d
22 changed files with 620 additions and 482 deletions

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@ -626,7 +625,8 @@ public class ReplicationAdmin implements Closeable {
}
}
private List<ReplicationPeer> listValidReplicationPeers() {
@VisibleForTesting
List<ReplicationPeer> listValidReplicationPeers() {
Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
if (peers == null || peers.size() <= 0) {
return null;
@ -634,18 +634,16 @@ public class ReplicationAdmin implements Closeable {
List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
String peerId = peerEntry.getKey();
String clusterKey = peerEntry.getValue().getClusterKey();
Configuration peerConf = new Configuration(this.connection.getConfiguration());
Stat s = null;
try {
ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
Configuration peerConf = pair.getSecond();
ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
s =
zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
null);
if (null == s) {
LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now.");
continue;
}
validPeers.add(peer);
@ -664,10 +662,6 @@ public class ReplicationAdmin implements Closeable {
LOG.debug("Failure details to get valid replication peers.", e);
Thread.currentThread().interrupt();
continue;
} catch (IOException e) {
LOG.warn("Failed to get valid replication peers due to IOException.");
LOG.debug("Failure details to get valid replication peers.", e);
continue;
}
}
return validPeers;

View File

@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
@ -318,11 +319,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return null;
}
Configuration otherConf = new Configuration(this.conf);
Configuration otherConf;
try {
if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
}
otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
} catch (IOException e) {
LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
return null;

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -69,7 +70,7 @@ public abstract class ReplicationStateZKBase {
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);

View File

@ -1,155 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Utility methods for reading, and building the ZooKeeper configuration.
*
* The order and priority for reading the config are as follows:
* (1). Property with "hbase.zookeeper.property." prefix from HBase XML
* (2). other zookeeper related properties in HBASE XML
*/
@InterfaceAudience.Private
public class ZKConfig {
private static final String VARIABLE_START = "${";
/**
* Make a Properties object holding ZooKeeper config.
* Parses the corresponding config options from the HBase XML configs
* and generates the appropriate ZooKeeper properties.
* @param conf Configuration to read from.
* @return Properties holding mappings representing ZooKeeper config file.
*/
public static Properties makeZKProps(Configuration conf) {
return makeZKPropsFromHbaseConfig(conf);
}
/**
* Make a Properties object holding ZooKeeper config.
* Parses the corresponding config options from the HBase XML configs
* and generates the appropriate ZooKeeper properties.
*
* @param conf Configuration to read from.
* @return Properties holding mappings representing ZooKeeper config file.
*/
private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
Properties zkProperties = new Properties();
// Directly map all of the hbase.zookeeper.property.KEY properties.
// Synchronize on conf so no loading of configs while we iterate
synchronized (conf) {
for (Entry<String, String> entry : conf) {
String key = entry.getKey();
if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
String value = entry.getValue();
// If the value has variables substitutions, need to do a get.
if (value.contains(VARIABLE_START)) {
value = conf.get(key);
}
zkProperties.setProperty(zkKey, value);
}
}
}
// If clientPort is not set, assign the default.
if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
zkProperties.put(HConstants.CLIENT_PORT_STR,
HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
}
// Create the server.X properties.
int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
HConstants.LOCALHOST);
String serverHost;
String address;
String key;
for (int i = 0; i < serverHosts.length; ++i) {
if (serverHosts[i].contains(":")) {
serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':'));
} else {
serverHost = serverHosts[i];
}
address = serverHost + ":" + peerPort + ":" + leaderPort;
key = "server." + i;
zkProperties.put(key, address);
}
return zkProperties;
}
/**
* Return the ZK Quorum servers string given the specified configuration
*
* @param conf
* @return Quorum servers String
*/
private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
String defaultClientPort = Integer.toString(
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
// Build the ZK quorum server string with "server:clientport" list, separated by ','
final String[] serverHosts =
conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
return buildQuorumServerString(serverHosts, defaultClientPort);
}
/**
* Build the ZK quorum server string with "server:clientport" list, separated by ','
*
* @param serverHosts a list of servers for ZK quorum
* @param clientPort the default client port
* @return the string for a list of "server:port" separated by ","
*/
public static String buildQuorumServerString(String[] serverHosts, String clientPort) {
StringBuilder quorumStringBuilder = new StringBuilder();
String serverHost;
for (int i = 0; i < serverHosts.length; ++i) {
if (serverHosts[i].contains(":")) {
serverHost = serverHosts[i]; // just use the port specified from the input
} else {
serverHost = serverHosts[i] + ":" + clientPort;
}
if (i > 0) {
quorumStringBuilder.append(',');
}
quorumStringBuilder.append(serverHost);
}
return quorumStringBuilder.toString();
}
/**
* Return the ZK Quorum servers string given the specified configuration.
* @return Quorum servers
*/
public static String getZKQuorumServersString(Configuration conf) {
return getZKQuorumServersStringFromHbaseConfig(conf);
}
}

View File

@ -76,7 +76,6 @@ import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.ZooKeeperSaslServer;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
/**
@ -96,25 +95,6 @@ public class ZKUtil {
public static final char ZNODE_PATH_SEPARATOR = '/';
private static int zkDumpConnectionTimeOut;
// The Quorum for the ZK cluster can have one the following format (see examples below):
// (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort)
// (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
// in this case, the clientPort would be ignored)
// (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
// the clientPort; otherwise, it would use the specified port)
@VisibleForTesting
public static class ZKClusterKey {
public String quorumString;
public int clientPort;
public String znodeParent;
ZKClusterKey(String quorumString, int clientPort, String znodeParent) {
this.quorumString = quorumString;
this.clientPort = clientPort;
this.znodeParent = znodeParent;
}
}
/**
* Creates a new connection to ZooKeeper, pulling settings and ensemble config
* from the specified configuration object using methods from {@link ZKConfig}.
@ -361,110 +341,6 @@ public class ZKUtil {
return path.substring(path.lastIndexOf("/")+1);
}
/**
* Get the key to the ZK ensemble for this configuration without
* adding a name at the end
* @param conf Configuration to use to build the key
* @return ensemble key without a name
*/
public static String getZooKeeperClusterKey(Configuration conf) {
return getZooKeeperClusterKey(conf, null);
}
/**
* Get the key to the ZK ensemble for this configuration and append
* a name at the end
* @param conf Configuration to use to build the key
* @param name Name that should be appended at the end if not empty or null
* @return ensemble key with a name (if any)
*/
public static String getZooKeeperClusterKey(Configuration conf, String name) {
String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll(
"[\\t\\n\\x0B\\f\\r]", "");
StringBuilder builder = new StringBuilder(ensemble);
builder.append(":");
builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
builder.append(":");
builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
if (name != null && !name.isEmpty()) {
builder.append(",");
builder.append(name);
}
return builder.toString();
}
/**
* Apply the settings in the given key to the given configuration, this is
* used to communicate with distant clusters
* @param conf configuration object to configure
* @param key string that contains the 3 required configuratins
* @throws IOException
*/
public static void applyClusterKeyToConf(Configuration conf, String key)
throws IOException{
ZKClusterKey zkClusterKey = transformClusterKey(key);
conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.quorumString);
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.clientPort);
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.znodeParent);
}
/**
* Separate the given key into the three configurations it should contain:
* hbase.zookeeper.quorum, hbase.zookeeper.client.port
* and zookeeper.znode.parent
* @param key
* @return the three configuration in the described order
* @throws IOException
*/
public static ZKClusterKey transformClusterKey(String key) throws IOException {
String[] parts = key.split(":");
if (parts.length == 3) {
return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]);
}
if (parts.length > 3) {
// The quorum could contain client port in server:clientport format, try to transform more.
String zNodeParent = parts [parts.length - 1];
String clientPort = parts [parts.length - 2];
// The first part length is the total length minus the lengths of other parts and minus 2 ":"
int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2;
String quorumStringInput = key.substring(0, endQuorumIndex);
String[] serverHosts = quorumStringInput.split(",");
// The common case is that every server has its own client port specified - this means
// that (total parts - the ZNodeParent part - the ClientPort part) is equal to
// (the number of "," + 1) - "+ 1" because the last server has no ",".
if ((parts.length - 2) == (serverHosts.length + 1)) {
return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent);
}
// For the uncommon case that some servers has no port specified, we need to build the
// server:clientport list using default client port for servers without specified port.
return new ZKClusterKey(
ZKConfig.buildQuorumServerString(serverHosts, clientPort),
Integer.parseInt(clientPort),
zNodeParent);
}
throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
+ HConstants.ZOOKEEPER_ZNODE_PARENT);
}
/**
* Standardize the ZK quorum string: make it a "server:clientport" list, separated by ','
* @param quorumStringInput a string contains a list of servers for ZK quorum
* @param clientPort the default client port
* @return the string for a list of "server:port" separated by ","
*/
@VisibleForTesting
public static String standardizeQuorumServerString(String quorumStringInput, String clientPort) {
String[] serverHosts = quorumStringInput.split(",");
return ZKConfig.buildQuorumServerString(serverHosts, clientPort);
}
//
// Existence checks and watches
//

View File

@ -40,17 +40,6 @@ import org.junit.experimental.categories.Category;
@Category({SmallTests.class})
public class TestZKUtil {
@Test
public void testGetZooKeeperClusterKey() {
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n");
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333");
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase");
String clusterKey = ZKUtil.getZooKeeperClusterKey(conf, "test");
Assert.assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
Assert.assertEquals("localhost:3333:hbase,test", clusterKey);
}
@Test
public void testCreateACL() throws ZooKeeperConnectionException, IOException {
Configuration conf = HBaseConfiguration.create();

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map.Entry;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
/**
* Adds HBase configuration files to a Configuration
@ -115,7 +116,7 @@ public class HBaseConfiguration extends Configuration {
* @param srcConf the source configuration
**/
public static void merge(Configuration destConf, Configuration srcConf) {
for (Entry<String, String> e : srcConf) {
for (Map.Entry<String, String> e : srcConf) {
destConf.set(e.getKey(), e.getValue());
}
}
@ -129,7 +130,7 @@ public class HBaseConfiguration extends Configuration {
*/
public static Configuration subset(Configuration srcConf, String prefix) {
Configuration newConf = new Configuration(false);
for (Entry<String, String> entry : srcConf) {
for (Map.Entry<String, String> entry : srcConf) {
if (entry.getKey().startsWith(prefix)) {
String newKey = entry.getKey().substring(prefix.length());
// avoid entries that would produce an empty key
@ -141,6 +142,18 @@ public class HBaseConfiguration extends Configuration {
return newConf;
}
/**
* Sets all the entries in the provided {@code Map<String, String>} as properties in the
* given {@code Configuration}. Each property will have the specified prefix prepended,
* so that the configuration entries are keyed by {@code prefix + entry.getKey()}.
*/
public static void setWithPrefix(Configuration conf, String prefix,
Iterable<Map.Entry<String, String>> properties) {
for (Map.Entry<String, String> entry : properties) {
conf.set(prefix + entry.getKey(), entry.getValue());
}
}
/**
* @return whether to show HBase Configuration in servlet
*/
@ -235,6 +248,65 @@ public class HBaseConfiguration extends Configuration {
return passwd;
}
/**
* Generates a {@link Configuration} instance by applying the ZooKeeper cluster key
* to the base Configuration. Note that additional configuration properties may be needed
* for a remote cluster, so it is preferable to use
* {@link #createClusterConf(Configuration, String, String)}.
*
* @param baseConf the base configuration to use, containing prefixed override properties
* @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none
*
* @return the merged configuration with override properties and cluster key applied
*
* @see #createClusterConf(Configuration, String, String)
*/
public static Configuration createClusterConf(Configuration baseConf, String clusterKey)
throws IOException {
return createClusterConf(baseConf, clusterKey, null);
}
/**
* Generates a {@link Configuration} instance by applying property overrides prefixed by
* a cluster profile key to the base Configuration. Override properties are extracted by
* the {@link #subset(Configuration, String)} method, then the merged on top of the base
* Configuration and returned.
*
* @param baseConf the base configuration to use, containing prefixed override properties
* @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none
* @param overridePrefix the property key prefix to match for override properties,
* or {@code null} if none
* @return the merged configuration with override properties and cluster key applied
*/
public static Configuration createClusterConf(Configuration baseConf, String clusterKey,
String overridePrefix) throws IOException {
Configuration clusterConf = HBaseConfiguration.create(baseConf);
if (clusterKey != null && !clusterKey.isEmpty()) {
applyClusterKeyToConf(clusterConf, clusterKey);
}
if (overridePrefix != null && !overridePrefix.isEmpty()) {
Configuration clusterSubset = HBaseConfiguration.subset(clusterConf, overridePrefix);
HBaseConfiguration.merge(clusterConf, clusterSubset);
}
return clusterConf;
}
/**
* Apply the settings in the given key to the given configuration, this is
* used to communicate with distant clusters
* @param conf configuration object to configure
* @param key string that contains the 3 required configuratins
* @throws IOException
*/
private static void applyClusterKeyToConf(Configuration conf, String key)
throws IOException{
ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString());
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort());
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent());
}
/**
* For debugging. Dump configurations to system output as xml format.
* Master and RS configurations can also be dumped using

View File

@ -0,0 +1,301 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Properties;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Utility methods for reading, and building the ZooKeeper configuration.
*
* The order and priority for reading the config are as follows:
* (1). Property with "hbase.zookeeper.property." prefix from HBase XML
* (2). other zookeeper related properties in HBASE XML
*/
@InterfaceAudience.Private
public final class ZKConfig {
private static final String VARIABLE_START = "${";
private ZKConfig() {
}
/**
* Make a Properties object holding ZooKeeper config.
* Parses the corresponding config options from the HBase XML configs
* and generates the appropriate ZooKeeper properties.
* @param conf Configuration to read from.
* @return Properties holding mappings representing ZooKeeper config file.
*/
public static Properties makeZKProps(Configuration conf) {
return makeZKPropsFromHbaseConfig(conf);
}
/**
* Make a Properties object holding ZooKeeper config.
* Parses the corresponding config options from the HBase XML configs
* and generates the appropriate ZooKeeper properties.
*
* @param conf Configuration to read from.
* @return Properties holding mappings representing ZooKeeper config file.
*/
private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
Properties zkProperties = new Properties();
// Directly map all of the hbase.zookeeper.property.KEY properties.
// Synchronize on conf so no loading of configs while we iterate
synchronized (conf) {
for (Entry<String, String> entry : conf) {
String key = entry.getKey();
if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
String value = entry.getValue();
// If the value has variables substitutions, need to do a get.
if (value.contains(VARIABLE_START)) {
value = conf.get(key);
}
zkProperties.setProperty(zkKey, value);
}
}
}
// If clientPort is not set, assign the default.
if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
zkProperties.put(HConstants.CLIENT_PORT_STR,
HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
}
// Create the server.X properties.
int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
HConstants.LOCALHOST);
String serverHost;
String address;
String key;
for (int i = 0; i < serverHosts.length; ++i) {
if (serverHosts[i].contains(":")) {
serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':'));
} else {
serverHost = serverHosts[i];
}
address = serverHost + ":" + peerPort + ":" + leaderPort;
key = "server." + i;
zkProperties.put(key, address);
}
return zkProperties;
}
/**
* Return the ZK Quorum servers string given the specified configuration
*
* @param conf
* @return Quorum servers String
*/
private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
String defaultClientPort = Integer.toString(
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
// Build the ZK quorum server string with "server:clientport" list, separated by ','
final String[] serverHosts =
conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
return buildZKQuorumServerString(serverHosts, defaultClientPort);
}
/**
* Return the ZK Quorum servers string given the specified configuration.
* @return Quorum servers
*/
public static String getZKQuorumServersString(Configuration conf) {
return getZKQuorumServersStringFromHbaseConfig(conf);
}
/**
* Build the ZK quorum server string with "server:clientport" list, separated by ','
*
* @param serverHosts a list of servers for ZK quorum
* @param clientPort the default client port
* @return the string for a list of "server:port" separated by ","
*/
public static String buildZKQuorumServerString(String[] serverHosts, String clientPort) {
StringBuilder quorumStringBuilder = new StringBuilder();
String serverHost;
for (int i = 0; i < serverHosts.length; ++i) {
if (serverHosts[i].contains(":")) {
serverHost = serverHosts[i]; // just use the port specified from the input
} else {
serverHost = serverHosts[i] + ":" + clientPort;
}
if (i > 0) {
quorumStringBuilder.append(',');
}
quorumStringBuilder.append(serverHost);
}
return quorumStringBuilder.toString();
}
/**
* Verifies that the given key matches the expected format for a ZooKeeper cluster key.
* The Quorum for the ZK cluster can have one the following formats (see examples below):
*
* <ol>
* <li>s1,s2,s3 (no client port in the list, the client port could be obtained from
* clientPort)</li>
* <li>s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
* in this case, the clientPort would be ignored)</li>
* <li>s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
* the clientPort; otherwise, it would use the specified port)</li>
* </ol>
*
* @param key the cluster key to validate
* @throws IOException if the key could not be parsed
*/
public static void validateClusterKey(String key) throws IOException {
transformClusterKey(key);
}
/**
* Separate the given key into the three configurations it should contain:
* hbase.zookeeper.quorum, hbase.zookeeper.client.port
* and zookeeper.znode.parent
* @param key
* @return the three configuration in the described order
* @throws IOException
*/
public static ZKClusterKey transformClusterKey(String key) throws IOException {
String[] parts = key.split(":");
if (parts.length == 3) {
return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]);
}
if (parts.length > 3) {
// The quorum could contain client port in server:clientport format, try to transform more.
String zNodeParent = parts [parts.length - 1];
String clientPort = parts [parts.length - 2];
// The first part length is the total length minus the lengths of other parts and minus 2 ":"
int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2;
String quorumStringInput = key.substring(0, endQuorumIndex);
String[] serverHosts = quorumStringInput.split(",");
// The common case is that every server has its own client port specified - this means
// that (total parts - the ZNodeParent part - the ClientPort part) is equal to
// (the number of "," + 1) - "+ 1" because the last server has no ",".
if ((parts.length - 2) == (serverHosts.length + 1)) {
return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent);
}
// For the uncommon case that some servers has no port specified, we need to build the
// server:clientport list using default client port for servers without specified port.
return new ZKClusterKey(
buildZKQuorumServerString(serverHosts, clientPort),
Integer.parseInt(clientPort),
zNodeParent);
}
throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
+ HConstants.ZOOKEEPER_ZNODE_PARENT);
}
/**
* Get the key to the ZK ensemble for this configuration without
* adding a name at the end
* @param conf Configuration to use to build the key
* @return ensemble key without a name
*/
public static String getZooKeeperClusterKey(Configuration conf) {
return getZooKeeperClusterKey(conf, null);
}
/**
* Get the key to the ZK ensemble for this configuration and append
* a name at the end
* @param conf Configuration to use to build the key
* @param name Name that should be appended at the end if not empty or null
* @return ensemble key with a name (if any)
*/
public static String getZooKeeperClusterKey(Configuration conf, String name) {
String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll(
"[\\t\\n\\x0B\\f\\r]", "");
StringBuilder builder = new StringBuilder(ensemble);
builder.append(":");
builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
builder.append(":");
builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
if (name != null && !name.isEmpty()) {
builder.append(",");
builder.append(name);
}
return builder.toString();
}
/**
* Standardize the ZK quorum string: make it a "server:clientport" list, separated by ','
* @param quorumStringInput a string contains a list of servers for ZK quorum
* @param clientPort the default client port
* @return the string for a list of "server:port" separated by ","
*/
@VisibleForTesting
public static String standardizeZKQuorumServerString(String quorumStringInput,
String clientPort) {
String[] serverHosts = quorumStringInput.split(",");
return buildZKQuorumServerString(serverHosts, clientPort);
}
// The Quorum for the ZK cluster can have one the following format (see examples below):
// (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort)
// (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
// in this case, the clientPort would be ignored)
// (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
// the clientPort; otherwise, it would use the specified port)
@VisibleForTesting
public static class ZKClusterKey {
private String quorumString;
private int clientPort;
private String znodeParent;
ZKClusterKey(String quorumString, int clientPort, String znodeParent) {
this.quorumString = quorumString;
this.clientPort = clientPort;
this.znodeParent = znodeParent;
}
public String getQuorumString() {
return quorumString;
}
public int getClientPort() {
return clientPort;
}
public String getZnodeParent() {
return znodeParent;
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
@ -27,11 +28,13 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -73,8 +76,11 @@ public class TestHBaseConfiguration {
String prefix = "hbase.mapred.output.";
conf.set("hbase.security.authentication", "kerberos");
conf.set("hbase.regionserver.kerberos.principal", "hbasesource");
conf.set(prefix + "hbase.regionserver.kerberos.principal", "hbasedest");
conf.set(prefix, "shouldbemissing");
HBaseConfiguration.setWithPrefix(conf, prefix,
ImmutableMap.of(
"hbase.regionserver.kerberos.principal", "hbasedest",
"", "shouldbemissing")
.entrySet());
Configuration subsetConf = HBaseConfiguration.subset(conf, prefix);
assertNull(subsetConf.get(prefix + "hbase.regionserver.kerberos.principal"));

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category({MiscTests.class, SmallTests.class})
public class TestZKConfig {
@Test
public void testZKConfigLoading() throws Exception {
Configuration conf = HBaseConfiguration.create();
// Test that we read only from the config instance
// (i.e. via hbase-default.xml and hbase-site.xml)
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
Properties props = ZKConfig.makeZKProps(conf);
assertEquals("Property client port should have been default from the HBase config",
"2181",
props.getProperty("clientPort"));
}
@Test
public void testGetZooKeeperClusterKey() {
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n");
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333");
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase");
String clusterKey = ZKConfig.getZooKeeperClusterKey(conf, "test");
assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
assertEquals("localhost:3333:hbase,test", clusterKey);
}
@Test
public void testClusterKey() throws Exception {
testKey("server", 2181, "hbase");
testKey("server1,server2,server3", 2181, "hbase");
try {
ZKConfig.validateClusterKey("2181:hbase");
} catch (IOException ex) {
// OK
}
}
@Test
public void testClusterKeyWithMultiplePorts() throws Exception {
// server has different port than the default port
testKey("server1:2182", 2181, "hbase", true);
// multiple servers have their own port
testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
// one server has no specified port, should use default port
testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
// the last server has no specified port, should use default port
testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
// multiple servers have no specified port, should use default port for those servers
testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
// same server, different ports
testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
// mix of same server/different port and different server
testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
}
private void testKey(String ensemble, int port, String znode)
throws IOException {
testKey(ensemble, port, znode, false); // not support multiple client ports
}
private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport)
throws IOException {
Configuration conf = new Configuration();
String key = ensemble+":"+port+":"+znode;
String ensemble2 = null;
ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
if (multiplePortSupport) {
ensemble2 = ZKConfig.standardizeZKQuorumServerString(ensemble,
Integer.toString(port));
assertEquals(ensemble2, zkClusterKey.getQuorumString());
}
else {
assertEquals(ensemble, zkClusterKey.getQuorumString());
}
assertEquals(port, zkClusterKey.getClientPort());
assertEquals(znode, zkClusterKey.getZnodeParent());
conf = HBaseConfiguration.createClusterConf(conf, key);
assertEquals(zkClusterKey.getQuorumString(), conf.get(HConstants.ZOOKEEPER_QUORUM));
assertEquals(zkClusterKey.getClientPort(), conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1));
assertEquals(zkClusterKey.getZnodeParent(), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
String reconstructedKey = ZKConfig.getZooKeeperClusterKey(conf);
if (multiplePortSupport) {
String key2 = ensemble2 + ":" + port + ":" + znode;
assertEquals(key2, reconstructedKey);
}
else {
assertEquals(key, reconstructedKey);
}
}
}

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@ -174,8 +173,9 @@ public class SyncTable extends Configured implements Tool {
Configuration conf = context.getConfiguration();
sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY);
targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY);
sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
TableOutputFormat.OUTPUT_CONF_PREFIX);
sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
@ -196,13 +196,12 @@ public class SyncTable extends Configured implements Tool {
targetHasher = new HashTable.ResultHasher();
}
private static Connection openConnection(Configuration conf, String zkClusterConfKey)
private static Connection openConnection(Configuration conf, String zkClusterConfKey,
String configPrefix)
throws IOException {
Configuration clusterConf = new Configuration(conf);
String zkCluster = conf.get(zkClusterConfKey);
if (zkCluster != null) {
ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster);
}
Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
zkCluster, configPrefix);
return ConnectionFactory.createConnection(clusterConf);
}

View File

@ -43,12 +43,11 @@ import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.File;
import java.io.IOException;
@ -485,12 +484,8 @@ public class TableMapReduceUtil {
String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
User user = userProvider.getCurrent();
if (quorumAddress != null) {
Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
// apply any "hbase.mapred.output." configuration overrides
Configuration outputOverrides =
HBaseConfiguration.subset(peerConf, TableOutputFormat.OUTPUT_CONF_PREFIX);
HBaseConfiguration.merge(peerConf, outputOverrides);
Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
Connection peerConn = ConnectionFactory.createConnection(peerConf);
try {
TokenUtil.addTokenForJob(peerConn, user, job);
@ -523,15 +518,30 @@ public class TableMapReduceUtil {
* @param job The job that requires the permission.
* @param quorumAddress string that contains the 3 required configuratins
* @throws IOException When the authentication token cannot be obtained.
* @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
*/
@Deprecated
public static void initCredentialsForCluster(Job job, String quorumAddress)
throws IOException {
Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
quorumAddress);
initCredentialsForCluster(job, peerConf);
}
/**
* Obtain an authentication token, for the specified cluster, on behalf of the current user
* and add it to the credentials for the given map reduce job.
*
* @param job The job that requires the permission.
* @param conf The configuration to use in connecting to the peer cluster
* @throws IOException When the authentication token cannot be obtained.
*/
public static void initCredentialsForCluster(Job job, Configuration conf)
throws IOException {
UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
if (userProvider.isHBaseSecurityEnabled()) {
try {
Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
Connection peerConn = ConnectionFactory.createConnection(peerConf);
Connection peerConn = ConnectionFactory.createConnection(conf);
try {
TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
} finally {
@ -680,7 +690,7 @@ public class TableMapReduceUtil {
// If passed a quorum/ensemble address, pass it on to TableOutputFormat.
if (quorumAddress != null) {
// Calling this will validate the format
ZKUtil.transformClusterKey(quorumAddress);
ZKConfig.validateClusterKey(quorumAddress);
conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
}
if (serverClass != null && serverImpl != null) {

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@ -195,22 +194,19 @@ implements Configurable {
@Override
public void setConf(Configuration otherConf) {
this.conf = HBaseConfiguration.create(otherConf);
String tableName = this.conf.get(OUTPUT_TABLE);
String tableName = otherConf.get(OUTPUT_TABLE);
if(tableName == null || tableName.length() <= 0) {
throw new IllegalArgumentException("Must specify table name");
}
String address = this.conf.get(QUORUM_ADDRESS);
int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
String serverClass = this.conf.get(REGION_SERVER_CLASS);
String serverImpl = this.conf.get(REGION_SERVER_IMPL);
String address = otherConf.get(QUORUM_ADDRESS);
int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
String serverClass = otherConf.get(REGION_SERVER_CLASS);
String serverImpl = otherConf.get(REGION_SERVER_IMPL);
try {
if (address != null) {
ZKUtil.applyClusterKeyToConf(this.conf, address);
}
this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
if (serverClass != null) {
this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
}
@ -221,9 +217,5 @@ implements Configurable {
LOG.error(e);
throw new RuntimeException(e);
}
// finally apply any remaining "hbase.mapred.output." configuration overrides
Configuration outputOverrides = HBaseConfiguration.subset(otherConf, OUTPUT_CONF_PREFIX);
HBaseConfiguration.merge(this.conf, outputOverrides);
}
}

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
@ -70,6 +69,7 @@ public class VerifyReplication extends Configured implements Tool {
LogFactory.getLog(VerifyReplication.class);
public final static String NAME = "verifyrep";
private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
static long startTime = 0;
static long endTime = Long.MAX_VALUE;
static int versions = -1;
@ -130,8 +130,8 @@ public class VerifyReplication extends Configured implements Tool {
final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
Configuration peerConf = HBaseConfiguration.create(conf);
ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
zkClusterKey, PEER_CONFIG_PREFIX);
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
connection = ConnectionFactory.createConnection(peerConf);
@ -211,7 +211,8 @@ public class VerifyReplication extends Configured implements Tool {
}
}
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
final Configuration conf) throws IOException {
ZooKeeperWatcher localZKW = null;
ReplicationPeerZKImpl peer = null;
try {
@ -228,8 +229,8 @@ public class VerifyReplication extends Configured implements Tool {
if (pair == null) {
throw new IOException("Couldn't get peer conf!");
}
Configuration peerConf = rp.getPeerConf(peerId).getSecond();
return ZKUtil.getZooKeeperClusterKey(peerConf);
return pair;
} catch (ReplicationException e) {
throw new IOException(
"An error occured while trying to connect to the remove peer cluster", e);
@ -268,9 +269,14 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME+".families", families);
}
String peerQuorumAddress = getPeerQuorumAddress(conf);
Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
String peerQuorumAddress = peerConfig.getClusterKey();
LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
peerConfig.getConfiguration());
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
LOG.info("Peer Quorum Address: " + peerQuorumAddress);
HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
peerConfig.getConfiguration().entrySet());
conf.setInt(NAME + ".versions", versions);
LOG.info("Number of version: " + versions);
@ -293,8 +299,9 @@ public class VerifyReplication extends Configured implements Tool {
TableMapReduceUtil.initTableMapperJob(tableName, scan,
Verifier.class, null, null, job);
Configuration peerClusterConf = peerConfigPair.getSecond();
// Obtain the auth token from peer cluster
TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);

View File

@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
/**
* Similar to {@link RegionReplicaUtil} but for the server side
@ -148,7 +148,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
try {
if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) {
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf));
peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
}

View File

@ -343,71 +343,6 @@ public class TestZooKeeper {
assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null));
}
@Test
public void testClusterKey() throws Exception {
testKey("server", 2181, "hbase");
testKey("server1,server2,server3", 2181, "hbase");
try {
ZKUtil.transformClusterKey("2181:hbase");
} catch (IOException ex) {
// OK
}
}
@Test
public void testClusterKeyWithMultiplePorts() throws Exception {
// server has different port than the default port
testKey("server1:2182", 2181, "hbase", true);
// multiple servers have their own port
testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
// one server has no specified port, should use default port
testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
// the last server has no specified port, should use default port
testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
// multiple servers have no specified port, should use default port for those servers
testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
// same server, different ports
testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
// mix of same server/different port and different server
testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
}
private void testKey(String ensemble, int port, String znode)
throws IOException {
testKey(ensemble, port, znode, false); // not support multiple client ports
}
private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport)
throws IOException {
Configuration conf = new Configuration();
String key = ensemble+":"+port+":"+znode;
String ensemble2 = null;
ZKUtil.ZKClusterKey zkClusterKey = ZKUtil.transformClusterKey(key);
if (multiplePortSupport) {
ensemble2 = ZKUtil.standardizeQuorumServerString(ensemble, Integer.toString(port));
assertEquals(ensemble2, zkClusterKey.quorumString);
}
else {
assertEquals(ensemble, zkClusterKey.quorumString);
}
assertEquals(port, zkClusterKey.clientPort);
assertEquals(znode, zkClusterKey.znodeParent);
ZKUtil.applyClusterKeyToConf(conf, key);
assertEquals(zkClusterKey.quorumString, conf.get(HConstants.ZOOKEEPER_QUORUM));
assertEquals(zkClusterKey.clientPort, conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1));
assertEquals(zkClusterKey.znodeParent, conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf);
if (multiplePortSupport) {
String key2 = ensemble2 + ":" + port + ":" + znode;
assertEquals(key2, reconstructedKey);
}
else {
assertEquals(key, reconstructedKey);
}
}
/**
* A test for HBASE-3238
* @throws IOException A connection attempt to zk failed

View File

@ -24,9 +24,13 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -38,10 +42,12 @@ import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Unit testing of ReplicationAdmin
@ -117,7 +123,29 @@ public class TestReplicationAdmin {
admin.removePeer(ID_SECOND);
assertEquals(0, admin.getPeersCount());
}
/**
* Tests that the peer configuration used by ReplicationAdmin contains all
* the peer's properties.
*/
@Test
public void testPeerConfig() throws Exception {
ReplicationPeerConfig config = new ReplicationPeerConfig();
config.setClusterKey(KEY_ONE);
config.getConfiguration().put("key1", "value1");
config.getConfiguration().put("key2", "value2");
admin.addPeer(ID_ONE, config, null);
List<ReplicationPeer> peers = admin.listValidReplicationPeers();
assertEquals(1, peers.size());
ReplicationPeer peerOne = peers.get(0);
assertNotNull(peerOne);
assertEquals("value1", peerOne.getConfiguration().get("key1"));
assertEquals("value2", peerOne.getConfiguration().get("key2"));
admin.removePeer(ID_ONE);
}
@Test
public void testAddPeerWithUnDeletedQueues() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@ -115,7 +115,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
// check whether the class has been constructed and started
@ -157,7 +157,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
int peerCount = admin.getPeersCount();
final String id = "testReplicationEndpointReturnsFalseOnReplicate";
admin.addPeer(id,
new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
// This test is flakey and then there is so much stuff flying around in here its, hard to
// debug. Peer needs to be up for the edit to make it across. This wait on
@ -209,7 +209,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
admin.addPeer(id,
new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf2))
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
null);
@ -234,7 +234,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
@Test (timeout=120000)
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
// now replicate some data.
try (Connection connection = ConnectionFactory.createConnection(conf1)) {

View File

@ -27,7 +27,7 @@ import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.zookeeper.KeeperException;
import org.junit.Before;
import org.junit.Test;
@ -202,7 +202,7 @@ public abstract class TestReplicationStateBasic {
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
rp.removePeer(ID_ONE);
rp.peerRemoved(ID_ONE);
assertNumberOfPeers(1);

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -79,7 +80,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234");
ZKUtil.createWithParents(zkw1, fakeRs);
ZKClusterId.setClusterId(zkw1, new ClusterId());
return ZKUtil.getZooKeeperClusterKey(testConf);
return ZKConfig.getZooKeeperClusterKey(testConf);
}
@Before
@ -94,7 +95,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3);
rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
}

View File

@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -129,7 +129,8 @@ public class TestRegionReplicaReplicationEndpoint {
// assert peer configuration is correct
peerConfig = admin.getPeerConfig(peerId);
assertNotNull(peerConfig);
assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration()));
assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
HTU.getConfiguration()));
assertEquals(peerConfig.getReplicationEndpointImpl(),
RegionReplicaReplicationEndpoint.class.getName());
admin.close();
@ -162,7 +163,8 @@ public class TestRegionReplicaReplicationEndpoint {
// assert peer configuration is correct
peerConfig = admin.getPeerConfig(peerId);
assertNotNull(peerConfig);
assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration()));
assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
HTU.getConfiguration()));
assertEquals(peerConfig.getReplicationEndpointImpl(),
RegionReplicaReplicationEndpoint.class.getName());
admin.close();

View File

@ -1,45 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({MiscTests.class, SmallTests.class})
public class TestZKConfig {
@Test
public void testZKConfigLoading() throws Exception {
Configuration conf = HBaseConfiguration.create();
// Test that we read only from the config instance
// (i.e. via hbase-default.xml and hbase-site.xml)
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
Properties props = ZKConfig.makeZKProps(conf);
Assert.assertEquals("Property client port should have been default from the HBase config",
"2181",
props.getProperty("clientPort"));
}
}