From 32690e1e89653c582e9f80a83ec07ebe7318fe79 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Thu, 25 Jun 2020 18:36:14 -0700
Subject: [PATCH] HBASE-24603: Make Zookeeper sync() call synchronous (#1945) (#1976)

Writing a test for this is tricky. The existing functional tests provide
enough coverage; the only remaining concern is performance, and there is
enough logging in place to detect timed-out or badly performing sync calls.

Additionally, this patch decouples ZK event processing into its own thread
rather than running it in the EventThread's context. That avoids deadlocks
and stalls of the event thread.

Signed-off-by: Andrew Purtell
Signed-off-by: Viraj Jasani
(cherry picked from commit 84e246f9b197bfa4307172db5465214771b78d38)
(cherry picked from commit 2379a25f0c4f2bdd3ea91fa5e0ba63f034c8d21c)
---
 .../hbase/zookeeper/RecoverableZooKeeper.java |   2 +-
 .../hadoop/hbase/zookeeper/ZKAssign.java      |  10 +-
 .../hbase/zookeeper/ZooKeeperWatcher.java     | 141 ++++++++++++------
 .../org/apache/hadoop/hbase/HConstants.java   |   9 ++
 .../backup/example/HFileArchiveManager.java   |   4 +-
 .../visibility/ZKVisibilityLabelWatcher.java  |   2 +-
 .../apache/hadoop/hbase/master/Mocking.java   |   2 +-
 .../TestSplitTransactionOnCluster.java        |   4 +-
 8 files changed, 119 insertions(+), 55 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
index 6c29fcce3ab..c5ecae94525 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -875,7 +875,7 @@ public class RecoverableZooKeeper {
   }
 
   public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
-    checkZk().sync(path, cb, null);
+    checkZk().sync(path, cb, ctx);
   }
 
   /**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
index b2e1e1eeb15..59d2f1308d4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
@@ -208,7 +208,7 @@ public class ZKAssign {
       region.getRegionName(), serverName, HConstants.EMPTY_BYTE_ARRAY);
     byte [] data = rt.toByteArray();
     String node = getNodeName(zkw, region.getEncodedName());
-    zkw.sync(node);
+    zkw.syncOrTimeout(node);
     int version = ZKUtil.checkExists(zkw, node);
     if (version == -1) {
       return ZKUtil.createAndWatch(zkw, node, data);
@@ -444,7 +444,7 @@ public class ZKAssign {
         "node " + encodedRegionName + " in expected state " + expectedState));
     }
     String node = getNodeName(zkw, encodedRegionName);
-    zkw.sync(node);
+    zkw.syncOrTimeout(node);
     Stat stat = new Stat();
     byte [] bytes = ZKUtil.getDataNoWatch(zkw, node, stat);
     if (bytes == null) {
@@ -645,7 +645,7 @@ public class ZKAssign {
     }
 
     String node = getNodeName(zkw, encoded);
-    zkw.sync(node);
+    zkw.syncOrTimeout(node);
 
     // Read existing data of the node
     Stat stat = new Stat();
@@ -727,7 +727,7 @@ public class ZKAssign {
       int expectedVersion)
   throws KeeperException {
     final String encoded = getNodeName(zkw, region.getEncodedName());
-    zkw.sync(encoded);
+    zkw.syncOrTimeout(encoded);
 
     // Read existing data of the node
     Stat stat = new Stat();
@@ -807,7 +807,7 @@ public class ZKAssign {
     }
 
     String node = getNodeName(zkw, encoded);
-    zkw.sync(node);
+    zkw.syncOrTimeout(node);
 
     // Read existing data of the node
     Stat stat = new Stat();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 0b4e84872d7..6e80432bf06 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -26,6 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -38,9 +41,11 @@ import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.security.Superusers;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -86,6 +91,18 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
 
   private final List<ZooKeeperListener> listeners = new CopyOnWriteArrayList<ZooKeeperListener>();
 
+  // Single threaded executor pool that processes event notifications from Zookeeper. Events are
+  // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do
+  // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context.
+  // EventThread internally runs a single while loop to serially process all the events. When events
+  // are processed by the listeners in the same thread, that blocks the EventThread from processing
+  // subsequent events. Processing events in a separate thread frees up the event thread to continue
+  // and further prevents deadlocks if the process method itself makes other zookeeper calls.
+  // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the
+  // requests using a single while loop and hence there is no performance degradation.
+  private final ExecutorService zkEventProcessor =
+      Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory("zk-event-processor"));
+
   // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL
   // negotiation to complete
   public CountDownLatch saslLatch = new CountDownLatch(1);
@@ -143,6 +160,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
 
   private final Configuration conf;
 
+  private final long zkSyncTimeout;
+
   /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
   private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
 
@@ -196,6 +215,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
         throw zce;
       }
     }
+    this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS,
+        HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS);
   }
 
   private void createBaseZNodes() throws ZooKeeperConnectionException {
@@ -609,6 +630,46 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
     return baseZNode;
   }
 
+  private void processEvent(WatchedEvent event) {
+    switch(event.getType()) {
+      // If event type is NONE, this is a connection status change
+      case None: {
+        connectionEvent(event);
+        break;
+      }
+      // Otherwise pass along to the listeners
+      case NodeCreated: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeCreated(event.getPath());
+        }
+        break;
+      }
+      case NodeDeleted: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeDeleted(event.getPath());
+        }
+        break;
+      }
+      case NodeDataChanged: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeDataChanged(event.getPath());
+        }
+        break;
+      }
+      case NodeChildrenChanged: {
+        for(ZooKeeperListener listener : listeners) {
+          listener.nodeChildrenChanged(event.getPath());
+        }
+        break;
+      }
+      default: {
+        LOG.error(String.format("Invalid event of type %s received for path %s. Ignoring",
+            event.getType(), event.getPath()));
+        break;
+      }
+    }
+  }
+
   /**
    * Method called from ZooKeeper for events and connection status.
    * <p>
@@ -616,50 +677,17 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
    * are dealt with locally.
    */
   @Override
-  public void process(WatchedEvent event) {
+  public void process(final WatchedEvent event) {
     LOG.debug(prefix("Received ZooKeeper Event, " +
         "type=" + event.getType() + ", " +
         "state=" + event.getState() + ", " +
         "path=" + event.getPath()));
-
-    switch(event.getType()) {
-
-      // If event type is NONE, this is a connection status change
-      case None: {
-        connectionEvent(event);
-        break;
+    zkEventProcessor.submit(new Runnable() {
+      @Override
+      public void run() {
+        processEvent(event);
       }
-
-      // Otherwise pass along to the listeners
-
-      case NodeCreated: {
-        for(ZooKeeperListener listener : listeners) {
-          listener.nodeCreated(event.getPath());
-        }
-        break;
-      }
-
-      case NodeDeleted: {
-        for(ZooKeeperListener listener : listeners) {
-          listener.nodeDeleted(event.getPath());
-        }
-        break;
-      }
-
-      case NodeDataChanged: {
-        for(ZooKeeperListener listener : listeners) {
-          listener.nodeDataChanged(event.getPath());
-        }
-        break;
-      }
-
-      case NodeChildrenChanged: {
-        for(ZooKeeperListener listener : listeners) {
-          listener.nodeChildrenChanged(event.getPath());
-        }
-        break;
-      }
-    }
+    });
   }
 
   // Connection management
@@ -709,7 +737,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
   }
 
   /**
-   * Forces a synchronization of this ZooKeeper client connection.
+   * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a
+   * timeout lets the callers fail-fast rather than wait forever for the sync to finish.
    * <p>
    * Executing this method before running other methods will ensure that the
    * subsequent operations are up-to-date and consistent as of the time that
@@ -720,8 +749,33 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
    * previously read version and data. We want to ensure that the version read
    * is up-to-date from when we begin the operation.
    */
-  public void sync(String path) throws KeeperException {
-    this.recoverableZooKeeper.sync(path, null, null);
+  public void syncOrTimeout(String path) throws KeeperException {
+    final CountDownLatch latch = new CountDownLatch(1);
+    long startTime = EnvironmentEdgeManager.currentTime();
+    this.recoverableZooKeeper.sync(path, new AsyncCallback.VoidCallback() {
+      @Override
+      public void processResult(int i, String s, Object o) {
+        latch.countDown();
+      }
+    }, null);
+    try {
+      if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) {
+        LOG.warn(String.format("sync() operation to ZK timed out. Configured timeout: %s ms. " +
+            "This usually points to a ZK side issue. Check ZK server logs and metrics.",
+            zkSyncTimeout));
+        throw new KeeperException.OperationTimeoutException();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted waiting for ZK sync() to finish.", e);
+      Thread.currentThread().interrupt();
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a
+      // useful metric to have since the latency of sync() impacts the callers.
+      LOG.debug(String.format("ZK sync() operation took %d ms",
+          EnvironmentEdgeManager.currentTime() - startTime));
+    }
   }
 
   /**
@@ -770,6 +824,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
    */
   @Override
   public void close() {
+    zkEventProcessor.shutdownNow();
     try {
       recoverableZooKeeper.close();
     } catch (InterruptedException e) {
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 4607de986a0..9c5dd1d4787 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -244,6 +244,15 @@ public final class HConstants {
   /** Configuration key for ZooKeeper session timeout */
   public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";
 
+  /** Timeout for the ZK sync() call */
+  public static final String ZK_SYNC_BLOCKING_TIMEOUT_MS = "hbase.zookeeper.sync.timeout.millis";
+  // Choice of the default value is based on the following ZK recommendation (from docs). Keeping it
+  // lower lets the callers fail fast in case of any issues.
+  // "The clients view of the system is guaranteed to be up-to-date within a certain time bound.
+  // (On the order of tens of seconds.) Either system changes will be seen by a client within this
+  // bound, or the client will detect a service outage."
+  public static final long ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS = 30 * 1000;
+
   /** Default value for ZooKeeper session timeout */
   public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java
index 85b1135a0ff..80f4dc6bee1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java
@@ -123,7 +123,7 @@ class HFileArchiveManager {
    */
   private void disable(ZooKeeperWatcher zooKeeper, byte[] table) throws KeeperException {
     // ensure the latest state of the archive node is found
-    zooKeeper.sync(archiveZnode);
+    zooKeeper.syncOrTimeout(archiveZnode);
 
     // if the top-level archive node is gone, then we are done
     if (ZKUtil.checkExists(zooKeeper, archiveZnode) < 0) {
@@ -132,7 +132,7 @@ class HFileArchiveManager {
     // delete the table node, from the archive
     String tableNode = this.getTableNode(table);
     // make sure the table is the latest version so the delete takes
-    zooKeeper.sync(tableNode);
+    zooKeeper.syncOrTimeout(tableNode);
 
     LOG.debug("Attempting to delete table node:" + tableNode);
     ZKUtil.deleteNodeRecursively(zooKeeper, tableNode);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
index 4941a54aec9..cb71b88b811 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
@@ -109,7 +109,7 @@ public class ZKVisibilityLabelWatcher extends ZooKeeperListener {
   public void nodeDataChanged(String path) {
     if (path.equals(labelZnode) || path.equals(userAuthsZnode)) {
       try {
-        watcher.sync(path);
+        watcher.syncOrTimeout(path);
         byte[] data = ZKUtil.getDataAndWatch(watcher, path);
         if (path.equals(labelZnode)) {
           refreshVisibilityLabelsCache(data);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/Mocking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/Mocking.java
index fefcaf53f6c..0b0a3849db3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/Mocking.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/Mocking.java
@@ -71,7 +71,7 @@ public class Mocking {
 
     String encoded = region.getEncodedName();
     String node = ZKAssign.getNodeName(zkw, encoded);
-    zkw.sync(node);
+    zkw.syncOrTimeout(node);
 
     // Read existing data of the node
     byte [] existingBytes = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 15c51eb533d..2c664933d75 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -1087,7 +1087,7 @@ public class TestSplitTransactionOnCluster {
     assertTrue("not able to find a splittable region", region != null);
     String node = ZKAssign.getNodeName(regionServer.getZooKeeper(),
       region.getRegionInfo().getEncodedName());
-    regionServer.getZooKeeper().sync(node);
+    regionServer.getZooKeeper().syncOrTimeout(node);
     SplitTransactionImpl st = new SplitTransactionImpl(region, Bytes.toBytes("row2"));
     try {
       st.prepare();
@@ -1318,7 +1318,7 @@ public class TestSplitTransactionOnCluster {
     };
     String node = ZKAssign.getNodeName(regionServer.getZooKeeper(),
       region.getRegionInfo().getEncodedName());
-    regionServer.getZooKeeper().sync(node);
+    regionServer.getZooKeeper().syncOrTimeout(node);
     for (int i = 0; i < 100; i++) {
      // We expect the znode to be deleted by this time. Here the
      // znode could be in OPENED state and the
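
Reviewer note (not part of the patch): the heart of the change is turning ZooKeeper's asynchronous sync() into a bounded blocking call by parking on a CountDownLatch with a timeout. Below is a minimal, self-contained sketch of that pattern against the plain Apache ZooKeeper client API for anyone who wants to try it outside HBase; the connect string, session timeout, znode path, sync timeout, class name, and the exception thrown on timeout are illustrative placeholders, not values taken from the patch.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.ZooKeeper;

public class BlockingSyncSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative placeholders: connect string, session timeout, path and sync timeout.
    ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
    String path = "/hbase";
    long syncTimeoutMs = 30 * 1000L;

    CountDownLatch latch = new CountDownLatch(1);
    long start = System.currentTimeMillis();
    // sync() is asynchronous; the callback fires once the server has caught this client up.
    zk.sync(path, new AsyncCallback.VoidCallback() {
      @Override
      public void processResult(int rc, String p, Object ctx) {
        latch.countDown();
      }
    }, null);

    // Wait at most syncTimeoutMs instead of forever, mirroring what syncOrTimeout() does.
    if (!latch.await(syncTimeoutMs, TimeUnit.MILLISECONDS)) {
      zk.close();
      throw new IllegalStateException("sync() did not finish within " + syncTimeoutMs + " ms");
    }
    System.out.println("sync() took " + (System.currentTimeMillis() - start) + " ms");
    zk.close();
  }
}

Within HBase the same idea is wired through RecoverableZooKeeper and the new zkSyncTimeout setting (hbase.zookeeper.sync.timeout.millis, default 30000 ms), so callers of syncOrTimeout() fail fast instead of blocking indefinitely.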