HBASE-20159 Support using separate ZK quorums for client
This commit is contained in:
parent
78452113cb
commit
b72e19e3b9
|
@ -124,7 +124,13 @@ public final class ReadOnlyZKClient implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ReadOnlyZKClient(Configuration conf) {
|
public ReadOnlyZKClient(Configuration conf) {
|
||||||
|
// We might use a different ZK for client access
|
||||||
|
String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
|
||||||
|
if (clientZkQuorumServers != null) {
|
||||||
|
this.connectString = clientZkQuorumServers;
|
||||||
|
} else {
|
||||||
this.connectString = ZKConfig.getZKQuorumServersString(conf);
|
this.connectString = ZKConfig.getZKQuorumServersString(conf);
|
||||||
|
}
|
||||||
this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
|
this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
|
||||||
this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
|
this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
|
||||||
this.retryIntervalMs =
|
this.retryIntervalMs =
|
||||||
|
|
|
@ -187,6 +187,19 @@ public final class HConstants {
|
||||||
/** Name of ZooKeeper quorum configuration parameter. */
|
/** Name of ZooKeeper quorum configuration parameter. */
|
||||||
public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
|
public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
|
||||||
|
|
||||||
|
/** Name of ZooKeeper quorum configuration parameter for client to locate meta. */
|
||||||
|
public static final String CLIENT_ZOOKEEPER_QUORUM = "hbase.client.zookeeper.quorum";
|
||||||
|
|
||||||
|
/** Client port of ZooKeeper for client to locate meta */
|
||||||
|
public static final String CLIENT_ZOOKEEPER_CLIENT_PORT =
|
||||||
|
"hbase.client.zookeeper.property.clientPort";
|
||||||
|
|
||||||
|
/** Indicate whether the client ZK are observer nodes of the server ZK */
|
||||||
|
public static final String CLIENT_ZOOKEEPER_OBSERVER_MODE =
|
||||||
|
"hbase.client.zookeeper.observer.mode";
|
||||||
|
/** Assuming client zk not in observer mode and master need to synchronize information */
|
||||||
|
public static final boolean DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE = false;
|
||||||
|
|
||||||
/** Common prefix of ZooKeeper configuration properties */
|
/** Common prefix of ZooKeeper configuration properties */
|
||||||
public static final String ZK_CFG_PROPERTY_PREFIX =
|
public static final String ZK_CFG_PROPERTY_PREFIX =
|
||||||
"hbase.zookeeper.property.";
|
"hbase.zookeeper.property.";
|
||||||
|
@ -205,7 +218,7 @@ public final class HConstants {
|
||||||
ZK_CFG_PROPERTY_PREFIX + CLIENT_PORT_STR;
|
ZK_CFG_PROPERTY_PREFIX + CLIENT_PORT_STR;
|
||||||
|
|
||||||
/** Default client port that the zookeeper listens on */
|
/** Default client port that the zookeeper listens on */
|
||||||
public static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 2181;
|
public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parameter name for the wait time for the recoverable zookeeper
|
* Parameter name for the wait time for the recoverable zookeeper
|
||||||
|
@ -239,7 +252,7 @@ public final class HConstants {
|
||||||
ZK_CFG_PROPERTY_PREFIX + "tickTime";
|
ZK_CFG_PROPERTY_PREFIX + "tickTime";
|
||||||
|
|
||||||
/** Default limit on concurrent client-side zookeeper connections */
|
/** Default limit on concurrent client-side zookeeper connections */
|
||||||
public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 300;
|
public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 300;
|
||||||
|
|
||||||
/** Configuration key for ZooKeeper session timeout */
|
/** Configuration key for ZooKeeper session timeout */
|
||||||
public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";
|
public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Properties;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -85,7 +86,7 @@ public final class ZKConfig {
|
||||||
// If clientPort is not set, assign the default.
|
// If clientPort is not set, assign the default.
|
||||||
if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
|
if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
|
||||||
zkProperties.put(HConstants.CLIENT_PORT_STR,
|
zkProperties.put(HConstants.CLIENT_PORT_STR,
|
||||||
HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
|
HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the server.X properties.
|
// Create the server.X properties.
|
||||||
|
@ -119,7 +120,7 @@ public final class ZKConfig {
|
||||||
*/
|
*/
|
||||||
private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
|
private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
|
||||||
String defaultClientPort = Integer.toString(
|
String defaultClientPort = Integer.toString(
|
||||||
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
|
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT));
|
||||||
|
|
||||||
// Build the ZK quorum server string with "server:clientport" list, separated by ','
|
// Build the ZK quorum server string with "server:clientport" list, separated by ','
|
||||||
final String[] serverHosts =
|
final String[] serverHosts =
|
||||||
|
@ -310,4 +311,23 @@ public final class ZKConfig {
|
||||||
return znodeParent;
|
return znodeParent;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the client ZK Quorum servers string
|
||||||
|
* @param conf the configuration to read
|
||||||
|
* @return Client quorum servers, or null if not specified
|
||||||
|
*/
|
||||||
|
public static String getClientZKQuorumServersString(Configuration conf) {
|
||||||
|
String clientQuromServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
|
||||||
|
if (clientQuromServers == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
int defaultClientPort =
|
||||||
|
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
|
||||||
|
String clientZkClientPort =
|
||||||
|
Integer.toString(conf.getInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, defaultClientPort));
|
||||||
|
// Build the ZK quorum server string with "server:clientport" list, separated by ','
|
||||||
|
final String[] serverHosts = StringUtils.getStrings(clientQuromServers);
|
||||||
|
return buildZKQuorumServerString(serverHosts, clientZkClientPort);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -136,6 +136,8 @@ import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
|
||||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||||
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
|
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
|
||||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||||
|
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
|
||||||
|
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
|
||||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||||
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
|
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
|
@ -298,6 +300,10 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
private DrainingServerTracker drainingServerTracker;
|
private DrainingServerTracker drainingServerTracker;
|
||||||
// Tracker for load balancer state
|
// Tracker for load balancer state
|
||||||
LoadBalancerTracker loadBalancerTracker;
|
LoadBalancerTracker loadBalancerTracker;
|
||||||
|
// Tracker for meta location, if any client ZK quorum specified
|
||||||
|
MetaLocationSyncer metaLocationSyncer;
|
||||||
|
// Tracker for active master location, if any client ZK quorum specified
|
||||||
|
MasterAddressSyncer masterAddressSyncer;
|
||||||
|
|
||||||
// Tracker for split and merge state
|
// Tracker for split and merge state
|
||||||
private SplitOrMergeTracker splitOrMergeTracker;
|
private SplitOrMergeTracker splitOrMergeTracker;
|
||||||
|
@ -553,6 +559,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
|
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
|
||||||
|
Threads.setDaemonThreadRunning(new Thread(() -> {
|
||||||
try {
|
try {
|
||||||
int infoPort = putUpJettyServer();
|
int infoPort = putUpJettyServer();
|
||||||
startActiveMasterManager(infoPort);
|
startActiveMasterManager(infoPort);
|
||||||
|
@ -565,6 +572,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
abort(error, t);
|
abort(error, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
// Fall in here even if we have been aborted. Need to run the shutdown services and
|
// Fall in here even if we have been aborted. Need to run the shutdown services and
|
||||||
// the super run call will do this for us.
|
// the super run call will do this for us.
|
||||||
|
@ -746,6 +754,23 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
this.maintenanceModeTracker = new MasterMaintenanceModeTracker(zooKeeper);
|
this.maintenanceModeTracker = new MasterMaintenanceModeTracker(zooKeeper);
|
||||||
this.maintenanceModeTracker.start();
|
this.maintenanceModeTracker.start();
|
||||||
|
|
||||||
|
String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
|
||||||
|
boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
|
||||||
|
HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
|
||||||
|
if (clientQuorumServers != null && !clientZkObserverMode) {
|
||||||
|
// we need to take care of the ZK information synchronization
|
||||||
|
// if given client ZK are not observer nodes
|
||||||
|
ZKWatcher clientZkWatcher = new ZKWatcher(conf,
|
||||||
|
getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + "-clientZK", this,
|
||||||
|
false, true);
|
||||||
|
this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, clientZkWatcher, this);
|
||||||
|
this.metaLocationSyncer.start();
|
||||||
|
this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, clientZkWatcher, this);
|
||||||
|
this.masterAddressSyncer.start();
|
||||||
|
// set cluster id is a one-go effort
|
||||||
|
ZKClusterId.setClusterId(clientZkWatcher, fileSystemManager.getClusterId());
|
||||||
|
}
|
||||||
|
|
||||||
// Set the cluster as up. If new RSs, they'll be waiting on this before
|
// Set the cluster as up. If new RSs, they'll be waiting on this before
|
||||||
// going ahead with their startup.
|
// going ahead with their startup.
|
||||||
boolean wasUp = this.clusterStatusTracker.isClusterUp();
|
boolean wasUp = this.clusterStatusTracker.isClusterUp();
|
||||||
|
|
|
@ -0,0 +1,241 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.zksyncer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKListener;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tracks the target znode(s) on server ZK cluster and synchronize them to client ZK cluster if
|
||||||
|
* changed
|
||||||
|
* <p/>
|
||||||
|
* The target znode(s) is given through {@link #getNodesToWatch()} method
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public abstract class ClientZKSyncer extends ZKListener {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ClientZKSyncer.class);
|
||||||
|
private final Server server;
|
||||||
|
private final ZKWatcher clientZkWatcher;
|
||||||
|
// We use queues and daemon threads to synchronize the data to client ZK cluster
|
||||||
|
// to avoid blocking the single event thread for watchers
|
||||||
|
private final Map<String, BlockingQueue<byte[]>> queues;
|
||||||
|
|
||||||
|
public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
|
||||||
|
super(watcher);
|
||||||
|
this.server = server;
|
||||||
|
this.clientZkWatcher = clientZkWatcher;
|
||||||
|
this.queues = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts the syncer
|
||||||
|
* @throws KeeperException if error occurs when trying to create base nodes on client ZK
|
||||||
|
*/
|
||||||
|
public void start() throws KeeperException {
|
||||||
|
LOG.debug("Starting " + getClass().getSimpleName());
|
||||||
|
this.watcher.registerListener(this);
|
||||||
|
// create base znode on remote ZK
|
||||||
|
ZKUtil.createWithParents(clientZkWatcher, watcher.znodePaths.baseZNode);
|
||||||
|
// set meta znodes for client ZK
|
||||||
|
Collection<String> nodes = getNodesToWatch();
|
||||||
|
LOG.debug("Znodes to watch: " + nodes);
|
||||||
|
// initialize queues and threads
|
||||||
|
for (String node : nodes) {
|
||||||
|
BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1);
|
||||||
|
queues.put(node, queue);
|
||||||
|
Thread updater = new ClientZkUpdater(node, queue);
|
||||||
|
updater.setDaemon(true);
|
||||||
|
updater.start();
|
||||||
|
watchAndCheckExists(node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void watchAndCheckExists(String node) {
|
||||||
|
try {
|
||||||
|
if (ZKUtil.watchAndCheckExists(watcher, node)) {
|
||||||
|
byte[] data = ZKUtil.getDataAndWatch(watcher, node);
|
||||||
|
if (data != null) {
|
||||||
|
// put the data into queue
|
||||||
|
upsertQueue(node, data);
|
||||||
|
} else {
|
||||||
|
// It existed but now does not, should has been tracked by our watcher, ignore
|
||||||
|
LOG.debug("Found no data from " + node);
|
||||||
|
watchAndCheckExists(node);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// cleanup stale ZNodes on client ZK to avoid invalid requests to server
|
||||||
|
ZKUtil.deleteNodeFailSilent(clientZkWatcher, node);
|
||||||
|
}
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
server.abort("Unexpected exception during initialization, aborting", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the value of the single element in queue if any, or else insert.
|
||||||
|
* <p/>
|
||||||
|
* We only need to synchronize the latest znode value to client ZK rather than synchronize each
|
||||||
|
* time
|
||||||
|
* @param data the data to write to queue
|
||||||
|
*/
|
||||||
|
private void upsertQueue(String node, byte[] data) {
|
||||||
|
BlockingQueue<byte[]> queue = queues.get(node);
|
||||||
|
synchronized (queue) {
|
||||||
|
queue.poll();
|
||||||
|
queue.offer(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set data for client ZK and retry until succeed. Be very careful to prevent dead loop when
|
||||||
|
* modifying this method
|
||||||
|
* @param node the znode to set on client ZK
|
||||||
|
* @param data the data to set to client ZK
|
||||||
|
* @throws InterruptedException if the thread is interrupted during process
|
||||||
|
*/
|
||||||
|
private final void setDataForClientZkUntilSuccess(String node, byte[] data)
|
||||||
|
throws InterruptedException {
|
||||||
|
while (!server.isStopped()) {
|
||||||
|
try {
|
||||||
|
LOG.debug("Set data for remote " + node + ", client zk wather: " + clientZkWatcher);
|
||||||
|
ZKUtil.setData(clientZkWatcher, node, data);
|
||||||
|
break;
|
||||||
|
} catch (KeeperException.NoNodeException nne) {
|
||||||
|
// Node doesn't exist, create it and set value
|
||||||
|
try {
|
||||||
|
ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT);
|
||||||
|
break;
|
||||||
|
} catch (KeeperException.ConnectionLossException
|
||||||
|
| KeeperException.SessionExpiredException ee) {
|
||||||
|
reconnectAfterExpiration();
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Failed to create znode " + node + " due to: " + e.getMessage() + ", will retry later");
|
||||||
|
}
|
||||||
|
} catch (KeeperException.ConnectionLossException
|
||||||
|
| KeeperException.SessionExpiredException ee) {
|
||||||
|
reconnectAfterExpiration();
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
LOG.debug("Failed to set data to client ZK, will retry later", e);
|
||||||
|
}
|
||||||
|
Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final void reconnectAfterExpiration() throws InterruptedException {
|
||||||
|
LOG.warn("ZK session expired or lost. Retry a new connection...");
|
||||||
|
try {
|
||||||
|
clientZkWatcher.reconnectAfterExpiration();
|
||||||
|
} catch (IOException | KeeperException e) {
|
||||||
|
LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void nodeCreated(String path) {
|
||||||
|
if (!validate(path)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
|
||||||
|
upsertQueue(path, data);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
LOG.warn("Unexpected exception handling nodeCreated event", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void nodeDataChanged(String path) {
|
||||||
|
if (validate(path)) {
|
||||||
|
nodeCreated(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void nodeDeleted(String path) {
|
||||||
|
if (validate(path)) {
|
||||||
|
try {
|
||||||
|
if (ZKUtil.watchAndCheckExists(watcher, path)) {
|
||||||
|
nodeCreated(path);
|
||||||
|
}
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validate whether a znode path is watched by us
|
||||||
|
* @param path the path to validate
|
||||||
|
* @return true if the znode is watched by us
|
||||||
|
*/
|
||||||
|
abstract boolean validate(String path);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the znode(s) to watch
|
||||||
|
*/
|
||||||
|
abstract Collection<String> getNodesToWatch();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thread to synchronize znode data to client ZK cluster
|
||||||
|
*/
|
||||||
|
class ClientZkUpdater extends Thread {
|
||||||
|
final String znode;
|
||||||
|
final BlockingQueue<byte[]> queue;
|
||||||
|
|
||||||
|
public ClientZkUpdater(String znode, BlockingQueue<byte[]> queue) {
|
||||||
|
this.znode = znode;
|
||||||
|
this.queue = queue;
|
||||||
|
setName("ClientZKUpdater-" + znode);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (!server.isStopped()) {
|
||||||
|
try {
|
||||||
|
byte[] data = queue.take();
|
||||||
|
setDataForClientZkUntilSuccess(znode, data);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(
|
||||||
|
"Interrupted while checking whether need to update meta location to client zk");
|
||||||
|
}
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,52 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.zksyncer;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tracks the active master address on server ZK cluster and synchronize them to client ZK cluster
|
||||||
|
* if changed
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class MasterAddressSyncer extends ClientZKSyncer {
|
||||||
|
private final String masterAddressZNode;
|
||||||
|
|
||||||
|
public MasterAddressSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
|
||||||
|
super(watcher, clientZkWatcher, server);
|
||||||
|
masterAddressZNode = watcher.znodePaths.masterAddressZNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean validate(String path) {
|
||||||
|
return path.equals(masterAddressZNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
Collection<String> getNodesToWatch() {
|
||||||
|
ArrayList<String> toReturn = new ArrayList<>();
|
||||||
|
toReturn.add(masterAddressZNode);
|
||||||
|
return toReturn;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,46 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.zksyncer;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tracks the meta region locations on server ZK cluster and synchronize them to client ZK cluster
|
||||||
|
* if changed
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class MetaLocationSyncer extends ClientZKSyncer {
|
||||||
|
public MetaLocationSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
|
||||||
|
super(watcher, clientZkWatcher, server);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean validate(String path) {
|
||||||
|
return watcher.znodePaths.isAnyMetaReplicaZNode(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
Collection<String> getNodesToWatch() {
|
||||||
|
return watcher.znodePaths.metaReplicaZNodes.values();
|
||||||
|
}
|
||||||
|
}
|
|
@ -322,6 +322,8 @@ public class HRegionServer extends HasThread implements
|
||||||
|
|
||||||
volatile boolean killed = false;
|
volatile boolean killed = false;
|
||||||
|
|
||||||
|
private volatile boolean shutDown = false;
|
||||||
|
|
||||||
protected final Configuration conf;
|
protected final Configuration conf;
|
||||||
|
|
||||||
private Path rootDir;
|
private Path rootDir;
|
||||||
|
@ -774,6 +776,13 @@ public class HRegionServer extends HasThread implements
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected ClusterConnection createClusterConnection() throws IOException {
|
protected ClusterConnection createClusterConnection() throws IOException {
|
||||||
|
Configuration conf = this.conf;
|
||||||
|
if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
|
||||||
|
// Use server ZK cluster for server-issued connections, so we clone
|
||||||
|
// the conf and unset the client ZK related properties
|
||||||
|
conf = new Configuration(this.conf);
|
||||||
|
conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
|
||||||
|
}
|
||||||
// Create a cluster connection that when appropriate, can short-circuit and go directly to the
|
// Create a cluster connection that when appropriate, can short-circuit and go directly to the
|
||||||
// local server if the request is to the local server bypassing RPC. Can be used for both local
|
// local server if the request is to the local server bypassing RPC. Can be used for both local
|
||||||
// and remote invocations.
|
// and remote invocations.
|
||||||
|
@ -1147,6 +1156,7 @@ public class HRegionServer extends HasThread implements
|
||||||
if (this.zooKeeper != null) {
|
if (this.zooKeeper != null) {
|
||||||
this.zooKeeper.close();
|
this.zooKeeper.close();
|
||||||
}
|
}
|
||||||
|
this.shutDown = true;
|
||||||
LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
|
LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3724,4 +3734,8 @@ public class HRegionServer extends HasThread implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isShutDown() {
|
||||||
|
return shutDown;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,6 +146,10 @@ public class ReplicationSink {
|
||||||
if (StringUtils.isNotEmpty(replicationCodec)) {
|
if (StringUtils.isNotEmpty(replicationCodec)) {
|
||||||
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
|
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
|
||||||
}
|
}
|
||||||
|
// use server ZK cluster for replication, so we unset the client ZK related properties if any
|
||||||
|
if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
|
||||||
|
this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,268 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestSeparateClientZKCluster {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestSeparateClientZKCluster.class);
|
||||||
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
private static final File clientZkDir = new File("/tmp/TestSeparateClientZKCluster");
|
||||||
|
private static final int ZK_SESSION_TIMEOUT = 5000;
|
||||||
|
private static MiniZooKeeperCluster clientZkCluster;
|
||||||
|
|
||||||
|
private final byte[] family = Bytes.toBytes("cf");
|
||||||
|
private final byte[] qualifier = Bytes.toBytes("c1");
|
||||||
|
private final byte[] row = Bytes.toBytes("row");
|
||||||
|
private final byte[] value = Bytes.toBytes("v1");
|
||||||
|
private final byte[] newVal = Bytes.toBytes("v2");
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void beforeAllTests() throws Exception {
|
||||||
|
int clientZkPort = 21828;
|
||||||
|
clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
|
||||||
|
clientZkCluster.setDefaultClientPort(clientZkPort);
|
||||||
|
clientZkCluster.startup(clientZkDir);
|
||||||
|
// reduce the retry number and start log counter
|
||||||
|
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||||
|
TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
|
||||||
|
TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
|
||||||
|
// core settings for testing client ZK cluster
|
||||||
|
TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
|
||||||
|
TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
|
||||||
|
// reduce zk session timeout to easier trigger session expiration
|
||||||
|
TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
|
||||||
|
// Start a cluster with 2 masters and 3 regionservers.
|
||||||
|
TEST_UTIL.startMiniCluster(2, 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void afterAllTests() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
clientZkCluster.shutdown();
|
||||||
|
FileUtils.deleteDirectory(clientZkDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testBasicOperation() throws Exception {
|
||||||
|
TableName tn = TableName.valueOf(name.getMethodName());
|
||||||
|
// create table
|
||||||
|
Connection conn = TEST_UTIL.getConnection();
|
||||||
|
Admin admin = conn.getAdmin();
|
||||||
|
HTable table = (HTable) conn.getTable(tn);
|
||||||
|
try {
|
||||||
|
ColumnFamilyDescriptorBuilder cfDescBuilder =
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||||
|
TableDescriptorBuilder tableDescBuilder =
|
||||||
|
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||||
|
admin.createTable(tableDescBuilder.build());
|
||||||
|
// test simple get and put
|
||||||
|
Put put = new Put(row);
|
||||||
|
put.addColumn(family, qualifier, value);
|
||||||
|
table.put(put);
|
||||||
|
Get get = new Get(row);
|
||||||
|
Result result = table.get(get);
|
||||||
|
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
|
||||||
|
Assert.assertArrayEquals(value, result.getValue(family, qualifier));
|
||||||
|
} finally {
|
||||||
|
admin.close();
|
||||||
|
table.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMasterSwitch() throws Exception {
|
||||||
|
// get an admin instance and issue some request first
|
||||||
|
Connection conn = TEST_UTIL.getConnection();
|
||||||
|
Admin admin = conn.getAdmin();
|
||||||
|
LOG.debug("Tables: " + admin.listTableDescriptors());
|
||||||
|
try {
|
||||||
|
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||||
|
// switch active master
|
||||||
|
HMaster master = cluster.getMaster();
|
||||||
|
master.stopMaster();
|
||||||
|
while (!master.isShutDown()) {
|
||||||
|
Thread.sleep(200);
|
||||||
|
}
|
||||||
|
while (!cluster.getMaster().isInitialized()) {
|
||||||
|
Thread.sleep(200);
|
||||||
|
}
|
||||||
|
// confirm client access still works
|
||||||
|
Assert.assertTrue(admin.balance(false));
|
||||||
|
} finally {
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMetaRegionMove() throws Exception {
|
||||||
|
TableName tn = TableName.valueOf(name.getMethodName());
|
||||||
|
// create table
|
||||||
|
Connection conn = TEST_UTIL.getConnection();
|
||||||
|
Admin admin = conn.getAdmin();
|
||||||
|
HTable table = (HTable) conn.getTable(tn);
|
||||||
|
try {
|
||||||
|
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||||
|
ColumnFamilyDescriptorBuilder cfDescBuilder =
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||||
|
TableDescriptorBuilder tableDescBuilder =
|
||||||
|
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||||
|
admin.createTable(tableDescBuilder.build());
|
||||||
|
// issue some requests to cache the region location
|
||||||
|
Put put = new Put(row);
|
||||||
|
put.addColumn(family, qualifier, value);
|
||||||
|
table.put(put);
|
||||||
|
Get get = new Get(row);
|
||||||
|
Result result = table.get(get);
|
||||||
|
// move meta region and confirm client could detect
|
||||||
|
byte[] destServerName = null;
|
||||||
|
for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
|
||||||
|
ServerName name = rst.getRegionServer().getServerName();
|
||||||
|
if (!name.equals(cluster.getServerHoldingMeta())) {
|
||||||
|
destServerName = Bytes.toBytes(name.getServerName());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName);
|
||||||
|
LOG.debug("Finished moving meta");
|
||||||
|
// invalidate client cache
|
||||||
|
RegionInfo region =
|
||||||
|
table.getRegionLocator().getRegionLocation(row).getRegion();
|
||||||
|
ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
|
||||||
|
for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
|
||||||
|
ServerName name = rst.getRegionServer().getServerName();
|
||||||
|
if (!name.equals(currentServer)) {
|
||||||
|
destServerName = Bytes.toBytes(name.getServerName());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
admin.move(region.getEncodedNameAsBytes(), destServerName);
|
||||||
|
LOG.debug("Finished moving user region");
|
||||||
|
put = new Put(row);
|
||||||
|
put.addColumn(family, qualifier, newVal);
|
||||||
|
table.put(put);
|
||||||
|
result = table.get(get);
|
||||||
|
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
|
||||||
|
Assert.assertArrayEquals(newVal, result.getValue(family, qualifier));
|
||||||
|
} finally {
|
||||||
|
admin.close();
|
||||||
|
table.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
|
||||||
|
TableName tn = TableName.valueOf(name.getMethodName());
|
||||||
|
// create table
|
||||||
|
ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
|
||||||
|
Admin admin = conn.getAdmin();
|
||||||
|
HTable table = (HTable) conn.getTable(tn);
|
||||||
|
try {
|
||||||
|
ColumnFamilyDescriptorBuilder cfDescBuilder =
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||||
|
TableDescriptorBuilder tableDescBuilder =
|
||||||
|
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||||
|
admin.createTable(tableDescBuilder.build());
|
||||||
|
// put some data
|
||||||
|
Put put = new Put(row);
|
||||||
|
put.addColumn(family, qualifier, value);
|
||||||
|
table.put(put);
|
||||||
|
// invalid connection cache
|
||||||
|
conn.clearRegionCache();
|
||||||
|
// stop client zk cluster
|
||||||
|
clientZkCluster.shutdown();
|
||||||
|
// stop current meta server and confirm the server shutdown process
|
||||||
|
// is not affected by client ZK crash
|
||||||
|
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||||
|
int metaServerId = cluster.getServerWithMeta();
|
||||||
|
HRegionServer metaServer = cluster.getRegionServer(metaServerId);
|
||||||
|
metaServer.stop("Stop current RS holding meta region");
|
||||||
|
while (!metaServer.isShutDown()) {
|
||||||
|
Thread.sleep(200);
|
||||||
|
}
|
||||||
|
// wait for meta region online
|
||||||
|
cluster.getMaster().getAssignmentManager()
|
||||||
|
.waitForAssignment(RegionInfoBuilder.FIRST_META_REGIONINFO);
|
||||||
|
// wait some long time to make sure we will retry sync data to client ZK until data set
|
||||||
|
Thread.sleep(10000);
|
||||||
|
clientZkCluster.startup(clientZkDir);
|
||||||
|
// new request should pass
|
||||||
|
Get get = new Get(row);
|
||||||
|
Result result = table.get(get);
|
||||||
|
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
|
||||||
|
Assert.assertArrayEquals(value, result.getValue(family, qualifier));
|
||||||
|
} finally {
|
||||||
|
admin.close();
|
||||||
|
table.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAsyncTable() throws Exception {
|
||||||
|
TableName tn = TableName.valueOf(name.getMethodName());
|
||||||
|
ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
|
||||||
|
TableDescriptorBuilder tableDescBuilder =
|
||||||
|
TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
|
||||||
|
try (AsyncConnection ASYNC_CONN =
|
||||||
|
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
|
||||||
|
ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get();
|
||||||
|
AsyncTable<?> table = ASYNC_CONN.getTable(tn);
|
||||||
|
// put some data
|
||||||
|
Put put = new Put(row);
|
||||||
|
put.addColumn(family, qualifier, value);
|
||||||
|
table.put(put).get();
|
||||||
|
// get and verify
|
||||||
|
Get get = new Get(row);
|
||||||
|
Result result = table.get(get).get();
|
||||||
|
LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
|
||||||
|
Assert.assertArrayEquals(value, result.getValue(family, qualifier));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
|
@ -318,4 +319,40 @@ public class TestMasterNoCluster {
|
||||||
master.join();
|
master.join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMasterInitWithSameClientServerZKQuorum() throws Exception {
|
||||||
|
Configuration conf = new Configuration(TESTUTIL.getConfiguration());
|
||||||
|
conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
|
||||||
|
conf.setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, TESTUTIL.getZkCluster().getClientPort());
|
||||||
|
HMaster master = new HMaster(conf);
|
||||||
|
master.start();
|
||||||
|
// the master will abort due to IllegalArgumentException so we should finish within 60 seconds
|
||||||
|
master.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMasterInitWithObserverModeClientZKQuorum() throws Exception {
|
||||||
|
Configuration conf = new Configuration(TESTUTIL.getConfiguration());
|
||||||
|
Assert.assertFalse(Boolean.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE));
|
||||||
|
// set client ZK to some non-existing address and make sure server won't access client ZK
|
||||||
|
// (server start should not be affected)
|
||||||
|
conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
|
||||||
|
conf.setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT,
|
||||||
|
TESTUTIL.getZkCluster().getClientPort() + 1);
|
||||||
|
// settings to allow us not to start additional RS
|
||||||
|
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
|
||||||
|
conf.setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
|
||||||
|
// main setting for this test case
|
||||||
|
conf.setBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE, true);
|
||||||
|
HMaster master = new HMaster(conf);
|
||||||
|
master.start();
|
||||||
|
while (!master.isInitialized()) {
|
||||||
|
Threads.sleep(200);
|
||||||
|
}
|
||||||
|
Assert.assertNull(master.metaLocationSyncer);
|
||||||
|
Assert.assertNull(master.masterAddressSyncer);
|
||||||
|
master.stopMaster();
|
||||||
|
master.join();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -238,7 +238,7 @@ public class MiniZooKeeperCluster {
|
||||||
standaloneServerFactory.configure(
|
standaloneServerFactory.configure(
|
||||||
new InetSocketAddress(currentClientPort),
|
new InetSocketAddress(currentClientPort),
|
||||||
configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
|
configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
|
||||||
HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
|
HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS));
|
||||||
} catch (BindException e) {
|
} catch (BindException e) {
|
||||||
LOG.debug("Failed binding ZK Server to client port: " +
|
LOG.debug("Failed binding ZK Server to client port: " +
|
||||||
currentClientPort, e);
|
currentClientPort, e);
|
||||||
|
|
|
@ -46,7 +46,7 @@ public final class ZKServerTool {
|
||||||
for (String value : values) {
|
for (String value : values) {
|
||||||
String[] parts = value.split(":");
|
String[] parts = value.split(":");
|
||||||
String host = parts[0];
|
String host = parts[0];
|
||||||
int port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
|
int port = HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
|
||||||
if (parts.length > 1) {
|
if (parts.length > 1) {
|
||||||
port = Integer.parseInt(parts[1]);
|
port = Integer.parseInt(parts[1]);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1932,7 +1932,7 @@ public final class ZKUtil {
|
||||||
|
|
||||||
String host = sp[0];
|
String host = sp[0];
|
||||||
int port = sp.length > 1 ? Integer.parseInt(sp[1])
|
int port = sp.length > 1 ? Integer.parseInt(sp[1])
|
||||||
: HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
|
: HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
|
||||||
|
|
||||||
InetSocketAddress sockAddr = new InetSocketAddress(host, port);
|
InetSocketAddress sockAddr = new InetSocketAddress(host, port);
|
||||||
try (Socket socket = new Socket()) {
|
try (Socket socket = new Socket()) {
|
||||||
|
|
|
@ -116,8 +116,43 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
|
||||||
public ZKWatcher(Configuration conf, String identifier,
|
public ZKWatcher(Configuration conf, String identifier,
|
||||||
Abortable abortable, boolean canCreateBaseZNode)
|
Abortable abortable, boolean canCreateBaseZNode)
|
||||||
throws IOException, ZooKeeperConnectionException {
|
throws IOException, ZooKeeperConnectionException {
|
||||||
|
this(conf, identifier, abortable, canCreateBaseZNode, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Instantiate a ZooKeeper connection and watcher.
|
||||||
|
* @param conf the configuration to use
|
||||||
|
* @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
|
||||||
|
* this instance. Use null for default.
|
||||||
|
* @param abortable Can be null if there is on error there is no host to abort: e.g. client
|
||||||
|
* context.
|
||||||
|
* @param canCreateBaseZNode true if a base ZNode can be created
|
||||||
|
* @param clientZK whether this watcher is set to access client ZK
|
||||||
|
* @throws IOException if the connection to ZooKeeper fails
|
||||||
|
* @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base
|
||||||
|
* ZNodes
|
||||||
|
*/
|
||||||
|
public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
|
||||||
|
boolean canCreateBaseZNode, boolean clientZK)
|
||||||
|
throws IOException, ZooKeeperConnectionException {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
if (clientZK) {
|
||||||
|
String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
|
||||||
|
String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf);
|
||||||
|
if (clientZkQuorumServers != null) {
|
||||||
|
if (clientZkQuorumServers.equals(serverZkQuorumServers)) {
|
||||||
|
// Don't allow same settings to avoid dead loop when master trying
|
||||||
|
// to sync meta information from server ZK to client ZK
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"The quorum settings for client ZK should be different from those for server");
|
||||||
|
}
|
||||||
|
this.quorum = clientZkQuorumServers;
|
||||||
|
} else {
|
||||||
|
this.quorum = serverZkQuorumServers;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
this.quorum = ZKConfig.getZKQuorumServersString(conf);
|
this.quorum = ZKConfig.getZKQuorumServersString(conf);
|
||||||
|
}
|
||||||
this.prefix = identifier;
|
this.prefix = identifier;
|
||||||
// Identifier will get the sessionid appended later below down when we
|
// Identifier will get the sessionid appended later below down when we
|
||||||
// handle the syncconnect event.
|
// handle the syncconnect event.
|
||||||
|
|
Loading…
Reference in New Issue