HBASE-26182 Allow disabling refresh of connection registry endpoint (#3605)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Duo Zhang 2021-08-24 14:04:05 +08:00 committed by Duo Zhang
parent 2728821e62
commit d8fb978bef
3 changed files with 42 additions and 24 deletions

View File

@ -100,9 +100,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
rpcClient = RpcClientFactory.createClient(conf, null); rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf); rpcControllerFactory = RpcControllerFactory.instantiate(conf);
populateStubs(getBootstrapNodes(conf)); populateStubs(getBootstrapNodes(conf));
registryEndpointRefresher = new RegistryEndpointsRefresher(conf, refreshIntervalSecsConfigName, // could return null here is refresh interval is less than zero
minRefreshIntervalSecsConfigName, this::refreshStubs); registryEndpointRefresher = RegistryEndpointsRefresher.create(conf,
registryEndpointRefresher.start(); refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
} }
protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException; protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;

View File

@ -35,7 +35,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
* {@code minIntervalSecsConfigName} seconds apart. * {@code minIntervalSecsConfigName} seconds apart.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class RegistryEndpointsRefresher { final class RegistryEndpointsRefresher {
private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class); private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);
@ -51,11 +51,7 @@ class RegistryEndpointsRefresher {
private boolean refreshNow = false; private boolean refreshNow = false;
private boolean stopped = false; private boolean stopped = false;
public void start() { synchronized void stop() {
thread.start();
}
public synchronized void stop() {
stopped = true; stopped = true;
notifyAll(); notifyAll();
} }
@ -108,18 +104,15 @@ class RegistryEndpointsRefresher {
void refresh() throws IOException; void refresh() throws IOException;
} }
RegistryEndpointsRefresher(Configuration conf, String intervalSecsConfigName, private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs,
String minIntervalSecsConfigName, Refresher refresher) { Refresher refresher) {
periodicRefreshMs = TimeUnit.SECONDS this.periodicRefreshMs = periodicRefreshMs;
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT)); this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
minTimeBetweenRefreshesMs = TimeUnit.SECONDS this.refresher = refresher;
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
Preconditions.checkArgument(periodicRefreshMs > 0);
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
thread = new Thread(this::mainLoop); thread = new Thread(this::mainLoop);
thread.setName("Registry-endpoints-refresh-end-points"); thread.setName("Registry-endpoints-refresh-end-points");
thread.setDaemon(true); thread.setDaemon(true);
this.refresher = refresher; thread.start();
} }
/** /**
@ -130,4 +123,22 @@ class RegistryEndpointsRefresher {
refreshNow = true; refreshNow = true;
notifyAll(); notifyAll();
} }
/**
* Create a {@link RegistryEndpointsRefresher}. If the interval secs configured via
* {@code intervalSecsConfigName} is less than zero, will return null here, which means disable
* refreshing of endpoints.
*/
static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName,
String minIntervalSecsConfigName, Refresher refresher) {
long periodicRefreshMs = TimeUnit.SECONDS
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
if (periodicRefreshMs <= 0) {
return null;
}
long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher);
}
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
@ -74,18 +75,24 @@ public class TestRegistryEndpointsRefresher {
callTimestamps.add(EnvironmentEdgeManager.currentTime()); callTimestamps.add(EnvironmentEdgeManager.currentTime());
} }
private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) { private void createRefresher(long intervalSecs, long minIntervalSecs) {
conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs); conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs); conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
refresher = new RegistryEndpointsRefresher(conf, INTERVAL_SECS_CONFIG_NAME, refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh); MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
refresher.start();
} }
@Test @Test
public void testPeriodicMasterEndPointRefresh() throws IOException { public void testDisableRefresh() {
conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
}
@Test
public void testPeriodicEndpointRefresh() throws IOException {
// Refresh every 1 second. // Refresh every 1 second.
createAndStartRefresher(1, 0); createRefresher(1, 0);
// Wait for > 3 seconds to see that at least 3 refresh have been made. // Wait for > 3 seconds to see that at least 3 refresh have been made.
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3); Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
} }
@ -94,7 +101,7 @@ public class TestRegistryEndpointsRefresher {
public void testDurationBetweenRefreshes() throws IOException { public void testDurationBetweenRefreshes() throws IOException {
// Disable periodic refresh // Disable periodic refresh
// A minimum duration of 1s between refreshes // A minimum duration of 1s between refreshes
createAndStartRefresher(Integer.MAX_VALUE, 1); createRefresher(Integer.MAX_VALUE, 1);
// Issue a ton of manual refreshes. // Issue a ton of manual refreshes.
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
refresher.refreshNow(); refresher.refreshNow();