diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index d5fe7dff6f0..1723398dcdf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -94,6 +94,8 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Supplier; +import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; @@ -169,6 +171,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Updat @InterfaceAudience.Private public class ConnectionImplementation implements ClusterConnection, Closeable { public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; + + public static final String MASTER_STATE_CACHE_TIMEOUT_SEC = + "hbase.client.master.state.cache.timeout.sec"; private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class); // The mode tells if HedgedRead, LoadBalance mode is supported. @@ -251,6 +256,12 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { /** lock guards against multiple threads trying to query the meta region at the same time */ private final ReentrantLock userRegionLock = new ReentrantLock(); + /** + * Supplier to get masterState.By default uses simple supplier without TTL cache. When + * hbase.client.master.state.cache.timeout.sec > 0 it uses TTL Cache. + */ + private final Supplier masterStateSupplier; + private ChoreService choreService; /** @@ -385,6 +396,39 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { default: // Doing nothing } + + long masterStateCacheTimeout = conf.getLong(MASTER_STATE_CACHE_TIMEOUT_SEC, 0); + + Supplier masterConnSupplier = masterConnectionStateSupplier(); + if (masterStateCacheTimeout <= 0L) { + this.masterStateSupplier = masterConnSupplier; + } else { + this.masterStateSupplier = Suppliers.memoizeWithExpiration(masterConnSupplier, + masterStateCacheTimeout, TimeUnit.SECONDS); + } + } + + /** + * Visible for tests + */ + Supplier masterConnectionStateSupplier() { + return () -> { + if (this.masterServiceState.getStub() == null) { + return false; + } + try { + LOG.info("Getting master state using rpc call"); + return this.masterServiceState.isMasterRunning(); + } catch (UndeclaredThrowableException e) { + // It's somehow messy, but we can receive exceptions such as + // java.net.ConnectException but they're not declared. So we catch it... + LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable()); + return false; + } catch (IOException se) { + LOG.warn("Checking master connection", se); + return false; + } + }; } private void spawnRenewalChore(final UserGroupInformation user) { @@ -1258,7 +1302,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { * Class to make a MasterServiceStubMaker stub. */ private final class MasterServiceStubMaker { - private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub) throws IOException { try { @@ -1358,6 +1401,13 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { final MasterServiceState masterServiceState = new MasterServiceState(this); + /** + * Visible for tests + */ + MasterServiceState getMasterServiceState() { + return this.masterServiceState; + } + @Override public MasterKeepAliveConnection getMaster() throws IOException { return getKeepAliveMasterService(); @@ -1368,13 +1418,16 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { } private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException { - synchronized (masterLock) { - if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) { - MasterServiceStubMaker stubMaker = new MasterServiceStubMaker(); - this.masterServiceState.stub = stubMaker.makeStub(); + if (!isKeepAliveMasterConnectedAndRunning()) { + synchronized (masterLock) { + if (!isKeepAliveMasterConnectedAndRunning()) { + MasterServiceStubMaker stubMaker = new MasterServiceStubMaker(); + this.masterServiceState.stub = stubMaker.makeStub(); + } + resetMasterServiceState(this.masterServiceState); } - resetMasterServiceState(this.masterServiceState); } + // Ugly delegation just so we can add in a Close method. final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub; return new MasterKeepAliveConnection() { @@ -1947,21 +2000,9 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { } } - private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) { - if (mss.getStub() == null) { - return false; - } - try { - return mss.isMasterRunning(); - } catch (UndeclaredThrowableException e) { - // It's somehow messy, but we can receive exceptions such as - // java.net.ConnectException but they're not declared. So we catch it... - LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable()); - return false; - } catch (IOException se) { - LOG.warn("Checking master connection", se); - return false; - } + private boolean isKeepAliveMasterConnectedAndRunning() { + LOG.info("Getting master connection state from TTL Cache"); + return masterStateSupplier.get(); } void releaseMaster(MasterServiceState mss) { diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java new file mode 100644 index 00000000000..6b721083bbc --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.UndeclaredThrowableException; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@Category({ ClientTests.class, MediumTests.class }) +@RunWith(MockitoJUnitRunner.class) +public class TestConnectionImplementation { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestConnectionImplementation.class); + private static final IntegrationTestingUtility TEST_UTIL = new IntegrationTestingUtility(); + + @BeforeClass + public static void beforeClass() throws Exception { + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testGetMaster_noCachedMasterState() throws IOException, IllegalAccessException { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0L); + ConnectionImplementation conn = + new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent()); + ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn); + conn.getMaster(); // This initializes the stubs but don't call isMasterRunning + conn.getMaster(); // Calls isMasterRunning since stubs are initialized. Invocation 1 + conn.getMaster(); // Calls isMasterRunning since stubs are initialized. Invocation 2 + Mockito.verify(masterServiceState, Mockito.times(2)).isMasterRunning(); + conn.close(); + } + + @Test + public void testGetMaster_masterStateCacheHit() throws IOException, IllegalAccessException { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 15L); + ConnectionImplementation conn = + new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent()); + ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn); + conn.getMaster(); // This initializes the stubs but don't call isMasterRunning + conn.getMaster(); // Uses cached value, don't call isMasterRunning + conn.getMaster(); // Uses cached value, don't call isMasterRunning + Mockito.verify(masterServiceState, Mockito.times(0)).isMasterRunning(); + conn.close(); + } + + @Test + public void testGetMaster_masterStateCacheMiss() + throws IOException, InterruptedException, IllegalAccessException { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 5L); + ConnectionImplementation conn = + new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent()); + ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn); + conn.getMaster(); // This initializes the stubs but don't call isMasterRunning + conn.getMaster(); // Uses cached value, don't call isMasterRunning + conn.getMaster(); // Uses cached value, don't call isMasterRunning + Thread.sleep(10000); + conn.getMaster(); // Calls isMasterRunning after cache expiry + Mockito.verify(masterServiceState, Mockito.times(1)).isMasterRunning(); + conn.close(); + } + + @Test + public void testIsKeepAliveMasterConnectedAndRunning_UndeclaredThrowableException() + throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0); + ConnectionImplementation conn = + new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent()); + conn.getMaster(); // Initializes stubs + + ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn); + Mockito.doThrow(new UndeclaredThrowableException(new Exception("DUMMY EXCEPTION"))) + .when(masterServiceState).isMasterRunning(); + + // Verify that masterState is "false" because of to injected exception + boolean isKeepAliveMasterRunning = + (boolean) getIsKeepAliveMasterConnectedAndRunningMethod().invoke(conn); + Assert.assertFalse(isKeepAliveMasterRunning); + conn.close(); + } + + @Test + public void testIsKeepAliveMasterConnectedAndRunning_IOException() + throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0); + ConnectionImplementation conn = + new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent()); + conn.getMaster(); + + ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn); + Mockito.doThrow(new IOException("DUMMY EXCEPTION")).when(masterServiceState).isMasterRunning(); + + boolean isKeepAliveMasterRunning = + (boolean) getIsKeepAliveMasterConnectedAndRunningMethod().invoke(conn); + + // Verify that masterState is "false" because of to injected exception + Assert.assertFalse(isKeepAliveMasterRunning); + conn.close(); + } + + // Spy the masterServiceState object using reflection + private ConnectionImplementation.MasterServiceState + spyMasterServiceState(ConnectionImplementation conn) throws IllegalAccessException { + ConnectionImplementation.MasterServiceState spiedMasterServiceState = + Mockito.spy(conn.getMasterServiceState()); + FieldUtils.writeDeclaredField(conn, "masterServiceState", spiedMasterServiceState, true); + return spiedMasterServiceState; + } + + // Get isKeepAliveMasterConnectedAndRunning using reflection + private Method getIsKeepAliveMasterConnectedAndRunningMethod() throws NoSuchMethodException { + Method method = + ConnectionImplementation.class.getDeclaredMethod("isKeepAliveMasterConnectedAndRunning"); + method.setAccessible(true); + return method; + } +}