From 0dea3656abd7e89a80ec52416ba5a313ab783f13 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 29 Jan 2018 16:06:05 +0800 Subject: [PATCH] HBASE-19870 Fix the NPE in ReadOnlyZKClient#run Signed-off-by: Chia-Ping Tsai --- .../hbase/zookeeper/ReadOnlyZKClient.java | 14 ++++--- .../hbase/zookeeper/TestReadOnlyZKClient.java | 42 ++++++++++--------- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index ad707408618..275fafbebad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -311,12 +311,14 @@ public final class ReadOnlyZKClient implements Closeable { if (task == CLOSE) { break; } - if (task == null && pendingRequests == 0) { - LOG.debug( - "{} to {} no activities for {} ms, close active connection. " + - "Will reconnect next time when there are new requests", - getId(), connectString, keepAliveTimeMs); - closeZk(); + if (task == null) { + if (pendingRequests == 0) { + LOG.debug( + "{} to {} no activities for {} ms, close active connection. " + + "Will reconnect next time when there are new requests", + getId(), connectString, keepAliveTimeMs); + closeZk(); + } continue; } if (!task.needZk()) { diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index 211e7769d79..47f0e0b91f8 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -31,11 +31,15 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; @@ -44,19 +48,16 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ZKTests; -import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; @Category({ ZKTests.class, MediumTests.class }) public class TestReadOnlyZKClient { @@ -159,25 +160,26 @@ public class TestReadOnlyZKClient { @Test public void testNotCloseZkWhenPending() throws Exception { - assertArrayEquals(DATA, RO_ZK.get(PATH).get()); - ZooKeeper mockedZK = spy(RO_ZK.zookeeper); - CountDownLatch latch = new CountDownLatch(1); - doAnswer(new Answer() { - - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - latch.await(); - return invocation.callRealMethod(); - } - }).when(mockedZK).exists(anyString(), anyBoolean(), any(StatCallback.class), any()); + ZooKeeper mockedZK = mock(ZooKeeper.class); + Exchanger exchanger = new Exchanger<>(); + doAnswer(i -> { + exchanger.exchange(i.getArgument(2)); + return null; + }).when(mockedZK).getData(anyString(), anyBoolean(), + any(AsyncCallback.DataCallback.class), any()); + doAnswer(i -> null).when(mockedZK).close(); + when(mockedZK.getState()).thenReturn(ZooKeeper.States.CONNECTED); RO_ZK.zookeeper = mockedZK; - CompletableFuture future = RO_ZK.exists(PATH); + CompletableFuture future = RO_ZK.get(PATH); + AsyncCallback.DataCallback callback = exchanger.exchange(null); // 2 * keep alive time to ensure that we will not close the zk when there are pending requests Thread.sleep(6000); assertNotNull(RO_ZK.zookeeper); - latch.countDown(); - assertEquals(CHILDREN, future.get().getNumChildren()); + verify(mockedZK, never()).close(); + callback.processResult(Code.OK.intValue(), PATH, null, DATA, null); + assertArrayEquals(DATA, future.get()); // now we will close the idle connection. waitForIdleConnectionClosed(); + verify(mockedZK, times(1)).close(); } }