From d8fb978befb2e64fea041f6131929a82f4b07562 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Tue, 24 Aug 2021 14:04:05 +0800 Subject: [PATCH] HBASE-26182 Allow disabling refresh of connection registry endpoint (#3605) Signed-off-by: Bharath Vissapragada --- .../AbstractRpcBasedConnectionRegistry.java | 6 +-- .../client/RegistryEndpointsRefresher.java | 41 ++++++++++++------- .../TestRegistryEndpointsRefresher.java | 19 ++++++--- 3 files changed, 42 insertions(+), 24 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java index 6a2919e8fbf..002137c69ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java @@ -100,9 +100,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry rpcClient = RpcClientFactory.createClient(conf, null); rpcControllerFactory = RpcControllerFactory.instantiate(conf); populateStubs(getBootstrapNodes(conf)); - registryEndpointRefresher = new RegistryEndpointsRefresher(conf, refreshIntervalSecsConfigName, - minRefreshIntervalSecsConfigName, this::refreshStubs); - registryEndpointRefresher.start(); + // could return null here is refresh interval is less than zero + registryEndpointRefresher = RegistryEndpointsRefresher.create(conf, + refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs); } protected abstract Set getBootstrapNodes(Configuration conf) throws IOException; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java index 6599b620eb2..9b450f96e9c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java @@ -35,7 +35,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; * {@code minIntervalSecsConfigName} seconds apart. */ @InterfaceAudience.Private -class RegistryEndpointsRefresher { +final class RegistryEndpointsRefresher { private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class); @@ -51,11 +51,7 @@ class RegistryEndpointsRefresher { private boolean refreshNow = false; private boolean stopped = false; - public void start() { - thread.start(); - } - - public synchronized void stop() { + synchronized void stop() { stopped = true; notifyAll(); } @@ -108,18 +104,15 @@ class RegistryEndpointsRefresher { void refresh() throws IOException; } - RegistryEndpointsRefresher(Configuration conf, String intervalSecsConfigName, - String minIntervalSecsConfigName, Refresher refresher) { - periodicRefreshMs = TimeUnit.SECONDS - .toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT)); - minTimeBetweenRefreshesMs = TimeUnit.SECONDS - .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT)); - Preconditions.checkArgument(periodicRefreshMs > 0); - Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs); + private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs, + Refresher refresher) { + this.periodicRefreshMs = periodicRefreshMs; + this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs; + this.refresher = refresher; thread = new Thread(this::mainLoop); thread.setName("Registry-endpoints-refresh-end-points"); thread.setDaemon(true); - this.refresher = refresher; + thread.start(); } /** @@ -130,4 +123,22 @@ class RegistryEndpointsRefresher { refreshNow = true; notifyAll(); } + + /** + * Create a {@link RegistryEndpointsRefresher}. If the interval secs configured via + * {@code intervalSecsConfigName} is less than zero, will return null here, which means disable + * refreshing of endpoints. + */ + static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName, + String minIntervalSecsConfigName, Refresher refresher) { + long periodicRefreshMs = TimeUnit.SECONDS + .toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT)); + if (periodicRefreshMs <= 0) { + return null; + } + long minTimeBetweenRefreshesMs = TimeUnit.SECONDS + .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT)); + Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs); + return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher); + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java index 748ad41952a..f8893c99499 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -74,18 +75,24 @@ public class TestRegistryEndpointsRefresher { callTimestamps.add(EnvironmentEdgeManager.currentTime()); } - private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) { + private void createRefresher(long intervalSecs, long minIntervalSecs) { conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs); conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs); - refresher = new RegistryEndpointsRefresher(conf, INTERVAL_SECS_CONFIG_NAME, + refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh); - refresher.start(); } @Test - public void testPeriodicMasterEndPointRefresh() throws IOException { + public void testDisableRefresh() { + conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1); + assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME, + MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh)); + } + + @Test + public void testPeriodicEndpointRefresh() throws IOException { // Refresh every 1 second. - createAndStartRefresher(1, 0); + createRefresher(1, 0); // Wait for > 3 seconds to see that at least 3 refresh have been made. Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3); } @@ -94,7 +101,7 @@ public class TestRegistryEndpointsRefresher { public void testDurationBetweenRefreshes() throws IOException { // Disable periodic refresh // A minimum duration of 1s between refreshes - createAndStartRefresher(Integer.MAX_VALUE, 1); + createRefresher(Integer.MAX_VALUE, 1); // Issue a ton of manual refreshes. for (int i = 0; i < 10000; i++) { refresher.refreshNow();