HBASE-26180 Introduce a initial refresh interval for RpcConnectionRegistry (#3601)

Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
Duo Zhang 2021-08-25 18:42:04 +08:00 committed by GitHub
parent 1fb78a3302
commit 91db10a8bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 79 additions and 26 deletions

View File

@ -90,8 +90,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
private final RegistryEndpointsRefresher registryEndpointRefresher; private final RegistryEndpointsRefresher registryEndpointRefresher;
protected AbstractRpcBasedConnectionRegistry(Configuration conf, protected AbstractRpcBasedConnectionRegistry(Configuration conf,
String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName, String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
String minRefreshIntervalSecsConfigName) throws IOException { String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
throws IOException {
this.hedgedReadFanOut = this.hedgedReadFanOut =
Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT)); Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
@ -103,8 +104,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
rpcControllerFactory = RpcControllerFactory.instantiate(conf); rpcControllerFactory = RpcControllerFactory.instantiate(conf);
populateStubs(getBootstrapNodes(conf)); populateStubs(getBootstrapNodes(conf));
// could return null here is refresh interval is less than zero // could return null here is refresh interval is less than zero
registryEndpointRefresher = RegistryEndpointsRefresher.create(conf, registryEndpointRefresher =
refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs); RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
} }
protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException; protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;

View File

@ -59,6 +59,9 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY = public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
"hbase.client.master_registry.hedged.fanout"; "hbase.client.master_registry.hedged.fanout";
public static final String MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS =
"hbase.client.master_registry.initial_refresh_delay_secs";
public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS = public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.master_registry.refresh_interval_secs"; "hbase.client.master_registry.refresh_interval_secs";
@ -85,7 +88,7 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
} }
MasterRegistry(Configuration conf) throws IOException { MasterRegistry(Configuration conf) throws IOException {
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES); MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
} }

View File

@ -45,6 +45,7 @@ final class RegistryEndpointsRefresher {
private final Thread thread; private final Thread thread;
private final Refresher refresher; private final Refresher refresher;
private final long initialDelayMs;
private final long periodicRefreshMs; private final long periodicRefreshMs;
private final long minTimeBetweenRefreshesMs; private final long minTimeBetweenRefreshesMs;
@ -56,9 +57,20 @@ final class RegistryEndpointsRefresher {
notifyAll(); notifyAll();
} }
private long getRefreshIntervalMs(boolean firstRefresh) {
if (refreshNow) {
return minTimeBetweenRefreshesMs;
}
if (firstRefresh) {
return initialDelayMs;
}
return periodicRefreshMs;
}
// The main loop for the refresh thread. // The main loop for the refresh thread.
private void mainLoop() { private void mainLoop() {
long lastRefreshTime = EnvironmentEdgeManager.currentTime(); long lastRefreshTime = EnvironmentEdgeManager.currentTime();
boolean firstRefresh = true;
for (;;) { for (;;) {
synchronized (this) { synchronized (this) {
for (;;) { for (;;) {
@ -68,9 +80,12 @@ final class RegistryEndpointsRefresher {
} }
// if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed, // if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
// otherwise wait until periodicRefreshMs elapsed // otherwise wait until periodicRefreshMs elapsed
long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) - long waitTime = getRefreshIntervalMs(firstRefresh) -
(EnvironmentEdgeManager.currentTime() - lastRefreshTime); (EnvironmentEdgeManager.currentTime() - lastRefreshTime);
if (waitTime <= 0) { if (waitTime <= 0) {
// we are going to refresh, reset this flag
firstRefresh = false;
refreshNow = false;
break; break;
} }
try { try {
@ -81,8 +96,6 @@ final class RegistryEndpointsRefresher {
continue; continue;
} }
} }
// we are going to refresh, reset this flag
refreshNow = false;
} }
LOG.debug("Attempting to refresh registry end points"); LOG.debug("Attempting to refresh registry end points");
try { try {
@ -104,8 +117,9 @@ final class RegistryEndpointsRefresher {
void refresh() throws IOException; void refresh() throws IOException;
} }
private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs, private RegistryEndpointsRefresher(long initialDelayMs, long periodicRefreshMs,
Refresher refresher) { long minTimeBetweenRefreshesMs, Refresher refresher) {
this.initialDelayMs = initialDelayMs;
this.periodicRefreshMs = periodicRefreshMs; this.periodicRefreshMs = periodicRefreshMs;
this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs; this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
this.refresher = refresher; this.refresher = refresher;
@ -129,16 +143,19 @@ final class RegistryEndpointsRefresher {
* {@code intervalSecsConfigName} is less than zero, will return null here, which means disable * {@code intervalSecsConfigName} is less than zero, will return null here, which means disable
* refreshing of endpoints. * refreshing of endpoints.
*/ */
static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName, static RegistryEndpointsRefresher create(Configuration conf, String initialDelaySecsConfigName,
String minIntervalSecsConfigName, Refresher refresher) { String intervalSecsConfigName, String minIntervalSecsConfigName, Refresher refresher) {
long periodicRefreshMs = TimeUnit.SECONDS long periodicRefreshMs = TimeUnit.SECONDS
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT)); .toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
if (periodicRefreshMs <= 0) { if (periodicRefreshMs <= 0) {
return null; return null;
} }
long initialDelayMs = Math.max(1,
TimeUnit.SECONDS.toMillis(conf.getLong(initialDelaySecsConfigName, periodicRefreshMs / 10)));
long minTimeBetweenRefreshesMs = TimeUnit.SECONDS long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT)); .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs); Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher); return new RegistryEndpointsRefresher(initialDelayMs, periodicRefreshMs,
minTimeBetweenRefreshesMs, refresher);
} }
} }

View File

@ -51,6 +51,17 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
/** Configuration key that controls the fan out of requests **/ /** Configuration key that controls the fan out of requests **/
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout"; public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";
/**
* As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is
* possible that different end users will configure the same machine which makes the machine over
* load. So we should have a shorter delay for the initial refresh, to let users quickly switch to
* the bootstrap nodes we want them to connect to.
* <p/>
* The default value for initial refresh delay is 1/10 of periodic refresh interval.
*/
public static final String INITIAL_REFRESH_DELAY_SECS =
"hbase.client.bootstrap.initial_refresh_delay_secs";
public static final String PERIODIC_REFRESH_INTERVAL_SECS = public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.bootstrap.refresh_interval_secs"; "hbase.client.bootstrap.refresh_interval_secs";
@ -62,7 +73,8 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
private static final char ADDRS_CONF_SEPARATOR = ','; private static final char ADDRS_CONF_SEPARATOR = ',';
RpcConnectionRegistry(Configuration conf) throws IOException { RpcConnectionRegistry(Configuration conf) throws IOException {
super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES); super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
MIN_SECS_BETWEEN_REFRESHES);
} }
@Override @Override

View File

@ -17,10 +17,10 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -46,6 +46,8 @@ public class TestRegistryEndpointsRefresher {
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class); HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);
private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
"hbase.test.registry.initial.delay.secs";
private static final String INTERVAL_SECS_CONFIG_NAME = private static final String INTERVAL_SECS_CONFIG_NAME =
"hbase.test.registry.refresh.interval.secs"; "hbase.test.registry.refresh.interval.secs";
private static final String MIN_INTERVAL_SECS_CONFIG_NAME = private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
@ -75,33 +77,45 @@ public class TestRegistryEndpointsRefresher {
callTimestamps.add(EnvironmentEdgeManager.currentTime()); callTimestamps.add(EnvironmentEdgeManager.currentTime());
} }
private void createRefresher(long intervalSecs, long minIntervalSecs) { private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) {
conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs);
conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs); conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs); conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME, refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh); INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
} }
@Test @Test
public void testDisableRefresh() { public void testDisableRefresh() {
conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1); conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME, assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh)); INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
} }
@Test @Test
public void testPeriodicEndpointRefresh() throws IOException { public void testInitialDelay() throws InterruptedException {
createRefresher(1, 10, 0);
// Wait for 2 seconds to see that at least 1 refresh have been made since the initial delay is 1
// seconds
Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1);
// Sleep more 5 seconds to make sure we have not made new calls since the interval is 10 seconds
Thread.sleep(5000);
assertEquals(1, refreshCallCounter.get());
}
@Test
public void testPeriodicMasterEndPointRefresh() {
// Refresh every 1 second. // Refresh every 1 second.
createRefresher(1, 0); createRefresher(1, 1, 0);
// Wait for > 3 seconds to see that at least 3 refresh have been made. // Wait for > 3 seconds to see that at least 3 refresh have been made.
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3); Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
} }
@Test @Test
public void testDurationBetweenRefreshes() throws IOException { public void testDurationBetweenRefreshes() {
// Disable periodic refresh // Disable periodic refresh
// A minimum duration of 1s between refreshes // A minimum duration of 1s between refreshes
createRefresher(Integer.MAX_VALUE, 1); createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1);
// Issue a ton of manual refreshes. // Issue a ton of manual refreshes.
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
refresher.refreshNow(); refresher.refreshNow();

View File

@ -69,6 +69,8 @@ public class TestRpcBasedRegistryHedgedReads {
private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class); private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);
private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout"; private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
"hbase.test.refresh.initial.delay.secs";
private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME = private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
"hbase.test.refresh.interval.secs"; "hbase.test.refresh.interval.secs";
private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME = private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
@ -153,7 +155,8 @@ public class TestRpcBasedRegistryHedgedReads {
Configuration conf = UTIL.getConfiguration(); Configuration conf = UTIL.getConfiguration();
conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged); conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME, return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME,
REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) { INITIAL_DELAY_SECS_CONFIG_NAME, REFRESH_INTERVAL_SECS_CONFIG_NAME,
MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
@Override @Override
protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException { protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
@ -173,6 +176,7 @@ public class TestRpcBasedRegistryHedgedReads {
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class, conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
RpcClient.class); RpcClient.class);
// disable refresh, we do not need to refresh in this test // disable refresh, we do not need to refresh in this test
conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE); conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1); conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
BOOTSTRAP_NODES = IntStream.range(0, 10) BOOTSTRAP_NODES = IntStream.range(0, 10)

View File

@ -58,6 +58,7 @@ public class TestRpcConnectionRegistry {
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
// allow refresh immediately so we will switch to use region servers soon. // allow refresh immediately so we will switch to use region servers soon.
UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1);
UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1); UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0); UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
UTIL.startMiniCluster(3); UTIL.startMiniCluster(3);
@ -91,8 +92,8 @@ public class TestRpcConnectionRegistry {
@Test @Test
public void testRegistryRPCs() throws Exception { public void testRegistryRPCs() throws Exception {
HMaster activeMaster = UTIL.getHBaseCluster().getMaster(); HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
// wait until we switch to use region servers // sleep 3 seconds, since our initial delay is 1 second, we should have refreshed the endpoints
UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 3); Thread.sleep(3000);
assertThat(registry.getParsedServers(), assertThat(registry.getParsedServers(),
hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0]))); hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));