HBASE-27527 Port HBASE-27498 to branch-2 (#4919)
Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
This commit is contained in:
parent
98d66828bd
commit
2e94f6fb50
|
@ -96,6 +96,7 @@ import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
|
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
|
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
|
||||||
|
@ -179,6 +180,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Updat
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ConnectionImplementation implements ClusterConnection, Closeable {
|
public class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
|
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
|
||||||
|
|
||||||
|
public static final String MASTER_STATE_CACHE_TIMEOUT_SEC =
|
||||||
|
"hbase.client.master.state.cache.timeout.sec";
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
|
||||||
|
|
||||||
// The mode tells if HedgedRead, LoadBalance mode is supported.
|
// The mode tells if HedgedRead, LoadBalance mode is supported.
|
||||||
|
@ -261,6 +265,12 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
/** lock guards against multiple threads trying to query the meta region at the same time */
|
/** lock guards against multiple threads trying to query the meta region at the same time */
|
||||||
private final ReentrantLock userRegionLock = new ReentrantLock();
|
private final ReentrantLock userRegionLock = new ReentrantLock();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Supplier to get masterState.By default uses simple supplier without TTL cache. When
|
||||||
|
* hbase.client.master.state.cache.timeout.sec > 0 it uses TTL Cache.
|
||||||
|
*/
|
||||||
|
private final Supplier<Boolean> masterStateSupplier;
|
||||||
|
|
||||||
private ChoreService choreService;
|
private ChoreService choreService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -395,6 +405,39 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
default:
|
default:
|
||||||
// Doing nothing
|
// Doing nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long masterStateCacheTimeout = conf.getLong(MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
|
||||||
|
|
||||||
|
Supplier<Boolean> masterConnSupplier = masterConnectionStateSupplier();
|
||||||
|
if (masterStateCacheTimeout <= 0L) {
|
||||||
|
this.masterStateSupplier = masterConnSupplier;
|
||||||
|
} else {
|
||||||
|
this.masterStateSupplier = Suppliers.memoizeWithExpiration(masterConnSupplier::get,
|
||||||
|
masterStateCacheTimeout, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Visible for tests
|
||||||
|
*/
|
||||||
|
Supplier<Boolean> masterConnectionStateSupplier() {
|
||||||
|
return () -> {
|
||||||
|
if (this.masterServiceState.getStub() == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
LOG.info("Getting master state using rpc call");
|
||||||
|
return this.masterServiceState.isMasterRunning();
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
// It's somehow messy, but we can receive exceptions such as
|
||||||
|
// java.net.ConnectException but they're not declared. So we catch it...
|
||||||
|
LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
|
||||||
|
return false;
|
||||||
|
} catch (IOException se) {
|
||||||
|
LOG.warn("Checking master connection", se);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private void spawnRenewalChore(final UserGroupInformation user) {
|
private void spawnRenewalChore(final UserGroupInformation user) {
|
||||||
|
@ -1268,7 +1311,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
* Class to make a MasterServiceStubMaker stub.
|
* Class to make a MasterServiceStubMaker stub.
|
||||||
*/
|
*/
|
||||||
private final class MasterServiceStubMaker {
|
private final class MasterServiceStubMaker {
|
||||||
|
|
||||||
private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
|
private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
|
@ -1368,6 +1410,13 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
|
|
||||||
final MasterServiceState masterServiceState = new MasterServiceState(this);
|
final MasterServiceState masterServiceState = new MasterServiceState(this);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Visible for tests
|
||||||
|
*/
|
||||||
|
MasterServiceState getMasterServiceState() {
|
||||||
|
return this.masterServiceState;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MasterKeepAliveConnection getMaster() throws IOException {
|
public MasterKeepAliveConnection getMaster() throws IOException {
|
||||||
return getKeepAliveMasterService();
|
return getKeepAliveMasterService();
|
||||||
|
@ -1378,13 +1427,16 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
|
private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
|
||||||
synchronized (masterLock) {
|
if (!isKeepAliveMasterConnectedAndRunning()) {
|
||||||
if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
|
synchronized (masterLock) {
|
||||||
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
|
if (!isKeepAliveMasterConnectedAndRunning()) {
|
||||||
this.masterServiceState.stub = stubMaker.makeStub();
|
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
|
||||||
|
this.masterServiceState.stub = stubMaker.makeStub();
|
||||||
|
}
|
||||||
|
resetMasterServiceState(this.masterServiceState);
|
||||||
}
|
}
|
||||||
resetMasterServiceState(this.masterServiceState);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ugly delegation just so we can add in a Close method.
|
// Ugly delegation just so we can add in a Close method.
|
||||||
final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
|
final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
|
||||||
return new MasterKeepAliveConnection() {
|
return new MasterKeepAliveConnection() {
|
||||||
|
@ -1977,21 +2029,9 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
|
private boolean isKeepAliveMasterConnectedAndRunning() {
|
||||||
if (mss.getStub() == null) {
|
LOG.info("Getting master connection state from TTL Cache");
|
||||||
return false;
|
return masterStateSupplier.get();
|
||||||
}
|
|
||||||
try {
|
|
||||||
return mss.isMasterRunning();
|
|
||||||
} catch (UndeclaredThrowableException e) {
|
|
||||||
// It's somehow messy, but we can receive exceptions such as
|
|
||||||
// java.net.ConnectException but they're not declared. So we catch it...
|
|
||||||
LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
|
|
||||||
return false;
|
|
||||||
} catch (IOException se) {
|
|
||||||
LOG.warn("Checking master connection", se);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void releaseMaster(MasterServiceState mss) {
|
void releaseMaster(MasterServiceState mss) {
|
||||||
|
|
|
@ -0,0 +1,160 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
|
import org.apache.commons.lang3.reflect.FieldUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.IntegrationTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.junit.MockitoJUnitRunner;
|
||||||
|
|
||||||
|
@Category({ ClientTests.class, MediumTests.class })
|
||||||
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
public class TestConnectionImplementation {
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestConnectionImplementation.class);
|
||||||
|
private static final IntegrationTestingUtility TEST_UTIL = new IntegrationTestingUtility();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void beforeClass() throws Exception {
|
||||||
|
TEST_UTIL.startMiniCluster(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void afterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetMaster_noCachedMasterState() throws IOException, IllegalAccessException {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0L);
|
||||||
|
ConnectionImplementation conn =
|
||||||
|
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
|
||||||
|
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
|
||||||
|
conn.getMaster(); // This initializes the stubs but don't call isMasterRunning
|
||||||
|
conn.getMaster(); // Calls isMasterRunning since stubs are initialized. Invocation 1
|
||||||
|
conn.getMaster(); // Calls isMasterRunning since stubs are initialized. Invocation 2
|
||||||
|
Mockito.verify(masterServiceState, Mockito.times(2)).isMasterRunning();
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetMaster_masterStateCacheHit() throws IOException, IllegalAccessException {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 15L);
|
||||||
|
ConnectionImplementation conn =
|
||||||
|
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
|
||||||
|
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
|
||||||
|
conn.getMaster(); // This initializes the stubs but don't call isMasterRunning
|
||||||
|
conn.getMaster(); // Uses cached value, don't call isMasterRunning
|
||||||
|
conn.getMaster(); // Uses cached value, don't call isMasterRunning
|
||||||
|
Mockito.verify(masterServiceState, Mockito.times(0)).isMasterRunning();
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetMaster_masterStateCacheMiss()
|
||||||
|
throws IOException, InterruptedException, IllegalAccessException {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 5L);
|
||||||
|
ConnectionImplementation conn =
|
||||||
|
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
|
||||||
|
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
|
||||||
|
conn.getMaster(); // This initializes the stubs but don't call isMasterRunning
|
||||||
|
conn.getMaster(); // Uses cached value, don't call isMasterRunning
|
||||||
|
conn.getMaster(); // Uses cached value, don't call isMasterRunning
|
||||||
|
Thread.sleep(10000);
|
||||||
|
conn.getMaster(); // Calls isMasterRunning after cache expiry. Invocation 1
|
||||||
|
Mockito.verify(masterServiceState, Mockito.times(1)).isMasterRunning();
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIsKeepAliveMasterConnectedAndRunning_UndeclaredThrowableException()
|
||||||
|
throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
|
||||||
|
ConnectionImplementation conn =
|
||||||
|
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
|
||||||
|
conn.getMaster(); // Initializes stubs
|
||||||
|
|
||||||
|
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
|
||||||
|
Mockito.doThrow(new UndeclaredThrowableException(new Exception("DUMMY EXCEPTION")))
|
||||||
|
.when(masterServiceState).isMasterRunning();
|
||||||
|
|
||||||
|
// Verify that masterState is "false" because of to injected exception
|
||||||
|
boolean isKeepAliveMasterRunning =
|
||||||
|
(boolean) getIsKeepAliveMasterConnectedAndRunningMethod().invoke(conn);
|
||||||
|
Assert.assertFalse(isKeepAliveMasterRunning);
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIsKeepAliveMasterConnectedAndRunning_IOException()
|
||||||
|
throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
|
||||||
|
ConnectionImplementation conn =
|
||||||
|
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
|
||||||
|
conn.getMaster();
|
||||||
|
|
||||||
|
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
|
||||||
|
Mockito.doThrow(new IOException("DUMMY EXCEPTION")).when(masterServiceState).isMasterRunning();
|
||||||
|
|
||||||
|
boolean isKeepAliveMasterRunning =
|
||||||
|
(boolean) getIsKeepAliveMasterConnectedAndRunningMethod().invoke(conn);
|
||||||
|
|
||||||
|
// Verify that masterState is "false" because of to injected exception
|
||||||
|
Assert.assertFalse(isKeepAliveMasterRunning);
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spy the masterServiceState object using reflection
|
||||||
|
private ConnectionImplementation.MasterServiceState
|
||||||
|
spyMasterServiceState(ConnectionImplementation conn) throws IllegalAccessException {
|
||||||
|
ConnectionImplementation.MasterServiceState spiedMasterServiceState =
|
||||||
|
Mockito.spy(conn.getMasterServiceState());
|
||||||
|
FieldUtils.writeDeclaredField(conn, "masterServiceState", spiedMasterServiceState, true);
|
||||||
|
return spiedMasterServiceState;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get isKeepAliveMasterConnectedAndRunning using reflection
|
||||||
|
private Method getIsKeepAliveMasterConnectedAndRunningMethod() throws NoSuchMethodException {
|
||||||
|
Method method =
|
||||||
|
ConnectionImplementation.class.getDeclaredMethod("isKeepAliveMasterConnectedAndRunning");
|
||||||
|
method.setAccessible(true);
|
||||||
|
return method;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue