From b72e19e3b989fa46c72d356703f5c0334b83fe76 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Thu, 29 Mar 2018 02:37:26 +0800 Subject: [PATCH] HBASE-20159 Support using separate ZK quorums for client --- .../hbase/zookeeper/ReadOnlyZKClient.java | 8 +- .../org/apache/hadoop/hbase/HConstants.java | 17 +- .../hadoop/hbase/zookeeper/ZKConfig.java | 24 +- .../apache/hadoop/hbase/master/HMaster.java | 47 ++- .../hbase/master/zksyncer/ClientZKSyncer.java | 241 ++++++++++++++++ .../master/zksyncer/MasterAddressSyncer.java | 52 ++++ .../master/zksyncer/MetaLocationSyncer.java | 46 +++ .../hbase/regionserver/HRegionServer.java | 14 + .../regionserver/ReplicationSink.java | 4 + .../client/TestSeparateClientZKCluster.java | 268 ++++++++++++++++++ .../hbase/master/TestMasterNoCluster.java | 37 +++ .../hbase/zookeeper/MiniZooKeeperCluster.java | 2 +- .../hadoop/hbase/zookeeper/ZKServerTool.java | 2 +- .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 2 +- .../hadoop/hbase/zookeeper/ZKWatcher.java | 37 ++- 15 files changed, 781 insertions(+), 20 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index d2f47633320..fc2d5f0d9df 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -124,7 +124,13 @@ public final class ReadOnlyZKClient implements Closeable { } public ReadOnlyZKClient(Configuration conf) { - this.connectString = ZKConfig.getZKQuorumServersString(conf); + // We might use a different ZK for client access + String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); + if (clientZkQuorumServers != null) { + this.connectString = clientZkQuorumServers; + } else { + this.connectString = ZKConfig.getZKQuorumServersString(conf); + } this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT); this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY); this.retryIntervalMs = diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 372d9b1466b..7ee31a56f92 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -187,6 +187,19 @@ public final class HConstants { /** Name of ZooKeeper quorum configuration parameter. */ public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + /** Name of ZooKeeper quorum configuration parameter for client to locate meta. */ + public static final String CLIENT_ZOOKEEPER_QUORUM = "hbase.client.zookeeper.quorum"; + + /** Client port of ZooKeeper for client to locate meta */ + public static final String CLIENT_ZOOKEEPER_CLIENT_PORT = + "hbase.client.zookeeper.property.clientPort"; + + /** Indicate whether the client ZK are observer nodes of the server ZK */ + public static final String CLIENT_ZOOKEEPER_OBSERVER_MODE = + "hbase.client.zookeeper.observer.mode"; + /** Assuming client zk not in observer mode and master need to synchronize information */ + public static final boolean DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE = false; + /** Common prefix of ZooKeeper configuration properties */ public static final String ZK_CFG_PROPERTY_PREFIX = "hbase.zookeeper.property."; @@ -205,7 +218,7 @@ public final class HConstants { ZK_CFG_PROPERTY_PREFIX + CLIENT_PORT_STR; /** Default client port that the zookeeper listens on */ - public static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 2181; + public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181; /** * Parameter name for the wait time for the recoverable zookeeper @@ -239,7 +252,7 @@ public final class HConstants { ZK_CFG_PROPERTY_PREFIX + "tickTime"; /** Default limit on concurrent client-side zookeeper connections */ - public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 300; + public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 300; /** Configuration key for ZooKeeper session timeout */ public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout"; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java index 98917268060..f324ec68cf5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java @@ -24,6 +24,7 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -85,7 +86,7 @@ public final class ZKConfig { // If clientPort is not set, assign the default. if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) { zkProperties.put(HConstants.CLIENT_PORT_STR, - HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); + HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT); } // Create the server.X properties. @@ -119,7 +120,7 @@ public final class ZKConfig { */ private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) { String defaultClientPort = Integer.toString( - conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT)); + conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT)); // Build the ZK quorum server string with "server:clientport" list, separated by ',' final String[] serverHosts = @@ -310,4 +311,23 @@ public final class ZKConfig { return znodeParent; } } + + /** + * Get the client ZK Quorum servers string + * @param conf the configuration to read + * @return Client quorum servers, or null if not specified + */ + public static String getClientZKQuorumServersString(Configuration conf) { + String clientQuromServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM); + if (clientQuromServers == null) { + return null; + } + int defaultClientPort = + conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT); + String clientZkClientPort = + Integer.toString(conf.getInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, defaultClientPort)); + // Build the ZK quorum server string with "server:clientport" list, separated by ',' + final String[] serverHosts = StringUtils.getStrings(clientQuromServers); + return buildZKQuorumServerString(serverHosts, clientZkClientPort); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index b31ff75e3db..7d751fbc1df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -136,6 +136,8 @@ import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer; +import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -298,6 +300,10 @@ public class HMaster extends HRegionServer implements MasterServices { private DrainingServerTracker drainingServerTracker; // Tracker for load balancer state LoadBalancerTracker loadBalancerTracker; + // Tracker for meta location, if any client ZK quorum specified + MetaLocationSyncer metaLocationSyncer; + // Tracker for active master location, if any client ZK quorum specified + MasterAddressSyncer masterAddressSyncer; // Tracker for split and merge state private SplitOrMergeTracker splitOrMergeTracker; @@ -553,18 +559,20 @@ public class HMaster extends HRegionServer implements MasterServices { public void run() { try { if (!conf.getBoolean("hbase.testing.nocluster", false)) { - try { - int infoPort = putUpJettyServer(); - startActiveMasterManager(infoPort); - } catch (Throwable t) { - // Make sure we log the exception. - String error = "Failed to become Active Master"; - LOG.error(error, t); - // Abort should have been called already. - if (!isAborted()) { - abort(error, t); + Threads.setDaemonThreadRunning(new Thread(() -> { + try { + int infoPort = putUpJettyServer(); + startActiveMasterManager(infoPort); + } catch (Throwable t) { + // Make sure we log the exception. + String error = "Failed to become Active Master"; + LOG.error(error, t); + // Abort should have been called already. + if (!isAborted()) { + abort(error, t); + } } - } + })); } // Fall in here even if we have been aborted. Need to run the shutdown services and // the super run call will do this for us. @@ -746,6 +754,23 @@ public class HMaster extends HRegionServer implements MasterServices { this.maintenanceModeTracker = new MasterMaintenanceModeTracker(zooKeeper); this.maintenanceModeTracker.start(); + String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM); + boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE, + HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE); + if (clientQuorumServers != null && !clientZkObserverMode) { + // we need to take care of the ZK information synchronization + // if given client ZK are not observer nodes + ZKWatcher clientZkWatcher = new ZKWatcher(conf, + getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + "-clientZK", this, + false, true); + this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, clientZkWatcher, this); + this.metaLocationSyncer.start(); + this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, clientZkWatcher, this); + this.masterAddressSyncer.start(); + // set cluster id is a one-go effort + ZKClusterId.setClusterId(clientZkWatcher, fileSystemManager.getClusterId()); + } + // Set the cluster as up. If new RSs, they'll be waiting on this before // going ahead with their startup. boolean wasUp = this.clusterStatusTracker.isClusterUp(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java new file mode 100644 index 00000000000..8f735bde4dc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java @@ -0,0 +1,241 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.zksyncer; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKListener; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; + +/** + * Tracks the target znode(s) on server ZK cluster and synchronize them to client ZK cluster if + * changed + *

+ * The target znode(s) is given through {@link #getNodesToWatch()} method + */ +@InterfaceAudience.Private +public abstract class ClientZKSyncer extends ZKListener { + private static final Log LOG = LogFactory.getLog(ClientZKSyncer.class); + private final Server server; + private final ZKWatcher clientZkWatcher; + // We use queues and daemon threads to synchronize the data to client ZK cluster + // to avoid blocking the single event thread for watchers + private final Map> queues; + + public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) { + super(watcher); + this.server = server; + this.clientZkWatcher = clientZkWatcher; + this.queues = new HashMap<>(); + } + + /** + * Starts the syncer + * @throws KeeperException if error occurs when trying to create base nodes on client ZK + */ + public void start() throws KeeperException { + LOG.debug("Starting " + getClass().getSimpleName()); + this.watcher.registerListener(this); + // create base znode on remote ZK + ZKUtil.createWithParents(clientZkWatcher, watcher.znodePaths.baseZNode); + // set meta znodes for client ZK + Collection nodes = getNodesToWatch(); + LOG.debug("Znodes to watch: " + nodes); + // initialize queues and threads + for (String node : nodes) { + BlockingQueue queue = new ArrayBlockingQueue<>(1); + queues.put(node, queue); + Thread updater = new ClientZkUpdater(node, queue); + updater.setDaemon(true); + updater.start(); + watchAndCheckExists(node); + } + } + + private void watchAndCheckExists(String node) { + try { + if (ZKUtil.watchAndCheckExists(watcher, node)) { + byte[] data = ZKUtil.getDataAndWatch(watcher, node); + if (data != null) { + // put the data into queue + upsertQueue(node, data); + } else { + // It existed but now does not, should has been tracked by our watcher, ignore + LOG.debug("Found no data from " + node); + watchAndCheckExists(node); + } + } else { + // cleanup stale ZNodes on client ZK to avoid invalid requests to server + ZKUtil.deleteNodeFailSilent(clientZkWatcher, node); + } + } catch (KeeperException e) { + server.abort("Unexpected exception during initialization, aborting", e); + } + } + + /** + * Update the value of the single element in queue if any, or else insert. + *

+ * We only need to synchronize the latest znode value to client ZK rather than synchronize each + * time + * @param data the data to write to queue + */ + private void upsertQueue(String node, byte[] data) { + BlockingQueue queue = queues.get(node); + synchronized (queue) { + queue.poll(); + queue.offer(data); + } + } + + /** + * Set data for client ZK and retry until succeed. Be very careful to prevent dead loop when + * modifying this method + * @param node the znode to set on client ZK + * @param data the data to set to client ZK + * @throws InterruptedException if the thread is interrupted during process + */ + private final void setDataForClientZkUntilSuccess(String node, byte[] data) + throws InterruptedException { + while (!server.isStopped()) { + try { + LOG.debug("Set data for remote " + node + ", client zk wather: " + clientZkWatcher); + ZKUtil.setData(clientZkWatcher, node, data); + break; + } catch (KeeperException.NoNodeException nne) { + // Node doesn't exist, create it and set value + try { + ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT); + break; + } catch (KeeperException.ConnectionLossException + | KeeperException.SessionExpiredException ee) { + reconnectAfterExpiration(); + } catch (KeeperException e) { + LOG.warn( + "Failed to create znode " + node + " due to: " + e.getMessage() + ", will retry later"); + } + } catch (KeeperException.ConnectionLossException + | KeeperException.SessionExpiredException ee) { + reconnectAfterExpiration(); + } catch (KeeperException e) { + LOG.debug("Failed to set data to client ZK, will retry later", e); + } + Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS); + } + } + + private final void reconnectAfterExpiration() throws InterruptedException { + LOG.warn("ZK session expired or lost. Retry a new connection..."); + try { + clientZkWatcher.reconnectAfterExpiration(); + } catch (IOException | KeeperException e) { + LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e); + } + } + + @Override + public void nodeCreated(String path) { + if (!validate(path)) { + return; + } + try { + byte[] data = ZKUtil.getDataAndWatch(watcher, path); + upsertQueue(path, data); + } catch (KeeperException e) { + LOG.warn("Unexpected exception handling nodeCreated event", e); + } + } + + @Override + public void nodeDataChanged(String path) { + if (validate(path)) { + nodeCreated(path); + } + } + + @Override + public synchronized void nodeDeleted(String path) { + if (validate(path)) { + try { + if (ZKUtil.watchAndCheckExists(watcher, path)) { + nodeCreated(path); + } + } catch (KeeperException e) { + LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e); + } + } + } + + /** + * Validate whether a znode path is watched by us + * @param path the path to validate + * @return true if the znode is watched by us + */ + abstract boolean validate(String path); + + /** + * @return the znode(s) to watch + */ + abstract Collection getNodesToWatch(); + + /** + * Thread to synchronize znode data to client ZK cluster + */ + class ClientZkUpdater extends Thread { + final String znode; + final BlockingQueue queue; + + public ClientZkUpdater(String znode, BlockingQueue queue) { + this.znode = znode; + this.queue = queue; + setName("ClientZKUpdater-" + znode); + } + + @Override + public void run() { + while (!server.isStopped()) { + try { + byte[] data = queue.take(); + setDataForClientZkUntilSuccess(znode, data); + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Interrupted while checking whether need to update meta location to client zk"); + } + Thread.currentThread().interrupt(); + break; + } + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java new file mode 100644 index 00000000000..3da8558cc68 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java @@ -0,0 +1,52 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.zksyncer; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Tracks the active master address on server ZK cluster and synchronize them to client ZK cluster + * if changed + */ +@InterfaceAudience.Private +public class MasterAddressSyncer extends ClientZKSyncer { + private final String masterAddressZNode; + + public MasterAddressSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) { + super(watcher, clientZkWatcher, server); + masterAddressZNode = watcher.znodePaths.masterAddressZNode; + } + + @Override + boolean validate(String path) { + return path.equals(masterAddressZNode); + } + + @Override + Collection getNodesToWatch() { + ArrayList toReturn = new ArrayList<>(); + toReturn.add(masterAddressZNode); + return toReturn; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java new file mode 100644 index 00000000000..68f7fc40977 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java @@ -0,0 +1,46 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.zksyncer; + +import java.util.Collection; + +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Tracks the meta region locations on server ZK cluster and synchronize them to client ZK cluster + * if changed + */ +@InterfaceAudience.Private +public class MetaLocationSyncer extends ClientZKSyncer { + public MetaLocationSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) { + super(watcher, clientZkWatcher, server); + } + + @Override + boolean validate(String path) { + return watcher.znodePaths.isAnyMetaReplicaZNode(path); + } + + @Override + Collection getNodesToWatch() { + return watcher.znodePaths.metaReplicaZNodes.values(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 680c7d94ef6..aa5ee3db64e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -322,6 +322,8 @@ public class HRegionServer extends HasThread implements volatile boolean killed = false; + private volatile boolean shutDown = false; + protected final Configuration conf; private Path rootDir; @@ -774,6 +776,13 @@ public class HRegionServer extends HasThread implements */ @VisibleForTesting protected ClusterConnection createClusterConnection() throws IOException { + Configuration conf = this.conf; + if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) { + // Use server ZK cluster for server-issued connections, so we clone + // the conf and unset the client ZK related properties + conf = new Configuration(this.conf); + conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM); + } // Create a cluster connection that when appropriate, can short-circuit and go directly to the // local server if the request is to the local server bypassing RPC. Can be used for both local // and remote invocations. @@ -1147,6 +1156,7 @@ public class HRegionServer extends HasThread implements if (this.zooKeeper != null) { this.zooKeeper.close(); } + this.shutDown = true; LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); } @@ -3724,4 +3734,8 @@ public class HRegionServer extends HasThread implements } } } + + public boolean isShutDown() { + return shutDown; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 5a056604eeb..fb4e0f95b1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -146,6 +146,10 @@ public class ReplicationSink { if (StringUtils.isNotEmpty(replicationCodec)) { this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec); } + // use server ZK cluster for replication, so we unset the client ZK related properties if any + if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) { + this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM); + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java new file mode 100644 index 00000000000..d7caac6e409 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.File; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(MediumTests.class) +public class TestSeparateClientZKCluster { + private static final Log LOG = LogFactory.getLog(TestSeparateClientZKCluster.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final File clientZkDir = new File("/tmp/TestSeparateClientZKCluster"); + private static final int ZK_SESSION_TIMEOUT = 5000; + private static MiniZooKeeperCluster clientZkCluster; + + private final byte[] family = Bytes.toBytes("cf"); + private final byte[] qualifier = Bytes.toBytes("c1"); + private final byte[] row = Bytes.toBytes("row"); + private final byte[] value = Bytes.toBytes("v1"); + private final byte[] newVal = Bytes.toBytes("v2"); + + @Rule + public TestName name = new TestName(); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class); + + @BeforeClass + public static void beforeAllTests() throws Exception { + int clientZkPort = 21828; + clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration()); + clientZkCluster.setDefaultClientPort(clientZkPort); + clientZkCluster.startup(clientZkDir); + // reduce the retry number and start log counter + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1); + TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1); + // core settings for testing client ZK cluster + TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST); + TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort); + // reduce zk session timeout to easier trigger session expiration + TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT); + // Start a cluster with 2 masters and 3 regionservers. + TEST_UTIL.startMiniCluster(2, 3); + } + + @AfterClass + public static void afterAllTests() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + clientZkCluster.shutdown(); + FileUtils.deleteDirectory(clientZkDir); + } + + @Test(timeout = 60000) + public void testBasicOperation() throws Exception { + TableName tn = TableName.valueOf(name.getMethodName()); + // create table + Connection conn = TEST_UTIL.getConnection(); + Admin admin = conn.getAdmin(); + HTable table = (HTable) conn.getTable(tn); + try { + ColumnFamilyDescriptorBuilder cfDescBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(family); + TableDescriptorBuilder tableDescBuilder = + TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); + admin.createTable(tableDescBuilder.build()); + // test simple get and put + Put put = new Put(row); + put.addColumn(family, qualifier, value); + table.put(put); + Get get = new Get(row); + Result result = table.get(get); + LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); + Assert.assertArrayEquals(value, result.getValue(family, qualifier)); + } finally { + admin.close(); + table.close(); + } + } + + @Test(timeout = 60000) + public void testMasterSwitch() throws Exception { + // get an admin instance and issue some request first + Connection conn = TEST_UTIL.getConnection(); + Admin admin = conn.getAdmin(); + LOG.debug("Tables: " + admin.listTableDescriptors()); + try { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + // switch active master + HMaster master = cluster.getMaster(); + master.stopMaster(); + while (!master.isShutDown()) { + Thread.sleep(200); + } + while (!cluster.getMaster().isInitialized()) { + Thread.sleep(200); + } + // confirm client access still works + Assert.assertTrue(admin.balance(false)); + } finally { + admin.close(); + } + } + + @Test(timeout = 60000) + public void testMetaRegionMove() throws Exception { + TableName tn = TableName.valueOf(name.getMethodName()); + // create table + Connection conn = TEST_UTIL.getConnection(); + Admin admin = conn.getAdmin(); + HTable table = (HTable) conn.getTable(tn); + try { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + ColumnFamilyDescriptorBuilder cfDescBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(family); + TableDescriptorBuilder tableDescBuilder = + TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); + admin.createTable(tableDescBuilder.build()); + // issue some requests to cache the region location + Put put = new Put(row); + put.addColumn(family, qualifier, value); + table.put(put); + Get get = new Get(row); + Result result = table.get(get); + // move meta region and confirm client could detect + byte[] destServerName = null; + for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { + ServerName name = rst.getRegionServer().getServerName(); + if (!name.equals(cluster.getServerHoldingMeta())) { + destServerName = Bytes.toBytes(name.getServerName()); + break; + } + } + admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName); + LOG.debug("Finished moving meta"); + // invalidate client cache + RegionInfo region = + table.getRegionLocator().getRegionLocation(row).getRegion(); + ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName()); + for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { + ServerName name = rst.getRegionServer().getServerName(); + if (!name.equals(currentServer)) { + destServerName = Bytes.toBytes(name.getServerName()); + break; + } + } + admin.move(region.getEncodedNameAsBytes(), destServerName); + LOG.debug("Finished moving user region"); + put = new Put(row); + put.addColumn(family, qualifier, newVal); + table.put(put); + result = table.get(get); + LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); + Assert.assertArrayEquals(newVal, result.getValue(family, qualifier)); + } finally { + admin.close(); + table.close(); + } + } + + @Test(timeout = 120000) + public void testMetaMoveDuringClientZkClusterRestart() throws Exception { + TableName tn = TableName.valueOf(name.getMethodName()); + // create table + ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection(); + Admin admin = conn.getAdmin(); + HTable table = (HTable) conn.getTable(tn); + try { + ColumnFamilyDescriptorBuilder cfDescBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(family); + TableDescriptorBuilder tableDescBuilder = + TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); + admin.createTable(tableDescBuilder.build()); + // put some data + Put put = new Put(row); + put.addColumn(family, qualifier, value); + table.put(put); + // invalid connection cache + conn.clearRegionCache(); + // stop client zk cluster + clientZkCluster.shutdown(); + // stop current meta server and confirm the server shutdown process + // is not affected by client ZK crash + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + int metaServerId = cluster.getServerWithMeta(); + HRegionServer metaServer = cluster.getRegionServer(metaServerId); + metaServer.stop("Stop current RS holding meta region"); + while (!metaServer.isShutDown()) { + Thread.sleep(200); + } + // wait for meta region online + cluster.getMaster().getAssignmentManager() + .waitForAssignment(RegionInfoBuilder.FIRST_META_REGIONINFO); + // wait some long time to make sure we will retry sync data to client ZK until data set + Thread.sleep(10000); + clientZkCluster.startup(clientZkDir); + // new request should pass + Get get = new Get(row); + Result result = table.get(get); + LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); + Assert.assertArrayEquals(value, result.getValue(family, qualifier)); + } finally { + admin.close(); + table.close(); + } + } + + @Test(timeout = 60000) + public void testAsyncTable() throws Exception { + TableName tn = TableName.valueOf(name.getMethodName()); + ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); + TableDescriptorBuilder tableDescBuilder = + TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); + try (AsyncConnection ASYNC_CONN = + ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { + ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get(); + AsyncTable table = ASYNC_CONN.getTable(tn); + // put some data + Put put = new Put(row); + put.addColumn(family, qualifier, value); + table.put(put).get(); + // get and verify + Get get = new Get(row); + Result result = table.get(get).get(); + LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); + Assert.assertArrayEquals(value, result.getValue(family, qualifier)); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java index 2b85e25abea..9ee10a7e7fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java @@ -53,6 +53,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Ignore; @@ -318,4 +319,40 @@ public class TestMasterNoCluster { master.join(); } } + + @Test(timeout = 60000) + public void testMasterInitWithSameClientServerZKQuorum() throws Exception { + Configuration conf = new Configuration(TESTUTIL.getConfiguration()); + conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST); + conf.setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, TESTUTIL.getZkCluster().getClientPort()); + HMaster master = new HMaster(conf); + master.start(); + // the master will abort due to IllegalArgumentException so we should finish within 60 seconds + master.join(); + } + + @Test(timeout = 60000) + public void testMasterInitWithObserverModeClientZKQuorum() throws Exception { + Configuration conf = new Configuration(TESTUTIL.getConfiguration()); + Assert.assertFalse(Boolean.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE)); + // set client ZK to some non-existing address and make sure server won't access client ZK + // (server start should not be affected) + conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST); + conf.setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, + TESTUTIL.getZkCluster().getClientPort() + 1); + // settings to allow us not to start additional RS + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); + conf.setBoolean(LoadBalancer.TABLES_ON_MASTER, true); + // main setting for this test case + conf.setBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE, true); + HMaster master = new HMaster(conf); + master.start(); + while (!master.isInitialized()) { + Threads.sleep(200); + } + Assert.assertNull(master.metaLocationSyncer); + Assert.assertNull(master.masterAddressSyncer); + master.stopMaster(); + master.join(); + } } diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java index 010ec8c541c..b2645639cce 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java @@ -238,7 +238,7 @@ public class MiniZooKeeperCluster { standaloneServerFactory.configure( new InetSocketAddress(currentClientPort), configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, - HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS)); + HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS)); } catch (BindException e) { LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e); diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java index dd71dee5b8c..0db6205c693 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java @@ -46,7 +46,7 @@ public final class ZKServerTool { for (String value : values) { String[] parts = value.split(":"); String host = parts[0]; - int port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT; + int port = HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT; if (parts.length > 1) { port = Integer.parseInt(parts[1]); } diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 7bb47527597..dda53191064 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -1932,7 +1932,7 @@ public final class ZKUtil { String host = sp[0]; int port = sp.length > 1 ? Integer.parseInt(sp[1]) - : HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT; + : HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT; InetSocketAddress sockAddr = new InetSocketAddress(host, port); try (Socket socket = new Socket()) { diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index 3aac946947b..c3cac5f3d20 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -116,8 +116,43 @@ public class ZKWatcher implements Watcher, Abortable, Closeable { public ZKWatcher(Configuration conf, String identifier, Abortable abortable, boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { + this(conf, identifier, abortable, canCreateBaseZNode, false); + } + + /** + * Instantiate a ZooKeeper connection and watcher. + * @param conf the configuration to use + * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for + * this instance. Use null for default. + * @param abortable Can be null if there is on error there is no host to abort: e.g. client + * context. + * @param canCreateBaseZNode true if a base ZNode can be created + * @param clientZK whether this watcher is set to access client ZK + * @throws IOException if the connection to ZooKeeper fails + * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base + * ZNodes + */ + public ZKWatcher(Configuration conf, String identifier, Abortable abortable, + boolean canCreateBaseZNode, boolean clientZK) + throws IOException, ZooKeeperConnectionException { this.conf = conf; - this.quorum = ZKConfig.getZKQuorumServersString(conf); + if (clientZK) { + String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); + String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf); + if (clientZkQuorumServers != null) { + if (clientZkQuorumServers.equals(serverZkQuorumServers)) { + // Don't allow same settings to avoid dead loop when master trying + // to sync meta information from server ZK to client ZK + throw new IllegalArgumentException( + "The quorum settings for client ZK should be different from those for server"); + } + this.quorum = clientZkQuorumServers; + } else { + this.quorum = serverZkQuorumServers; + } + } else { + this.quorum = ZKConfig.getZKQuorumServersString(conf); + } this.prefix = identifier; // Identifier will get the sessionid appended later below down when we // handle the syncconnect event.