HBASE-19870 Fix the NPE in ReadOnlyZKClient#run

Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
zhangduo 2018-01-29 16:06:05 +08:00 committed by Chia-Ping Tsai
parent cdda0a79ea
commit 0dea3656ab
2 changed files with 30 additions and 26 deletions

View File

@ -311,12 +311,14 @@ public final class ReadOnlyZKClient implements Closeable {
if (task == CLOSE) { if (task == CLOSE) {
break; break;
} }
if (task == null && pendingRequests == 0) { if (task == null) {
LOG.debug( if (pendingRequests == 0) {
"{} to {} no activities for {} ms, close active connection. " + LOG.debug(
"Will reconnect next time when there are new requests", "{} to {} no activities for {} ms, close active connection. " +
getId(), connectString, keepAliveTimeMs); "Will reconnect next time when there are new requests",
closeZk(); getId(), connectString, keepAliveTimeMs);
closeZk();
}
continue; continue;
} }
if (!task.needZk()) { if (!task.needZk()) {

View File

@ -31,11 +31,15 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -44,19 +48,16 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ZKTests; import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@Category({ ZKTests.class, MediumTests.class }) @Category({ ZKTests.class, MediumTests.class })
public class TestReadOnlyZKClient { public class TestReadOnlyZKClient {
@ -159,25 +160,26 @@ public class TestReadOnlyZKClient {
@Test @Test
public void testNotCloseZkWhenPending() throws Exception { public void testNotCloseZkWhenPending() throws Exception {
assertArrayEquals(DATA, RO_ZK.get(PATH).get()); ZooKeeper mockedZK = mock(ZooKeeper.class);
ZooKeeper mockedZK = spy(RO_ZK.zookeeper); Exchanger<AsyncCallback.DataCallback> exchanger = new Exchanger<>();
CountDownLatch latch = new CountDownLatch(1); doAnswer(i -> {
doAnswer(new Answer<Object>() { exchanger.exchange(i.getArgument(2));
return null;
@Override }).when(mockedZK).getData(anyString(), anyBoolean(),
public Object answer(InvocationOnMock invocation) throws Throwable { any(AsyncCallback.DataCallback.class), any());
latch.await(); doAnswer(i -> null).when(mockedZK).close();
return invocation.callRealMethod(); when(mockedZK.getState()).thenReturn(ZooKeeper.States.CONNECTED);
}
}).when(mockedZK).exists(anyString(), anyBoolean(), any(StatCallback.class), any());
RO_ZK.zookeeper = mockedZK; RO_ZK.zookeeper = mockedZK;
CompletableFuture<Stat> future = RO_ZK.exists(PATH); CompletableFuture<byte[]> future = RO_ZK.get(PATH);
AsyncCallback.DataCallback callback = exchanger.exchange(null);
// 2 * keep alive time to ensure that we will not close the zk when there are pending requests // 2 * keep alive time to ensure that we will not close the zk when there are pending requests
Thread.sleep(6000); Thread.sleep(6000);
assertNotNull(RO_ZK.zookeeper); assertNotNull(RO_ZK.zookeeper);
latch.countDown(); verify(mockedZK, never()).close();
assertEquals(CHILDREN, future.get().getNumChildren()); callback.processResult(Code.OK.intValue(), PATH, null, DATA, null);
assertArrayEquals(DATA, future.get());
// now we will close the idle connection. // now we will close the idle connection.
waitForIdleConnectionClosed(); waitForIdleConnectionClosed();
verify(mockedZK, times(1)).close();
} }
} }