From 842f794a627dcbfc72a78f1742645271d4d4bca6 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 12 Jan 2018 09:43:56 +0800 Subject: [PATCH] HBASE-19772 Do not close connection to zk when there are still pending request in ReadOnlyZKClient --- .../hbase/zookeeper/ReadOnlyZKClient.java | 79 +++++++++---------- .../hbase/zookeeper/TestReadOnlyZKClient.java | 71 +++++++++++++---- 2 files changed, 92 insertions(+), 58 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 82c011b1cdf..ad707408618 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -29,9 +29,8 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -39,6 +38,7 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -114,7 +114,10 @@ public final class ReadOnlyZKClient implements Closeable { private final AtomicBoolean closed = new AtomicBoolean(false); - private ZooKeeper zookeeper; + @VisibleForTesting + ZooKeeper zookeeper; + + private int pendingRequests = 0; private String getId() { return String.format("0x%08x", System.identityHashCode(this)); @@ -127,12 +130,12 @@ public final class ReadOnlyZKClient implements Closeable { this.retryIntervalMs = conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); - LOG.info("Start read only zookeeper connection " + getId() + " to " + connectString + - ", session timeout " + sessionTimeoutMs + " ms, retries " + maxRetries + - ", retry interval " + retryIntervalMs + " ms, keep alive " + keepAliveTimeMs + " ms"); - Thread t = new Thread(this::run, "ReadOnlyZKClient"); - t.setDaemon(true); - t.start(); + LOG.info( + "Start read only zookeeper connection {} to {}, " + "session timeout {} ms, retries {}, " + + "retry interval {} ms, keep alive {} ms", + getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs); + Threads.setDaemonThreadRunning(new Thread(this::run), + "ReadOnlyZKClient-" + connectString + "@" + getId()); } private abstract class ZKTask extends Task { @@ -156,6 +159,7 @@ public final class ReadOnlyZKClient implements Closeable { @Override public void exec(ZooKeeper alwaysNull) { + pendingRequests--; Code code = Code.get(rc); if (code == Code.OK) { future.complete(ret); @@ -169,19 +173,19 @@ public final class ReadOnlyZKClient implements Closeable { future.completeExceptionally(KeeperException.create(code, path)); } else { if (code == Code.SESSIONEXPIRED) { - LOG.warn(getId() + " session expired, close and reconnect"); + LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString); try { zk.close(); } catch (InterruptedException e) { } } if (ZKTask.this.delay(retryIntervalMs, maxRetries)) { - LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " + - code + ", retries = " + ZKTask.this.retries); + LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(), + connectString, operationType, path, code, ZKTask.this.retries); tasks.add(ZKTask.this); } else { - LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " + - code + ", retries = " + ZKTask.this.retries + ", give up"); + LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(), + connectString, operationType, path, code, ZKTask.this.retries); future.completeExceptionally(KeeperException.create(code, path)); } } @@ -205,6 +209,14 @@ public final class ReadOnlyZKClient implements Closeable { return true; } + protected abstract void doExec(ZooKeeper zk); + + @Override + public final void exec(ZooKeeper zk) { + pendingRequests++; + doExec(zk); + } + public boolean delay(long intervalMs, int maxRetries) { if (retries >= maxRetries) { return false; @@ -217,14 +229,12 @@ public final class ReadOnlyZKClient implements Closeable { @Override public void connectFailed(IOException e) { if (delay(retryIntervalMs, maxRetries)) { - LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path + - ", retries = " + retries, - e); + LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(), + connectString, operationType, path, retries, e); tasks.add(this); } else { - LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path + - ", retries = " + retries + ", give up", - e); + LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(), + connectString, operationType, path, retries, e); future.completeExceptionally(e); } } @@ -249,7 +259,7 @@ public final class ReadOnlyZKClient implements Closeable { tasks.add(new ZKTask(path, future, "get") { @Override - public void exec(ZooKeeper zk) { + protected void doExec(ZooKeeper zk) { zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null); } @@ -265,7 +275,7 @@ public final class ReadOnlyZKClient implements Closeable { tasks.add(new ZKTask(path, future, "exists") { @Override - public void exec(ZooKeeper zk) { + protected void doExec(ZooKeeper zk) { zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null); } }); @@ -286,16 +296,6 @@ public final class ReadOnlyZKClient implements Closeable { // may be closed when session expired if (zookeeper == null || !zookeeper.getState().isAlive()) { zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {}); - int timeout = 10000; - try { - // Before returning, try and ensure we are connected. Don't wait long in case - // we are trying to connect to a cluster that is down. If we fail to connect, - // just catch the exception and carry-on. The first usage will fail and we'll - // cleanup. - zookeeper = ZooKeeperHelper.ensureConnectedZooKeeper(zookeeper, timeout); - } catch (ZooKeeperConnectionException e) { - LOG.warn("Failed connecting after waiting " + timeout + "ms; " + zookeeper); - } } return zookeeper; } @@ -311,9 +311,11 @@ public final class ReadOnlyZKClient implements Closeable { if (task == CLOSE) { break; } - if (task == null) { - LOG.info(getId() + " no activities for " + keepAliveTimeMs + - " ms, close active connection. Will reconnect next time when there are new requests."); + if (task == null && pendingRequests == 0) { + LOG.debug( + "{} to {} no activities for {} ms, close active connection. " + + "Will reconnect next time when there are new requests", + getId(), connectString, keepAliveTimeMs); closeZk(); continue; } @@ -339,16 +341,11 @@ public final class ReadOnlyZKClient implements Closeable { @Override public void close() { if (closed.compareAndSet(false, true)) { - LOG.info("Close zookeeper connection " + getId() + " to " + connectString); + LOG.info("Close zookeeper connection {} to {}", getId(), connectString); tasks.add(CLOSE); } } - @VisibleForTesting - ZooKeeper getZooKeeper() { - return zookeeper; - } - @VisibleForTesting public String getConnectString() { return connectString; diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index c47812156fb..211e7769d79 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,26 +27,36 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ZKTests; +import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @Category({ ZKTests.class, MediumTests.class }) public class TestReadOnlyZKClient { @@ -67,8 +77,7 @@ public class TestReadOnlyZKClient { public static void setUp() throws Exception { PORT = UTIL.startMiniZKCluster().getClientPort(); - ZooKeeper zk = ZooKeeperHelper. - getConnectedZooKeeper("localhost:" + PORT, 10000); + ZooKeeper zk = ZooKeeperHelper.getConnectedZooKeeper("localhost:" + PORT, 10000); DATA = new byte[10]; ThreadLocalRandom.current().nextBytes(DATA); zk.create(PATH, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -83,7 +92,7 @@ public class TestReadOnlyZKClient { conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000); RO_ZK = new ReadOnlyZKClient(conf); // only connect when necessary - assertNull(RO_ZK.getZooKeeper()); + assertNull(RO_ZK.zookeeper); } @AfterClass @@ -93,17 +102,13 @@ public class TestReadOnlyZKClient { UTIL.cleanupTestDir(); } - @Test - public void testGetAndExists() throws Exception { - assertArrayEquals(DATA, RO_ZK.get(PATH).get()); - assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren()); - assertNotNull(RO_ZK.getZooKeeper()); + private void waitForIdleConnectionClosed() throws Exception { // The zookeeper client should be closed finally after the keep alive time elapsed UTIL.waitFor(10000, new ExplainingPredicate() { @Override public boolean evaluate() throws Exception { - return RO_ZK.getZooKeeper() == null; + return RO_ZK.zookeeper == null; } @Override @@ -113,6 +118,14 @@ public class TestReadOnlyZKClient { }); } + @Test + public void testGetAndExists() throws Exception { + assertArrayEquals(DATA, RO_ZK.get(PATH).get()); + assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren()); + assertNotNull(RO_ZK.zookeeper); + waitForIdleConnectionClosed(); + } + @Test public void testNoNode() throws InterruptedException, ExecutionException { String pathNotExists = PATH + "_whatever"; @@ -132,15 +145,39 @@ public class TestReadOnlyZKClient { @Test public void testSessionExpire() throws Exception { assertArrayEquals(DATA, RO_ZK.get(PATH).get()); - ZooKeeper zk = RO_ZK.getZooKeeper(); + ZooKeeper zk = RO_ZK.zookeeper; long sessionId = zk.getSessionId(); UTIL.getZkCluster().getZooKeeperServers().get(0).closeSession(sessionId); // should not reach keep alive so still the same instance - assertSame(zk, RO_ZK.getZooKeeper()); - byte [] got = RO_ZK.get(PATH).get(); + assertSame(zk, RO_ZK.zookeeper); + byte[] got = RO_ZK.get(PATH).get(); assertArrayEquals(DATA, got); - assertNotNull(RO_ZK.getZooKeeper()); - assertNotSame(zk, RO_ZK.getZooKeeper()); - assertNotEquals(sessionId, RO_ZK.getZooKeeper().getSessionId()); + assertNotNull(RO_ZK.zookeeper); + assertNotSame(zk, RO_ZK.zookeeper); + assertNotEquals(sessionId, RO_ZK.zookeeper.getSessionId()); + } + + @Test + public void testNotCloseZkWhenPending() throws Exception { + assertArrayEquals(DATA, RO_ZK.get(PATH).get()); + ZooKeeper mockedZK = spy(RO_ZK.zookeeper); + CountDownLatch latch = new CountDownLatch(1); + doAnswer(new Answer() { + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + latch.await(); + return invocation.callRealMethod(); + } + }).when(mockedZK).exists(anyString(), anyBoolean(), any(StatCallback.class), any()); + RO_ZK.zookeeper = mockedZK; + CompletableFuture future = RO_ZK.exists(PATH); + // 2 * keep alive time to ensure that we will not close the zk when there are pending requests + Thread.sleep(6000); + assertNotNull(RO_ZK.zookeeper); + latch.countDown(); + assertEquals(CHILDREN, future.get().getNumChildren()); + // now we will close the idle connection. + waitForIdleConnectionClosed(); } }