diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java index 5fdbc551960..60137d23fff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java @@ -90,8 +90,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry private final RegistryEndpointsRefresher registryEndpointRefresher; protected AbstractRpcBasedConnectionRegistry(Configuration conf, - String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName, - String minRefreshIntervalSecsConfigName) throws IOException { + String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName, + String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName) + throws IOException { this.hedgedReadFanOut = Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT)); rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, @@ -103,8 +104,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry rpcControllerFactory = RpcControllerFactory.instantiate(conf); populateStubs(getBootstrapNodes(conf)); // could return null here is refresh interval is less than zero - registryEndpointRefresher = RegistryEndpointsRefresher.create(conf, - refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs); + registryEndpointRefresher = + RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName, + refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs); } protected abstract Set getBootstrapNodes(Configuration conf) throws IOException; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java index af6eaa5e387..64e389cf35e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java @@ -59,6 +59,9 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry { public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY = "hbase.client.master_registry.hedged.fanout"; + public static final String MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS = + "hbase.client.master_registry.initial_refresh_delay_secs"; + public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS = "hbase.client.master_registry.refresh_interval_secs"; @@ -85,7 +88,7 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry { } MasterRegistry(Configuration conf) throws IOException { - super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, + super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS, MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java index 9b450f96e9c..2064021f714 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java @@ -45,6 +45,7 @@ final class RegistryEndpointsRefresher { private final Thread thread; private final Refresher refresher; + private final long initialDelayMs; private final long periodicRefreshMs; private final long minTimeBetweenRefreshesMs; @@ -56,9 +57,20 @@ final class RegistryEndpointsRefresher { notifyAll(); } + private long getRefreshIntervalMs(boolean firstRefresh) { + if (refreshNow) { + return minTimeBetweenRefreshesMs; + } + if (firstRefresh) { + return initialDelayMs; + } + return periodicRefreshMs; + } + // The main loop for the refresh thread. private void mainLoop() { long lastRefreshTime = EnvironmentEdgeManager.currentTime(); + boolean firstRefresh = true; for (;;) { synchronized (this) { for (;;) { @@ -68,9 +80,12 @@ final class RegistryEndpointsRefresher { } // if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed, // otherwise wait until periodicRefreshMs elapsed - long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) - + long waitTime = getRefreshIntervalMs(firstRefresh) - (EnvironmentEdgeManager.currentTime() - lastRefreshTime); if (waitTime <= 0) { + // we are going to refresh, reset this flag + firstRefresh = false; + refreshNow = false; break; } try { @@ -81,8 +96,6 @@ final class RegistryEndpointsRefresher { continue; } } - // we are going to refresh, reset this flag - refreshNow = false; } LOG.debug("Attempting to refresh registry end points"); try { @@ -104,8 +117,9 @@ final class RegistryEndpointsRefresher { void refresh() throws IOException; } - private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs, - Refresher refresher) { + private RegistryEndpointsRefresher(long initialDelayMs, long periodicRefreshMs, + long minTimeBetweenRefreshesMs, Refresher refresher) { + this.initialDelayMs = initialDelayMs; this.periodicRefreshMs = periodicRefreshMs; this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs; this.refresher = refresher; @@ -129,16 +143,19 @@ final class RegistryEndpointsRefresher { * {@code intervalSecsConfigName} is less than zero, will return null here, which means disable * refreshing of endpoints. */ - static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName, - String minIntervalSecsConfigName, Refresher refresher) { + static RegistryEndpointsRefresher create(Configuration conf, String initialDelaySecsConfigName, + String intervalSecsConfigName, String minIntervalSecsConfigName, Refresher refresher) { long periodicRefreshMs = TimeUnit.SECONDS .toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT)); if (periodicRefreshMs <= 0) { return null; } + long initialDelayMs = Math.max(1, + TimeUnit.SECONDS.toMillis(conf.getLong(initialDelaySecsConfigName, periodicRefreshMs / 10))); long minTimeBetweenRefreshesMs = TimeUnit.SECONDS .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT)); Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs); - return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher); + return new RegistryEndpointsRefresher(initialDelayMs, periodicRefreshMs, + minTimeBetweenRefreshesMs, refresher); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java index bcd37b182b6..731d6202b3e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java @@ -51,6 +51,17 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry { /** Configuration key that controls the fan out of requests **/ public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout"; + /** + * As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is + * possible that different end users will configure the same machine which makes the machine over + * load. So we should have a shorter delay for the initial refresh, to let users quickly switch to + * the bootstrap nodes we want them to connect to. + *

+ * The default value for initial refresh delay is 1/10 of periodic refresh interval. + */ + public static final String INITIAL_REFRESH_DELAY_SECS = + "hbase.client.bootstrap.initial_refresh_delay_secs"; + public static final String PERIODIC_REFRESH_INTERVAL_SECS = "hbase.client.bootstrap.refresh_interval_secs"; @@ -62,7 +73,8 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry { private static final char ADDRS_CONF_SEPARATOR = ','; RpcConnectionRegistry(Configuration conf) throws IOException { - super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES); + super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS, + MIN_SECS_BETWEEN_REFRESHES); } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java index f8893c99499..3d6fe1563b8 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.io.IOException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -46,6 +46,8 @@ public class TestRegistryEndpointsRefresher { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class); + private static final String INITIAL_DELAY_SECS_CONFIG_NAME = + "hbase.test.registry.initial.delay.secs"; private static final String INTERVAL_SECS_CONFIG_NAME = "hbase.test.registry.refresh.interval.secs"; private static final String MIN_INTERVAL_SECS_CONFIG_NAME = @@ -75,33 +77,45 @@ public class TestRegistryEndpointsRefresher { callTimestamps.add(EnvironmentEdgeManager.currentTime()); } - private void createRefresher(long intervalSecs, long minIntervalSecs) { + private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) { + conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs); conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs); conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs); - refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME, - MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh); + refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME, + INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh); } @Test public void testDisableRefresh() { conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1); assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME, - MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh)); + INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh)); } @Test - public void testPeriodicEndpointRefresh() throws IOException { + public void testInitialDelay() throws InterruptedException { + createRefresher(1, 10, 0); + // Wait for 2 seconds to see that at least 1 refresh have been made since the initial delay is 1 + // seconds + Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1); + // Sleep more 5 seconds to make sure we have not made new calls since the interval is 10 seconds + Thread.sleep(5000); + assertEquals(1, refreshCallCounter.get()); + } + + @Test + public void testPeriodicMasterEndPointRefresh() { // Refresh every 1 second. - createRefresher(1, 0); + createRefresher(1, 1, 0); // Wait for > 3 seconds to see that at least 3 refresh have been made. Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3); } @Test - public void testDurationBetweenRefreshes() throws IOException { + public void testDurationBetweenRefreshes() { // Disable periodic refresh // A minimum duration of 1s between refreshes - createRefresher(Integer.MAX_VALUE, 1); + createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1); // Issue a ton of manual refreshes. for (int i = 0; i < 10000; i++) { refresher.refreshNow(); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java index 66eee821c71..2979743f479 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java @@ -69,6 +69,8 @@ public class TestRpcBasedRegistryHedgedReads { private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class); private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout"; + private static final String INITIAL_DELAY_SECS_CONFIG_NAME = + "hbase.test.refresh.initial.delay.secs"; private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME = "hbase.test.refresh.interval.secs"; private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME = @@ -153,7 +155,8 @@ public class TestRpcBasedRegistryHedgedReads { Configuration conf = UTIL.getConfiguration(); conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged); return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME, - REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) { + INITIAL_DELAY_SECS_CONFIG_NAME, REFRESH_INTERVAL_SECS_CONFIG_NAME, + MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) { @Override protected Set getBootstrapNodes(Configuration conf) throws IOException { @@ -173,6 +176,7 @@ public class TestRpcBasedRegistryHedgedReads { conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class, RpcClient.class); // disable refresh, we do not need to refresh in this test + conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE); conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE); conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1); BOOTSTRAP_NODES = IntStream.range(0, 10) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java index 69c08b627a2..4cbabb1fabb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java @@ -58,6 +58,7 @@ public class TestRpcConnectionRegistry { @BeforeClass public static void setUpBeforeClass() throws Exception { // allow refresh immediately so we will switch to use region servers soon. + UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1); UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1); UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0); UTIL.startMiniCluster(3); @@ -91,8 +92,8 @@ public class TestRpcConnectionRegistry { @Test public void testRegistryRPCs() throws Exception { HMaster activeMaster = UTIL.getHBaseCluster().getMaster(); - // wait until we switch to use region servers - UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 3); + // sleep 3 seconds, since our initial delay is 1 second, we should have refreshed the endpoints + Thread.sleep(3000); assertThat(registry.getParsedServers(), hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));