diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
index d2f47633320..fc2d5f0d9df 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
@@ -124,7 +124,13 @@ public final class ReadOnlyZKClient implements Closeable {
}
public ReadOnlyZKClient(Configuration conf) {
- this.connectString = ZKConfig.getZKQuorumServersString(conf);
+ // We might use a different ZK for client access
+ String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
+ if (clientZkQuorumServers != null) {
+ this.connectString = clientZkQuorumServers;
+ } else {
+ this.connectString = ZKConfig.getZKQuorumServersString(conf);
+ }
this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
this.retryIntervalMs =
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 372d9b1466b..7ee31a56f92 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -187,6 +187,19 @@ public final class HConstants {
/** Name of ZooKeeper quorum configuration parameter. */
public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+ /** Name of ZooKeeper quorum configuration parameter for client to locate meta. */
+ public static final String CLIENT_ZOOKEEPER_QUORUM = "hbase.client.zookeeper.quorum";
+
+ /** Client port of ZooKeeper for client to locate meta */
+ public static final String CLIENT_ZOOKEEPER_CLIENT_PORT =
+ "hbase.client.zookeeper.property.clientPort";
+
+ /** Indicates whether the client ZK nodes are observer nodes of the server ZK ensemble */
+ public static final String CLIENT_ZOOKEEPER_OBSERVER_MODE =
+ "hbase.client.zookeeper.observer.mode";
+ /** Assume the client ZK is not in observer mode, so the master needs to synchronize information */
+ public static final boolean DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE = false;
+
/** Common prefix of ZooKeeper configuration properties */
public static final String ZK_CFG_PROPERTY_PREFIX =
"hbase.zookeeper.property.";
@@ -205,7 +218,7 @@ public final class HConstants {
ZK_CFG_PROPERTY_PREFIX + CLIENT_PORT_STR;
/** Default client port that the zookeeper listens on */
- public static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 2181;
+ public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
/**
* Parameter name for the wait time for the recoverable zookeeper
@@ -239,7 +252,7 @@ public final class HConstants {
ZK_CFG_PROPERTY_PREFIX + "tickTime";
/** Default limit on concurrent client-side zookeeper connections */
- public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 300;
+ public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 300;
/** Configuration key for ZooKeeper session timeout */
public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
index 98917268060..f324ec68cf5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
@@ -24,6 +24,7 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -85,7 +86,7 @@ public final class ZKConfig {
// If clientPort is not set, assign the default.
if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
zkProperties.put(HConstants.CLIENT_PORT_STR,
- HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+ HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
}
// Create the server.X properties.
@@ -119,7 +120,7 @@ public final class ZKConfig {
*/
private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
String defaultClientPort = Integer.toString(
- conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
+ conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT));
// Build the ZK quorum server string with "server:clientport" list, separated by ','
final String[] serverHosts =
@@ -310,4 +311,23 @@ public final class ZKConfig {
return znodeParent;
}
}
+
+ /**
+ * Get the client ZK Quorum servers string
+ * @param conf the configuration to read
+ * @return Client quorum servers, or null if not specified
+ */
+ public static String getClientZKQuorumServersString(Configuration conf) {
+ String clientZkQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+ if (clientZkQuorumServers == null) {
+ return null;
+ }
+ int defaultClientPort =
+ conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
+ String clientZkClientPort =
+ Integer.toString(conf.getInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, defaultClientPort));
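+ // the client ZK port falls back to the server ZK client port, which itself defaults to 2181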
+ // Build the ZK quorum server string with "server:clientport" list, separated by ','
+ final String[] serverHosts = StringUtils.getStrings(clientZkQuorumServers);
+ return buildZKQuorumServerString(serverHosts, clientZkClientPort);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b31ff75e3db..7d751fbc1df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -136,6 +136,8 @@ import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
+import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -298,6 +300,10 @@ public class HMaster extends HRegionServer implements MasterServices {
private DrainingServerTracker drainingServerTracker;
// Tracker for load balancer state
LoadBalancerTracker loadBalancerTracker;
+ // Syncer of meta location to client ZK, created only if a client ZK quorum is specified
+ MetaLocationSyncer metaLocationSyncer;
+ // Syncer of the active master address to client ZK, created only if a client ZK quorum is specified
+ MasterAddressSyncer masterAddressSyncer;
// Tracker for split and merge state
private SplitOrMergeTracker splitOrMergeTracker;
@@ -553,18 +559,20 @@ public class HMaster extends HRegionServer implements MasterServices {
public void run() {
try {
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
- try {
- int infoPort = putUpJettyServer();
- startActiveMasterManager(infoPort);
- } catch (Throwable t) {
- // Make sure we log the exception.
- String error = "Failed to become Active Master";
- LOG.error(error, t);
- // Abort should have been called already.
- if (!isAborted()) {
- abort(error, t);
+ Threads.setDaemonThreadRunning(new Thread(() -> {
+ try {
+ int infoPort = putUpJettyServer();
+ startActiveMasterManager(infoPort);
+ } catch (Throwable t) {
+ // Make sure we log the exception.
+ String error = "Failed to become Active Master";
+ LOG.error(error, t);
+ // Abort should have been called already.
+ if (!isAborted()) {
+ abort(error, t);
+ }
}
- }
+ }));
}
// Fall in here even if we have been aborted. Need to run the shutdown services and
// the super run call will do this for us.
@@ -746,6 +754,23 @@ public class HMaster extends HRegionServer implements MasterServices {
this.maintenanceModeTracker = new MasterMaintenanceModeTracker(zooKeeper);
this.maintenanceModeTracker.start();
+ String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+ boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
+ HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
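+ // in observer mode the client ZK nodes replicate data from the server ZK ensemble themselves,
+ // so the master does not need to sync the znodes to them explicitly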
+ if (clientQuorumServers != null && !clientZkObserverMode) {
+ // we need to take care of synchronizing the ZK information
+ // if the given client ZK nodes are not observer nodes
+ ZKWatcher clientZkWatcher = new ZKWatcher(conf,
+ getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + "-clientZK", this,
+ false, true);
+ this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, clientZkWatcher, this);
+ this.metaLocationSyncer.start();
+ this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, clientZkWatcher, this);
+ this.masterAddressSyncer.start();
+ // setting the cluster id is a one-off effort, so no syncer is needed for it
+ ZKClusterId.setClusterId(clientZkWatcher, fileSystemManager.getClusterId());
+ }
+
// Set the cluster as up. If new RSs, they'll be waiting on this before
// going ahead with their startup.
boolean wasUp = this.clusterStatusTracker.isClusterUp();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java
new file mode 100644
index 00000000000..8f735bde4dc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java
@@ -0,0 +1,241 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.zksyncer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKListener;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the target znode(s) on the server ZK cluster and synchronizes them to the client ZK
+ * cluster if they change.
+ *
+ * The target znode(s) are given through the {@link #getNodesToWatch()} method
+ */
+@InterfaceAudience.Private
+public abstract class ClientZKSyncer extends ZKListener {
+ private static final Log LOG = LogFactory.getLog(ClientZKSyncer.class);
+ private final Server server;
+ private final ZKWatcher clientZkWatcher;
+ // We use queues and daemon threads to synchronize the data to client ZK cluster
+ // to avoid blocking the single event thread for watchers
+ private final Map<String, BlockingQueue<byte[]>> queues;
+
+ public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
+ super(watcher);
+ this.server = server;
+ this.clientZkWatcher = clientZkWatcher;
+ this.queues = new HashMap<>();
+ }
+
+ /**
+ * Starts the syncer
+ * @throws KeeperException if error occurs when trying to create base nodes on client ZK
+ */
+ public void start() throws KeeperException {
+ LOG.debug("Starting " + getClass().getSimpleName());
+ this.watcher.registerListener(this);
+ // create base znode on remote ZK
+ ZKUtil.createWithParents(clientZkWatcher, watcher.znodePaths.baseZNode);
+ // set meta znodes for client ZK
+ Collection<String> nodes = getNodesToWatch();
+ LOG.debug("Znodes to watch: " + nodes);
+ // initialize queues and threads
+ for (String node : nodes) {
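+ // a capacity-1 queue per znode: it only ever holds the latest value waiting to be synced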
+ BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1);
+ queues.put(node, queue);
+ Thread updater = new ClientZkUpdater(node, queue);
+ updater.setDaemon(true);
+ updater.start();
+ watchAndCheckExists(node);
+ }
+ }
+
+ private void watchAndCheckExists(String node) {
+ try {
+ if (ZKUtil.watchAndCheckExists(watcher, node)) {
+ byte[] data = ZKUtil.getDataAndWatch(watcher, node);
+ if (data != null) {
+ // put the data into queue
+ upsertQueue(node, data);
+ } else {
+ // It existed but now does not; the deletion should have been tracked by our watcher, re-check
+ LOG.debug("Found no data from " + node);
+ watchAndCheckExists(node);
+ }
+ } else {
+ // cleanup stale ZNodes on client ZK to avoid invalid requests to server
+ ZKUtil.deleteNodeFailSilent(clientZkWatcher, node);
+ }
+ } catch (KeeperException e) {
+ server.abort("Unexpected exception during initialization, aborting", e);
+ }
+ }
+
+ /**
+ * Update the value of the single element in the queue if present, or else insert.
+ *
+ * We only need to synchronize the latest znode value to the client ZK rather than every
+ * intermediate change
+ * @param node the znode whose data should be written to the queue
+ * @param data the data to write to the queue
+ */
+ private void upsertQueue(String node, byte[] data) {
+ BlockingQueue queue = queues.get(node);
+ synchronized (queue) {
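+ // drop any stale pending value so the offer() below always succeeds in the capacity-1 queue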
+ queue.poll();
+ queue.offer(data);
+ }
+ }
+
+ /**
+ * Set data for the client ZK and retry until it succeeds. Be very careful to avoid an infinite
+ * loop when modifying this method
+ * @param node the znode to set on client ZK
+ * @param data the data to set to client ZK
+ * @throws InterruptedException if the thread is interrupted during process
+ */
+ private final void setDataForClientZkUntilSuccess(String node, byte[] data)
+ throws InterruptedException {
+ while (!server.isStopped()) {
+ try {
+ LOG.debug("Set data for remote " + node + ", client zk watcher: " + clientZkWatcher);
+ ZKUtil.setData(clientZkWatcher, node, data);
+ break;
+ } catch (KeeperException.NoNodeException nne) {
+ // Node doesn't exist, create it and set value
+ try {
+ ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT);
+ break;
+ } catch (KeeperException.ConnectionLossException
+ | KeeperException.SessionExpiredException ee) {
+ reconnectAfterExpiration();
+ } catch (KeeperException e) {
+ LOG.warn(
+ "Failed to create znode " + node + " due to: " + e.getMessage() + ", will retry later");
+ }
+ } catch (KeeperException.ConnectionLossException
+ | KeeperException.SessionExpiredException ee) {
+ reconnectAfterExpiration();
+ } catch (KeeperException e) {
+ LOG.debug("Failed to set data to client ZK, will retry later", e);
+ }
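+ // back off before the next retry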
+ Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
+ }
+ }
+
+ private final void reconnectAfterExpiration() throws InterruptedException {
+ LOG.warn("ZK session expired or lost, retrying with a new connection...");
+ try {
+ clientZkWatcher.reconnectAfterExpiration();
+ } catch (IOException | KeeperException e) {
+ LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e);
+ }
+ }
+
+ @Override
+ public void nodeCreated(String path) {
+ if (!validate(path)) {
+ return;
+ }
+ try {
+ byte[] data = ZKUtil.getDataAndWatch(watcher, path);
+ upsertQueue(path, data);
+ } catch (KeeperException e) {
+ LOG.warn("Unexpected exception handling nodeCreated event", e);
+ }
+ }
+
+ @Override
+ public void nodeDataChanged(String path) {
+ if (validate(path)) {
+ nodeCreated(path);
+ }
+ }
+
+ @Override
+ public synchronized void nodeDeleted(String path) {
+ if (validate(path)) {
+ try {
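+ // if the znode still exists on server ZK (e.g. it was recreated), re-sync it; otherwise
+ // keep the last synced value on client ZK rather than deleting it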
+ if (ZKUtil.watchAndCheckExists(watcher, path)) {
+ nodeCreated(path);
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e);
+ }
+ }
+ }
+
+ /**
+ * Validate whether a znode path is watched by us
+ * @param path the path to validate
+ * @return true if the znode is watched by us
+ */
+ abstract boolean validate(String path);
+
+ /**
+ * @return the znode(s) to watch
+ */
+ abstract Collection<String> getNodesToWatch();
+
+ /**
+ * Thread to synchronize znode data to client ZK cluster
+ */
+ class ClientZkUpdater extends Thread {
+ final String znode;
+ final BlockingQueue<byte[]> queue;
+
+ public ClientZkUpdater(String znode, BlockingQueue<byte[]> queue) {
+ this.znode = znode;
+ this.queue = queue;
+ setName("ClientZKUpdater-" + znode);
+ }
+
+ @Override
+ public void run() {
+ while (!server.isStopped()) {
+ try {
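+ // block until there is a new value to sync for this znode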
+ byte[] data = queue.take();
+ setDataForClientZkUntilSuccess(znode, data);
+ } catch (InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted while waiting for the next znode update to sync to client ZK");
+ }
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java
new file mode 100644
index 00000000000..3da8558cc68
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.zksyncer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Tracks the active master address on the server ZK cluster and synchronizes it to the client
+ * ZK cluster if it changes
+ */
+@InterfaceAudience.Private
+public class MasterAddressSyncer extends ClientZKSyncer {
+ private final String masterAddressZNode;
+
+ public MasterAddressSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
+ super(watcher, clientZkWatcher, server);
+ masterAddressZNode = watcher.znodePaths.masterAddressZNode;
+ }
+
+ @Override
+ boolean validate(String path) {
+ return path.equals(masterAddressZNode);
+ }
+
+ @Override
+ Collection<String> getNodesToWatch() {
+ ArrayList<String> toReturn = new ArrayList<>();
+ toReturn.add(masterAddressZNode);
+ return toReturn;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java
new file mode 100644
index 00000000000..68f7fc40977
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.zksyncer;
+
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Tracks the meta region locations on the server ZK cluster and synchronizes them to the client
+ * ZK cluster if they change
+ */
+@InterfaceAudience.Private
+public class MetaLocationSyncer extends ClientZKSyncer {
+ public MetaLocationSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
+ super(watcher, clientZkWatcher, server);
+ }
+
+ @Override
+ boolean validate(String path) {
+ return watcher.znodePaths.isAnyMetaReplicaZNode(path);
+ }
+
+ @Override
+ Collection<String> getNodesToWatch() {
+ return watcher.znodePaths.metaReplicaZNodes.values();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 680c7d94ef6..aa5ee3db64e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -322,6 +322,8 @@ public class HRegionServer extends HasThread implements
volatile boolean killed = false;
+ private volatile boolean shutDown = false;
+
protected final Configuration conf;
private Path rootDir;
@@ -774,6 +776,13 @@ public class HRegionServer extends HasThread implements
*/
@VisibleForTesting
protected ClusterConnection createClusterConnection() throws IOException {
+ Configuration conf = this.conf;
+ if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
+ // Use server ZK cluster for server-issued connections, so we clone
+ // the conf and unset the client ZK related properties
+ conf = new Configuration(this.conf);
+ conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+ }
// Create a cluster connection that when appropriate, can short-circuit and go directly to the
// local server if the request is to the local server bypassing RPC. Can be used for both local
// and remote invocations.
@@ -1147,6 +1156,7 @@ public class HRegionServer extends HasThread implements
if (this.zooKeeper != null) {
this.zooKeeper.close();
}
+ this.shutDown = true;
LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
}
@@ -3724,4 +3734,8 @@ public class HRegionServer extends HasThread implements
}
}
}
+
+ public boolean isShutDown() {
+ return shutDown;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 5a056604eeb..fb4e0f95b1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -146,6 +146,10 @@ public class ReplicationSink {
if (StringUtils.isNotEmpty(replicationCodec)) {
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
}
+ // use server ZK cluster for replication, so we unset the client ZK related properties if any
+ if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
+ this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+ }
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java
new file mode 100644
index 00000000000..d7caac6e409
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(MediumTests.class)
+public class TestSeparateClientZKCluster {
+ private static final Log LOG = LogFactory.getLog(TestSeparateClientZKCluster.class);
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final File clientZkDir = new File("/tmp/TestSeparateClientZKCluster");
+ private static final int ZK_SESSION_TIMEOUT = 5000;
+ private static MiniZooKeeperCluster clientZkCluster;
+
+ private final byte[] family = Bytes.toBytes("cf");
+ private final byte[] qualifier = Bytes.toBytes("c1");
+ private final byte[] row = Bytes.toBytes("row");
+ private final byte[] value = Bytes.toBytes("v1");
+ private final byte[] newVal = Bytes.toBytes("v2");
+
+ @Rule
+ public TestName name = new TestName();
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
+
+ @BeforeClass
+ public static void beforeAllTests() throws Exception {
+ int clientZkPort = 21828;
+ clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
+ clientZkCluster.setDefaultClientPort(clientZkPort);
+ clientZkCluster.startup(clientZkDir);
+ // reduce the retry number and start log counter
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
+ TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
+ // core settings for testing client ZK cluster
+ TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+ TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
+ // reduce the zk session timeout so that session expiration is easier to trigger
+ TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
+ // Start a cluster with 2 masters and 3 regionservers.
+ TEST_UTIL.startMiniCluster(2, 3);
+ }
+
+ @AfterClass
+ public static void afterAllTests() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ clientZkCluster.shutdown();
+ FileUtils.deleteDirectory(clientZkDir);
+ }
+
+ @Test(timeout = 60000)
+ public void testBasicOperation() throws Exception {
+ TableName tn = TableName.valueOf(name.getMethodName());
+ // create table
+ Connection conn = TEST_UTIL.getConnection();
+ Admin admin = conn.getAdmin();
+ HTable table = (HTable) conn.getTable(tn);
+ try {
+ ColumnFamilyDescriptorBuilder cfDescBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(family);
+ TableDescriptorBuilder tableDescBuilder =
+ TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
+ admin.createTable(tableDescBuilder.build());
+ // test simple get and put
+ Put put = new Put(row);
+ put.addColumn(family, qualifier, value);
+ table.put(put);
+ Get get = new Get(row);
+ Result result = table.get(get);
+ LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
+ Assert.assertArrayEquals(value, result.getValue(family, qualifier));
+ } finally {
+ admin.close();
+ table.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testMasterSwitch() throws Exception {
+ // get an admin instance and issue some request first
+ Connection conn = TEST_UTIL.getConnection();
+ Admin admin = conn.getAdmin();
+ LOG.debug("Tables: " + admin.listTableDescriptors());
+ try {
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ // switch active master
+ HMaster master = cluster.getMaster();
+ master.stopMaster();
+ while (!master.isShutDown()) {
+ Thread.sleep(200);
+ }
+ while (!cluster.getMaster().isInitialized()) {
+ Thread.sleep(200);
+ }
+ // confirm client access still works
+ Assert.assertTrue(admin.balance(false));
+ } finally {
+ admin.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testMetaRegionMove() throws Exception {
+ TableName tn = TableName.valueOf(name.getMethodName());
+ // create table
+ Connection conn = TEST_UTIL.getConnection();
+ Admin admin = conn.getAdmin();
+ HTable table = (HTable) conn.getTable(tn);
+ try {
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ ColumnFamilyDescriptorBuilder cfDescBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(family);
+ TableDescriptorBuilder tableDescBuilder =
+ TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
+ admin.createTable(tableDescBuilder.build());
+ // issue some requests to cache the region location
+ Put put = new Put(row);
+ put.addColumn(family, qualifier, value);
+ table.put(put);
+ Get get = new Get(row);
+ Result result = table.get(get);
+ // move the meta region and confirm the client can detect the move
+ byte[] destServerName = null;
+ for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
+ ServerName name = rst.getRegionServer().getServerName();
+ if (!name.equals(cluster.getServerHoldingMeta())) {
+ destServerName = Bytes.toBytes(name.getServerName());
+ break;
+ }
+ }
+ admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName);
+ LOG.debug("Finished moving meta");
+ // invalidate client cache
+ RegionInfo region =
+ table.getRegionLocator().getRegionLocation(row).getRegion();
+ ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
+ for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
+ ServerName name = rst.getRegionServer().getServerName();
+ if (!name.equals(currentServer)) {
+ destServerName = Bytes.toBytes(name.getServerName());
+ break;
+ }
+ }
+ admin.move(region.getEncodedNameAsBytes(), destServerName);
+ LOG.debug("Finished moving user region");
+ put = new Put(row);
+ put.addColumn(family, qualifier, newVal);
+ table.put(put);
+ result = table.get(get);
+ LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
+ Assert.assertArrayEquals(newVal, result.getValue(family, qualifier));
+ } finally {
+ admin.close();
+ table.close();
+ }
+ }
+
+ @Test(timeout = 120000)
+ public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
+ TableName tn = TableName.valueOf(name.getMethodName());
+ // create table
+ ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
+ Admin admin = conn.getAdmin();
+ HTable table = (HTable) conn.getTable(tn);
+ try {
+ ColumnFamilyDescriptorBuilder cfDescBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(family);
+ TableDescriptorBuilder tableDescBuilder =
+ TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
+ admin.createTable(tableDescBuilder.build());
+ // put some data
+ Put put = new Put(row);
+ put.addColumn(family, qualifier, value);
+ table.put(put);
+ // invalidate the connection's region location cache
+ conn.clearRegionCache();
+ // stop client zk cluster
+ clientZkCluster.shutdown();
+ // stop current meta server and confirm the server shutdown process
+ // is not affected by client ZK crash
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ int metaServerId = cluster.getServerWithMeta();
+ HRegionServer metaServer = cluster.getRegionServer(metaServerId);
+ metaServer.stop("Stop current RS holding meta region");
+ while (!metaServer.isShutDown()) {
+ Thread.sleep(200);
+ }
+ // wait for meta region online
+ cluster.getMaster().getAssignmentManager()
+ .waitForAssignment(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ // wait long enough to make sure the master keeps retrying the sync to client ZK until the data is set
+ Thread.sleep(10000);
+ clientZkCluster.startup(clientZkDir);
+ // new request should pass
+ Get get = new Get(row);
+ Result result = table.get(get);
+ LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
+ Assert.assertArrayEquals(value, result.getValue(family, qualifier));
+ } finally {
+ admin.close();
+ table.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testAsyncTable() throws Exception {
+ TableName tn = TableName.valueOf(name.getMethodName());
+ ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
+ TableDescriptorBuilder tableDescBuilder =
+ TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
+ try (AsyncConnection ASYNC_CONN =
+ ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
+ ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get();
+ AsyncTable<AdvancedScanResultConsumer> table = ASYNC_CONN.getTable(tn);
+ // put some data
+ Put put = new Put(row);
+ put.addColumn(family, qualifier, value);
+ table.put(put).get();
+ // get and verify
+ Get get = new Get(row);
+ Result result = table.get(get).get();
+ LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
+ Assert.assertArrayEquals(value, result.getValue(family, qualifier));
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
index 2b85e25abea..9ee10a7e7fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
@@ -318,4 +319,40 @@ public class TestMasterNoCluster {
master.join();
}
}
+
+ @Test(timeout = 60000)
+ public void testMasterInitWithSameClientServerZKQuorum() throws Exception {
+ Configuration conf = new Configuration(TESTUTIL.getConfiguration());
+ conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+ conf.setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, TESTUTIL.getZkCluster().getClientPort());
+ HMaster master = new HMaster(conf);
+ master.start();
+ // the master will abort due to IllegalArgumentException so we should finish within 60 seconds
+ master.join();
+ }
+
+ @Test(timeout = 60000)
+ public void testMasterInitWithObserverModeClientZKQuorum() throws Exception {
+ Configuration conf = new Configuration(TESTUTIL.getConfiguration());
+ Assert.assertFalse(Boolean.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE));
+ // set client ZK to some non-existing address and make sure server won't access client ZK
+ // (server start should not be affected)
+ conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+ conf.setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT,
+ TESTUTIL.getZkCluster().getClientPort() + 1);
+ // settings to allow us not to start additional RS
+ conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+ conf.setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
+ // main setting for this test case
+ conf.setBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE, true);
+ HMaster master = new HMaster(conf);
+ master.start();
+ while (!master.isInitialized()) {
+ Threads.sleep(200);
+ }
+ Assert.assertNull(master.metaLocationSyncer);
+ Assert.assertNull(master.masterAddressSyncer);
+ master.stopMaster();
+ master.join();
+ }
}
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
index 010ec8c541c..b2645639cce 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
@@ -238,7 +238,7 @@ public class MiniZooKeeperCluster {
standaloneServerFactory.configure(
new InetSocketAddress(currentClientPort),
configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
- HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
+ HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS));
} catch (BindException e) {
LOG.debug("Failed binding ZK Server to client port: " +
currentClientPort, e);
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
index dd71dee5b8c..0db6205c693 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
@@ -46,7 +46,7 @@ public final class ZKServerTool {
for (String value : values) {
String[] parts = value.split(":");
String host = parts[0];
- int port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+ int port = HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
if (parts.length > 1) {
port = Integer.parseInt(parts[1]);
}
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 7bb47527597..dda53191064 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1932,7 +1932,7 @@ public final class ZKUtil {
String host = sp[0];
int port = sp.length > 1 ? Integer.parseInt(sp[1])
- : HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+ : HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
InetSocketAddress sockAddr = new InetSocketAddress(host, port);
try (Socket socket = new Socket()) {
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
index 3aac946947b..c3cac5f3d20 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
@@ -116,8 +116,43 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
public ZKWatcher(Configuration conf, String identifier,
Abortable abortable, boolean canCreateBaseZNode)
throws IOException, ZooKeeperConnectionException {
+ this(conf, identifier, abortable, canCreateBaseZNode, false);
+ }
+
+ /**
+ * Instantiate a ZooKeeper connection and watcher.
+ * @param conf the configuration to use
+ * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
+ * this instance. Use null for default.
+ * @param abortable Can be null if, on error, there is no host to abort: e.g. client
+ * context.
+ * @param canCreateBaseZNode true if a base ZNode can be created
+ * @param clientZK whether this watcher is set to access client ZK
+ * @throws IOException if the connection to ZooKeeper fails
+ * @throws ZooKeeperConnectionException if the connection to ZooKeeper fails when creating base
+ * ZNodes
+ */
+ public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
+ boolean canCreateBaseZNode, boolean clientZK)
+ throws IOException, ZooKeeperConnectionException {
this.conf = conf;
- this.quorum = ZKConfig.getZKQuorumServersString(conf);
+ if (clientZK) {
+ String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
+ String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf);
+ if (clientZkQuorumServers != null) {
+ if (clientZkQuorumServers.equals(serverZkQuorumServers)) {
+ // Don't allow identical settings, to avoid an infinite loop when the master tries
+ // to sync meta information from the server ZK to the client ZK
+ throw new IllegalArgumentException(
+ "The quorum settings for client ZK should be different from those for server");
+ }
+ this.quorum = clientZkQuorumServers;
+ } else {
+ this.quorum = serverZkQuorumServers;
+ }
+ } else {
+ this.quorum = ZKConfig.getZKQuorumServersString(conf);
+ }
this.prefix = identifier;
// Identifier will get the sessionid appended later below down when we
// handle the syncconnect event.