HBASE-19772 Do not close connection to zk when there are still pending request in ReadOnlyZKClient

This commit is contained in:
zhangduo 2018-01-12 09:43:56 +08:00
parent f91589d305
commit 842f794a62
2 changed files with 92 additions and 58 deletions

View File

@ -29,9 +29,8 @@ import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.Code;
@ -39,6 +38,7 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/** /**
@ -114,7 +114,10 @@ public final class ReadOnlyZKClient implements Closeable {
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
private ZooKeeper zookeeper; @VisibleForTesting
ZooKeeper zookeeper;
private int pendingRequests = 0;
private String getId() { private String getId() {
return String.format("0x%08x", System.identityHashCode(this)); return String.format("0x%08x", System.identityHashCode(this));
@ -127,12 +130,12 @@ public final class ReadOnlyZKClient implements Closeable {
this.retryIntervalMs = this.retryIntervalMs =
conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
LOG.info("Start read only zookeeper connection " + getId() + " to " + connectString + LOG.info(
", session timeout " + sessionTimeoutMs + " ms, retries " + maxRetries + "Start read only zookeeper connection {} to {}, " + "session timeout {} ms, retries {}, " +
", retry interval " + retryIntervalMs + " ms, keep alive " + keepAliveTimeMs + " ms"); "retry interval {} ms, keep alive {} ms",
Thread t = new Thread(this::run, "ReadOnlyZKClient"); getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
t.setDaemon(true); Threads.setDaemonThreadRunning(new Thread(this::run),
t.start(); "ReadOnlyZKClient-" + connectString + "@" + getId());
} }
private abstract class ZKTask<T> extends Task { private abstract class ZKTask<T> extends Task {
@ -156,6 +159,7 @@ public final class ReadOnlyZKClient implements Closeable {
@Override @Override
public void exec(ZooKeeper alwaysNull) { public void exec(ZooKeeper alwaysNull) {
pendingRequests--;
Code code = Code.get(rc); Code code = Code.get(rc);
if (code == Code.OK) { if (code == Code.OK) {
future.complete(ret); future.complete(ret);
@ -169,19 +173,19 @@ public final class ReadOnlyZKClient implements Closeable {
future.completeExceptionally(KeeperException.create(code, path)); future.completeExceptionally(KeeperException.create(code, path));
} else { } else {
if (code == Code.SESSIONEXPIRED) { if (code == Code.SESSIONEXPIRED) {
LOG.warn(getId() + " session expired, close and reconnect"); LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
try { try {
zk.close(); zk.close();
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
} }
if (ZKTask.this.delay(retryIntervalMs, maxRetries)) { if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " + LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
code + ", retries = " + ZKTask.this.retries); connectString, operationType, path, code, ZKTask.this.retries);
tasks.add(ZKTask.this); tasks.add(ZKTask.this);
} else { } else {
LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " + LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
code + ", retries = " + ZKTask.this.retries + ", give up"); connectString, operationType, path, code, ZKTask.this.retries);
future.completeExceptionally(KeeperException.create(code, path)); future.completeExceptionally(KeeperException.create(code, path));
} }
} }
@ -205,6 +209,14 @@ public final class ReadOnlyZKClient implements Closeable {
return true; return true;
} }
protected abstract void doExec(ZooKeeper zk);
@Override
public final void exec(ZooKeeper zk) {
pendingRequests++;
doExec(zk);
}
public boolean delay(long intervalMs, int maxRetries) { public boolean delay(long intervalMs, int maxRetries) {
if (retries >= maxRetries) { if (retries >= maxRetries) {
return false; return false;
@ -217,14 +229,12 @@ public final class ReadOnlyZKClient implements Closeable {
@Override @Override
public void connectFailed(IOException e) { public void connectFailed(IOException e) {
if (delay(retryIntervalMs, maxRetries)) { if (delay(retryIntervalMs, maxRetries)) {
LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path + LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(),
", retries = " + retries, connectString, operationType, path, retries, e);
e);
tasks.add(this); tasks.add(this);
} else { } else {
LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path + LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(),
", retries = " + retries + ", give up", connectString, operationType, path, retries, e);
e);
future.completeExceptionally(e); future.completeExceptionally(e);
} }
} }
@ -249,7 +259,7 @@ public final class ReadOnlyZKClient implements Closeable {
tasks.add(new ZKTask<byte[]>(path, future, "get") { tasks.add(new ZKTask<byte[]>(path, future, "get") {
@Override @Override
public void exec(ZooKeeper zk) { protected void doExec(ZooKeeper zk) {
zk.getData(path, false, zk.getData(path, false,
(rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null); (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null);
} }
@ -265,7 +275,7 @@ public final class ReadOnlyZKClient implements Closeable {
tasks.add(new ZKTask<Stat>(path, future, "exists") { tasks.add(new ZKTask<Stat>(path, future, "exists") {
@Override @Override
public void exec(ZooKeeper zk) { protected void doExec(ZooKeeper zk) {
zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null); zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
} }
}); });
@ -286,16 +296,6 @@ public final class ReadOnlyZKClient implements Closeable {
// may be closed when session expired // may be closed when session expired
if (zookeeper == null || !zookeeper.getState().isAlive()) { if (zookeeper == null || !zookeeper.getState().isAlive()) {
zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {}); zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {});
int timeout = 10000;
try {
// Before returning, try and ensure we are connected. Don't wait long in case
// we are trying to connect to a cluster that is down. If we fail to connect,
// just catch the exception and carry-on. The first usage will fail and we'll
// cleanup.
zookeeper = ZooKeeperHelper.ensureConnectedZooKeeper(zookeeper, timeout);
} catch (ZooKeeperConnectionException e) {
LOG.warn("Failed connecting after waiting " + timeout + "ms; " + zookeeper);
}
} }
return zookeeper; return zookeeper;
} }
@ -311,9 +311,11 @@ public final class ReadOnlyZKClient implements Closeable {
if (task == CLOSE) { if (task == CLOSE) {
break; break;
} }
if (task == null) { if (task == null && pendingRequests == 0) {
LOG.info(getId() + " no activities for " + keepAliveTimeMs + LOG.debug(
" ms, close active connection. Will reconnect next time when there are new requests."); "{} to {} no activities for {} ms, close active connection. " +
"Will reconnect next time when there are new requests",
getId(), connectString, keepAliveTimeMs);
closeZk(); closeZk();
continue; continue;
} }
@ -339,16 +341,11 @@ public final class ReadOnlyZKClient implements Closeable {
@Override @Override
public void close() { public void close() {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
LOG.info("Close zookeeper connection " + getId() + " to " + connectString); LOG.info("Close zookeeper connection {} to {}", getId(), connectString);
tasks.add(CLOSE); tasks.add(CLOSE);
} }
} }
@VisibleForTesting
ZooKeeper getZooKeeper() {
return zookeeper;
}
@VisibleForTesting @VisibleForTesting
public String getConnectString() { public String getConnectString() {
return connectString; return connectString;

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -27,26 +27,36 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ZKTests; import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@Category({ ZKTests.class, MediumTests.class }) @Category({ ZKTests.class, MediumTests.class })
public class TestReadOnlyZKClient { public class TestReadOnlyZKClient {
@ -67,8 +77,7 @@ public class TestReadOnlyZKClient {
public static void setUp() throws Exception { public static void setUp() throws Exception {
PORT = UTIL.startMiniZKCluster().getClientPort(); PORT = UTIL.startMiniZKCluster().getClientPort();
ZooKeeper zk = ZooKeeperHelper. ZooKeeper zk = ZooKeeperHelper.getConnectedZooKeeper("localhost:" + PORT, 10000);
getConnectedZooKeeper("localhost:" + PORT, 10000);
DATA = new byte[10]; DATA = new byte[10];
ThreadLocalRandom.current().nextBytes(DATA); ThreadLocalRandom.current().nextBytes(DATA);
zk.create(PATH, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create(PATH, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@ -83,7 +92,7 @@ public class TestReadOnlyZKClient {
conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000); conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000);
RO_ZK = new ReadOnlyZKClient(conf); RO_ZK = new ReadOnlyZKClient(conf);
// only connect when necessary // only connect when necessary
assertNull(RO_ZK.getZooKeeper()); assertNull(RO_ZK.zookeeper);
} }
@AfterClass @AfterClass
@ -93,17 +102,13 @@ public class TestReadOnlyZKClient {
UTIL.cleanupTestDir(); UTIL.cleanupTestDir();
} }
@Test private void waitForIdleConnectionClosed() throws Exception {
public void testGetAndExists() throws Exception {
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren());
assertNotNull(RO_ZK.getZooKeeper());
// The zookeeper client should be closed finally after the keep alive time elapsed // The zookeeper client should be closed finally after the keep alive time elapsed
UTIL.waitFor(10000, new ExplainingPredicate<Exception>() { UTIL.waitFor(10000, new ExplainingPredicate<Exception>() {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
return RO_ZK.getZooKeeper() == null; return RO_ZK.zookeeper == null;
} }
@Override @Override
@ -113,6 +118,14 @@ public class TestReadOnlyZKClient {
}); });
} }
@Test
public void testGetAndExists() throws Exception {
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren());
assertNotNull(RO_ZK.zookeeper);
waitForIdleConnectionClosed();
}
@Test @Test
public void testNoNode() throws InterruptedException, ExecutionException { public void testNoNode() throws InterruptedException, ExecutionException {
String pathNotExists = PATH + "_whatever"; String pathNotExists = PATH + "_whatever";
@ -132,15 +145,39 @@ public class TestReadOnlyZKClient {
@Test @Test
public void testSessionExpire() throws Exception { public void testSessionExpire() throws Exception {
assertArrayEquals(DATA, RO_ZK.get(PATH).get()); assertArrayEquals(DATA, RO_ZK.get(PATH).get());
ZooKeeper zk = RO_ZK.getZooKeeper(); ZooKeeper zk = RO_ZK.zookeeper;
long sessionId = zk.getSessionId(); long sessionId = zk.getSessionId();
UTIL.getZkCluster().getZooKeeperServers().get(0).closeSession(sessionId); UTIL.getZkCluster().getZooKeeperServers().get(0).closeSession(sessionId);
// should not reach keep alive so still the same instance // should not reach keep alive so still the same instance
assertSame(zk, RO_ZK.getZooKeeper()); assertSame(zk, RO_ZK.zookeeper);
byte [] got = RO_ZK.get(PATH).get(); byte[] got = RO_ZK.get(PATH).get();
assertArrayEquals(DATA, got); assertArrayEquals(DATA, got);
assertNotNull(RO_ZK.getZooKeeper()); assertNotNull(RO_ZK.zookeeper);
assertNotSame(zk, RO_ZK.getZooKeeper()); assertNotSame(zk, RO_ZK.zookeeper);
assertNotEquals(sessionId, RO_ZK.getZooKeeper().getSessionId()); assertNotEquals(sessionId, RO_ZK.zookeeper.getSessionId());
}
@Test
public void testNotCloseZkWhenPending() throws Exception {
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
ZooKeeper mockedZK = spy(RO_ZK.zookeeper);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
latch.await();
return invocation.callRealMethod();
}
}).when(mockedZK).exists(anyString(), anyBoolean(), any(StatCallback.class), any());
RO_ZK.zookeeper = mockedZK;
CompletableFuture<Stat> future = RO_ZK.exists(PATH);
// 2 * keep alive time to ensure that we will not close the zk when there are pending requests
Thread.sleep(6000);
assertNotNull(RO_ZK.zookeeper);
latch.countDown();
assertEquals(CHILDREN, future.get().getNumChildren());
// now we will close the idle connection.
waitForIdleConnectionClosed();
} }
} }