HBASE-19870 Fix the NPE in ReadOnlyZKClient#run
Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
f9480a56c7
commit
221eb95768
|
@ -311,12 +311,14 @@ public final class ReadOnlyZKClient implements Closeable {
|
|||
if (task == CLOSE) {
|
||||
break;
|
||||
}
|
||||
if (task == null && pendingRequests == 0) {
|
||||
LOG.debug(
|
||||
"{} to {} no activities for {} ms, close active connection. " +
|
||||
"Will reconnect next time when there are new requests",
|
||||
getId(), connectString, keepAliveTimeMs);
|
||||
closeZk();
|
||||
if (task == null) {
|
||||
if (pendingRequests == 0) {
|
||||
LOG.debug(
|
||||
"{} to {} no activities for {} ms, close active connection. " +
|
||||
"Will reconnect next time when there are new requests",
|
||||
getId(), connectString, keepAliveTimeMs);
|
||||
closeZk();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (!task.needZk()) {
|
||||
|
|
|
@ -31,11 +31,15 @@ import static org.mockito.ArgumentMatchers.any;
|
|||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Exchanger;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -45,20 +49,17 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ZKTests;
|
||||
import org.apache.zookeeper.AsyncCallback.StatCallback;
|
||||
import org.apache.zookeeper.AsyncCallback;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.Code;
|
||||
import org.apache.zookeeper.ZooDefs;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
@Category({ ZKTests.class, MediumTests.class })
|
||||
public class TestReadOnlyZKClient {
|
||||
|
@ -165,25 +166,26 @@ public class TestReadOnlyZKClient {
|
|||
|
||||
@Test
|
||||
public void testNotCloseZkWhenPending() throws Exception {
|
||||
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
|
||||
ZooKeeper mockedZK = spy(RO_ZK.zookeeper);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
doAnswer(new Answer<Object>() {
|
||||
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
latch.await();
|
||||
return invocation.callRealMethod();
|
||||
}
|
||||
}).when(mockedZK).exists(anyString(), anyBoolean(), any(StatCallback.class), any());
|
||||
ZooKeeper mockedZK = mock(ZooKeeper.class);
|
||||
Exchanger<AsyncCallback.DataCallback> exchanger = new Exchanger<>();
|
||||
doAnswer(i -> {
|
||||
exchanger.exchange(i.getArgument(2));
|
||||
return null;
|
||||
}).when(mockedZK).getData(anyString(), anyBoolean(),
|
||||
any(AsyncCallback.DataCallback.class), any());
|
||||
doAnswer(i -> null).when(mockedZK).close();
|
||||
when(mockedZK.getState()).thenReturn(ZooKeeper.States.CONNECTED);
|
||||
RO_ZK.zookeeper = mockedZK;
|
||||
CompletableFuture<Stat> future = RO_ZK.exists(PATH);
|
||||
CompletableFuture<byte[]> future = RO_ZK.get(PATH);
|
||||
AsyncCallback.DataCallback callback = exchanger.exchange(null);
|
||||
// 2 * keep alive time to ensure that we will not close the zk when there are pending requests
|
||||
Thread.sleep(6000);
|
||||
assertNotNull(RO_ZK.zookeeper);
|
||||
latch.countDown();
|
||||
assertEquals(CHILDREN, future.get().getNumChildren());
|
||||
verify(mockedZK, never()).close();
|
||||
callback.processResult(Code.OK.intValue(), PATH, null, DATA, null);
|
||||
assertArrayEquals(DATA, future.get());
|
||||
// now we will close the idle connection.
|
||||
waitForIdleConnectionClosed();
|
||||
verify(mockedZK, times(1)).close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue