From edc0ef3fe4b818da29ed0e581139dd4faf1cd591 Mon Sep 17 00:00:00 2001
From: Jerry He
Date: Sat, 17 Sep 2016 16:51:26 -0700
Subject: [PATCH] HBASE-16598 Enable zookeeper useMulti always and clean up in HBase code

---
 .../replication/ReplicationPeersZKImpl.java   |   1 -
 .../replication/ReplicationQueuesZKImpl.java  | 160 +----
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java |  83 ++-
 .../org/apache/hadoop/hbase/HConstants.java   |   3 -
 .../src/main/resources/hbase-default.xml      |  10 -
 .../protobuf/generated/ZooKeeperProtos.java   | 559 +-----------------
 .../src/main/protobuf/ZooKeeper.proto         |   7 -
 .../hadoop/hbase/rsgroup/TestRSGroups.java    |   3 -
 .../apache/hadoop/hbase/master/HMaster.java   |  15 +-
 .../ReplicationZKLockCleanerChore.java        | 112 ----
 .../TestMultiSlaveReplication.java            |  38 --
 .../TestReplicationSourceManager.java         |   1 -
 .../TestReplicationSourceManagerZkImpl.java   |   2 -
 .../hadoop/hbase/zookeeper/TestZKMulti.java   |  47 --
 .../client/rsgroup/TestShellRSGroups.java     |   3 -
 .../asciidoc/_chapters/configuration.adoc     |   5 +-
 src/main/asciidoc/_chapters/zookeeper.adoc    |   4 +-
 17 files changed, 50 insertions(+), 1003 deletions(-)
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 90b13470d58..d4b93c0a679 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -131,7 +131,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     List<ZKUtilOp> listOfOps = new ArrayList<ZKUtilOp>();
     ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
       ReplicationSerDeHelper.toByteArray(peerConfig));
-    // There is a race (if hbase.zookeeper.useMulti is false)
     // b/w PeerWatcher and ReplicationZookeeper#add method to create the
     // peer-state znode. This happens while adding a peer
     // The peer state data is set as "ENABLED" by default.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 1c579ab18f7..40c9140dbd7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -31,8 +30,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -67,8 +64,6 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   /** Znode containing all replication queues for this region server. */
   private String myQueuesZnode;
-  /** Name of znode we use to lock during failover */
-  private final static String RS_LOCK_ZNODE = "lock";
 
   private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
 
@@ -189,42 +184,13 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
     } catch (KeeperException e) {
       this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
     }
-    if (queues != null) {
-      queues.remove(RS_LOCK_ZNODE);
-    }
     return queues;
   }
 
   @Override
   public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
-    if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
-      LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
-      return moveQueueUsingMulti(regionserver, queueId);
-    } else {
-      LOG.info("Moving " + regionserver + "/" + queueId + "'s wals to my queue");
-      if (!lockOtherRS(regionserver)) {
-        LOG.info("Can not take the lock now");
-        return null;
-      }
-      Pair<String, SortedSet<String>> newQueues;
-      try {
-        newQueues = copyQueueFromLockedRS(regionserver, queueId);
-        removeQueueFromLockedRS(regionserver, queueId);
-      } finally {
-        unlockOtherRS(regionserver);
-      }
-      return newQueues;
-    }
-  }
-
-  private void removeQueueFromLockedRS(String znode, String peerId) {
-    String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
-    String peerPath = ZKUtil.joinZNode(nodePath, peerId);
-    try {
-      ZKUtil.deleteNodeRecursively(this.zookeeper, peerPath);
-    } catch (KeeperException e) {
-      LOG.warn("Remove copied queue failed", e);
-    }
+    LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
+    return moveQueueUsingMulti(regionserver, queueId);
   }
 
   @Override
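
The path claimQueue now takes unconditionally bundles the copy of each WAL position and the deletion of the dead server's queue into one atomic ZooKeeper call, which is what makes the lock znode unnecessary. A minimal sketch of the idea against the raw ZooKeeper client API (the znode layout and names are illustrative, not the actual body of moveQueueUsingMulti):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.Op;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class QueueClaimSketch {
      // Claim every WAL under deadQueue by re-creating it under myQueue and
      // deleting the original, all in one multi() call. If two servers race,
      // exactly one multi() commits; the loser fails atomically and moves on.
      static void claimQueue(ZooKeeper zk, String deadQueue, String myQueue)
          throws KeeperException, InterruptedException {
        List<Op> ops = new ArrayList<>();
        ops.add(Op.create(myQueue, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT));
        for (String wal : zk.getChildren(deadQueue, false)) {
          byte[] position = zk.getData(deadQueue + "/" + wal, false, null);
          ops.add(Op.create(myQueue + "/" + wal, position,
              ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
          ops.add(Op.delete(deadQueue + "/" + wal, -1));
        }
        ops.add(Op.delete(deadQueue, -1));
        zk.multi(ops); // all ops commit or none do
      }
    }
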
@@ -278,58 +244,6 @@
     return listOfQueues == null ? new ArrayList<String>() : listOfQueues;
   }
 
-  /**
-   * Try to set a lock in another region server's znode.
-   * @param znode the server names of the other server
-   * @return true if the lock was acquired, false in every other cases
-   */
-  @VisibleForTesting
-  public boolean lockOtherRS(String znode) {
-    try {
-      String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
-      if (parent.equals(this.myQueuesZnode)) {
-        LOG.warn("Won't lock because this is us, we're dead!");
-        return false;
-      }
-      String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
-      ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode));
-    } catch (KeeperException e) {
-      // This exception will pop up if the znode under which we're trying to
-      // create the lock is already deleted by another region server, meaning
-      // that the transfer already occurred.
-      // NoNode => transfer is done and znodes are already deleted
-      // NodeExists => lock znode already created by another RS
-      if (e instanceof KeeperException.NoNodeException
-          || e instanceof KeeperException.NodeExistsException) {
-        LOG.info("Won't transfer the queue," + " another RS took care of it because of: "
-            + e.getMessage());
-      } else {
-        LOG.info("Failed lock other rs", e);
-      }
-      return false;
-    }
-    return true;
-  }
-
-  public String getLockZNode(String znode) {
-    return this.queuesZNode + "/" + znode + "/" + RS_LOCK_ZNODE;
-  }
-
-  @VisibleForTesting
-  public boolean checkLockExists(String znode) throws KeeperException {
-    return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
-  }
-
-  private void unlockOtherRS(String znode){
-    String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
-    String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
-    try {
-      ZKUtil.deleteNode(this.zookeeper, p);
-    } catch (KeeperException e) {
-      this.abortable.abort("Remove lock failed", e);
-    }
-  }
-
   /**
    * It "atomically" copies all the wals queues from another region server and returns them all
    * sorted per peer cluster (appended with the dead server's znode).
@@ -390,76 +304,6 @@
     return null;
   }
 
-  /**
-   * This methods moves all the wals queues from another region server and returns them all sorted
-   * per peer cluster (appended with the dead server's znode)
-   * @param znode server names to copy
-   * @return all wals for the peer of that cluster, null if an error occurred
-   */
-  private Pair<String, SortedSet<String>> copyQueueFromLockedRS(String znode, String peerId) {
-    // TODO this method isn't atomic enough, we could start copying and then
-    // TODO fail for some reason and we would end up with znodes we don't want.
-    try {
-      String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
-      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
-      String clusterPath = ZKUtil.joinZNode(nodePath, peerId);
-      if (!peerExists(replicationQueueInfo.getPeerId())) {
-        LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
-        // Protection against moving orphaned queues
-        return null;
-      }
-      // We add the name of the recovered RS to the new znode, we can even
-      // do that for queues that were recovered 10 times giving a znode like
-      // number-startcode-number-otherstartcode-number-anotherstartcode-etc
-      String newCluster = peerId + "-" + znode;
-      String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
-
-      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
-      // That region server didn't have anything to replicate for this cluster
-      if (wals == null || wals.size() == 0) {
-        return null;
-      }
-      ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
-        HConstants.EMPTY_BYTE_ARRAY);
-      SortedSet<String> logQueue = new TreeSet<>();
-      for (String wal : wals) {
-        String z = ZKUtil.joinZNode(clusterPath, wal);
-        byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
-        long position = 0;
-        try {
-          position = ZKUtil.parseWALPositionFrom(positionBytes);
-        } catch (DeserializationException e) {
-          LOG.warn("Failed parse of wal position from the following znode: " + z
-              + ", Exception: " + e);
-        }
-        LOG.debug("Creating " + wal + " with data " + position);
-        String child = ZKUtil.joinZNode(newClusterZnode, wal);
-        // Position doesn't actually change, we are just deserializing it for
-        // logging, so just use the already serialized version
-        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, child, positionBytes);
-        logQueue.add(wal);
-      }
-      return new Pair<>(newCluster, logQueue);
-    } catch (KeeperException e) {
-      LOG.warn("Got exception in copyQueueFromLockedRS: ", e);
-    } catch (InterruptedException e) {
-      LOG.warn(e);
-      Thread.currentThread().interrupt();
-    }
-    return null;
-  }
-
-  /**
-   * @param lockOwner
-   * @return Serialized protobuf of lockOwner with pb magic prefix prepended suitable
-   *         for use as content of an replication lock during region server fail over.
-   */
-  static byte[] lockToByteArray(final String lockOwner) {
-    byte[] bytes =
-        ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
-
   @Override
   public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
     String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
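
Alongside the lock methods, ReplicationLock protobuf support goes away, which is what accounts for the large ZooKeeperProtos.java and ZooKeeper.proto deletions in the diffstat. For the record, the payload the lock znode carried was just the owner's queue znode serialized behind HBase's PB magic; a sketch of that round trip (only the serialize half existed as lockToByteArray above, the parse half is illustrative):

    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;

    public class LockPayloadSketch {
      // Serialize: what the removed lockToByteArray() did.
      static byte[] toBytes(String lockOwner) {
        byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder()
            .setLockOwner(lockOwner).build().toByteArray();
        return ProtobufUtil.prependPBMagic(bytes);
      }

      // Parse: strip the magic prefix and read the owner back (illustrative).
      static String owner(byte[] data) throws InvalidProtocolBufferException {
        int off = ProtobufUtil.lengthOfPBMagic();
        return ZooKeeperProtos.ReplicationLock.newBuilder()
            .mergeFrom(data, off, data.length - off).build().getLockOwner();
      }
    }
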
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index f6914ae0115..0896725acab 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1285,9 +1285,6 @@
    * Sets no watches. Throws all exceptions besides dealing with deletion of
    * children.
    *
-   * If hbase.zookeeper.useMulti is true, use ZooKeeper's multi-update functionality.
-   * Otherwise, run the list of operations sequentially.
-   *
    * @throws KeeperException
    */
   public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node)
@@ -1304,13 +1301,9 @@
    * Sets no watches. Throws all exceptions besides dealing with deletion of
    * children.
    * <p>
-   * If hbase.zookeeper.useMulti is true, use ZooKeeper's multi-update
-   * functionality. Otherwise, run the list of operations sequentially.
-   * <p>
-   * If all of the following are true:
+   * If the following is true:
    * <p>
    * <ul>
    * <li>runSequentialOnMultiFailure is true
-   * <li>hbase.zookeeper.useMulti is true
    * </ul>
    * on calling multi, we get a ZooKeeper exception that can be handled by a
    * sequential call(*), we retry the operations one-by-one (sequentially).
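
For a caller, the knob that survives this change is runSequentialOnMultiFailure. A small usage sketch of the helper documented above (the znode paths are illustrative):

    import org.apache.hadoop.hbase.zookeeper.ZKUtil;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
    import org.apache.zookeeper.KeeperException;

    public class CleanupSketch {
      // Deletes every child under both roots, batching the deletes into
      // multi() calls; 'true' opts into the one-by-one fallback on benign
      // multi failures instead of propagating the KeeperException.
      static void wipeQueues(ZooKeeperWatcher zkw) throws KeeperException {
        ZKUtil.deleteChildrenRecursivelyMultiOrSequential(zkw, true,
            "/hbase/replication/rs", "/hbase/replication/hfile-refs");
      }
    }
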
@@ -1359,13 +1352,9 @@
    * Sets no watches. Throws all exceptions besides dealing with deletion of
    * children.
    * <p>
-   * If hbase.zookeeper.useMulti is true, use ZooKeeper's multi-update
-   * functionality. Otherwise, run the list of operations sequentially.
-   * <p>
-   * If all of the following are true:
+   * If the following is true:
    * <p>
    * <ul>
    * <li>runSequentialOnMultiFailure is true
-   * <li>hbase.zookeeper.useMulti is true
    * </ul>
    * on calling multi, we get a ZooKeeper exception that can be handled by a
    * sequential call(*), we retry the operations one-by-one (sequentially).
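
The sequential fallback these Javadocs describe exists for races between peers touching the same znodes: if a multi() is rejected with, say, NONODE because another process already deleted a node, replaying the same fail-silent ops one at a time succeeds harmlessly. A hedged sketch of a caller opting in (the paths are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.zookeeper.ZKUtil;
    import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
    import org.apache.zookeeper.KeeperException;

    public class FallbackSketch {
      // With the third argument true, a multi() rejected with NODEEXISTS,
      // NONODE, BADVERSION or NOAUTH is replayed sequentially; the
      // *FailSilent ops are idempotent, so the replay tolerates the race.
      static void swapNodes(ZooKeeperWatcher zkw, byte[] data) throws KeeperException {
        List<ZKUtilOp> ops = new ArrayList<>();
        ops.add(ZKUtilOp.createAndFailSilent("/hbase/example/new", data));
        ops.add(ZKUtilOp.deleteNodeFailSilent("/hbase/example/old"));
        ZKUtil.multiOrSequential(zkw, ops, true);
      }
    }
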
@@ -1636,12 +1625,10 @@
   }
 
   /**
-   * If hbase.zookeeper.useMulti is true, use ZooKeeper's multi-update functionality.
-   * Otherwise, run the list of operations sequentially.
+   * Use ZooKeeper's multi-update functionality.
    *
    * If all of the following are true:
    * - runSequentialOnMultiFailure is true
-   * - hbase.zookeeper.useMulti is true
    * - on calling multi, we get a ZooKeeper exception that can be handled by a sequential call(*)
    * Then:
    * - we retry the operations one-by-one (sequentially)
    *
    * ...
    */
  public static void multiOrSequential(ZooKeeperWatcher zkw, List<ZKUtilOp> ops,
      boolean runSequentialOnMultiFailure) throws KeeperException {
-    if (ops == null) return;
-    boolean useMulti = zkw.getConfiguration().getBoolean(HConstants.ZOOKEEPER_USEMULTI, false);
-
-    if (useMulti) {
-      List<Op> zkOps = new LinkedList<>();
-      for (ZKUtilOp op : ops) {
-        zkOps.add(toZooKeeperOp(zkw, op));
-      }
-      try {
-        zkw.getRecoverableZooKeeper().multi(zkOps);
-      } catch (KeeperException ke) {
-        switch (ke.code()) {
-          case NODEEXISTS:
-          case NONODE:
-          case BADVERSION:
-          case NOAUTH:
-            // if we get an exception that could be solved by running sequentially
-            // (and the client asked us to), then break out and run sequentially
-            if (runSequentialOnMultiFailure) {
-              LOG.info("On call to ZK.multi, received exception: " + ke.toString() + "."
-                  + " Attempting to run operations sequentially because"
-                  + " runSequentialOnMultiFailure is: " + runSequentialOnMultiFailure + ".");
-              processSequentially(zkw, ops);
-              break;
-            }
-          default:
-            throw ke;
-        }
-      } catch (InterruptedException ie) {
-        zkw.interruptedException(ie);
-      }
-    } else {
-      // run sequentially
-      processSequentially(zkw, ops);
+    if (zkw.getConfiguration().get("hbase.zookeeper.useMulti") != null) {
+      LOG.warn("hbase.zookeeper.useMulti is deprecated. ZooKeeper multi is always used.");
     }
+    if (ops == null) return;
+    List<Op> zkOps = new LinkedList<>();
+    for (ZKUtilOp op : ops) {
+      zkOps.add(toZooKeeperOp(zkw, op));
+    }
+    try {
+      zkw.getRecoverableZooKeeper().multi(zkOps);
+    } catch (KeeperException ke) {
+      switch (ke.code()) {
+        case NODEEXISTS:
+        case NONODE:
+        case BADVERSION:
+        case NOAUTH:
+          // if we get an exception that could be solved by running sequentially
+          // (and the client asked us to), then break out and run sequentially
+          if (runSequentialOnMultiFailure) {
+            LOG.info("On call to ZK.multi, received exception: " + ke.toString() + "."
+                + " Attempting to run operations sequentially because"
+                + " runSequentialOnMultiFailure is: " + runSequentialOnMultiFailure + ".");
+            processSequentially(zkw, ops);
+            break;
+          }
+        default:
+          throw ke;
+      }
+    } catch (InterruptedException ie) {
+      zkw.interruptedException(ie);
+    }
   }
 
   private static void processSequentially(ZooKeeperWatcher zkw, List<ZKUtilOp> ops)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 5c53030adc7..4a8f55c07cf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -229,9 +229,6 @@
   /** Default value for ZooKeeper session timeout */
   public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000;
 
-  /** Configuration key for whether to use ZK.multi */
-  public static final String ZOOKEEPER_USEMULTI = "hbase.zookeeper.useMulti";
-
   /** Parameter name for port region server listens on. */
   public static final String REGIONSERVER_PORT = "hbase.regionserver.port";
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 83158296eda..5b0700b08fa 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -398,16 +398,6 @@ possible configurations would overwhelm and obscure the important.
     for more information.
     </description>
   </property>
-  <property>
-    <name>hbase.zookeeper.useMulti</name>
-    <value>true</value>
-    <description>Instructs HBase to make use of ZooKeeper's multi-update functionality.
-    This allows certain ZooKeeper operations to complete more quickly and prevents some issues
-    with rare Replication failure scenarios (see the release note of HBASE-2611 for an example).
-    IMPORTANT: only set this to true if all ZooKeeper servers in the cluster are on version 3.4+
-    and will not be downgraded. ZooKeeper versions before 3.4 do not support multi-update and
-    will not fail gracefully if multi-update is invoked (see ZOOKEEPER-1495).</description>
-  </property>