HBASE-26182 Allow disabling refresh of connection registry endpoint (#3605)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
6b5bd75e46
commit
6ed03d98ef
|
@ -102,9 +102,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
|
||||||
rpcClient = RpcClientFactory.createClient(conf, null);
|
rpcClient = RpcClientFactory.createClient(conf, null);
|
||||||
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||||
populateStubs(getBootstrapNodes(conf));
|
populateStubs(getBootstrapNodes(conf));
|
||||||
registryEndpointRefresher = new RegistryEndpointsRefresher(conf, refreshIntervalSecsConfigName,
|
// could return null here is refresh interval is less than zero
|
||||||
minRefreshIntervalSecsConfigName, this::refreshStubs);
|
registryEndpointRefresher = RegistryEndpointsRefresher.create(conf,
|
||||||
registryEndpointRefresher.start();
|
refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
|
protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
|
||||||
|
|
|
@ -35,7 +35,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
* {@code minIntervalSecsConfigName} seconds apart.
|
* {@code minIntervalSecsConfigName} seconds apart.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class RegistryEndpointsRefresher {
|
final class RegistryEndpointsRefresher {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);
|
private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);
|
||||||
|
|
||||||
|
@ -51,11 +51,7 @@ class RegistryEndpointsRefresher {
|
||||||
private boolean refreshNow = false;
|
private boolean refreshNow = false;
|
||||||
private boolean stopped = false;
|
private boolean stopped = false;
|
||||||
|
|
||||||
public void start() {
|
synchronized void stop() {
|
||||||
thread.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void stop() {
|
|
||||||
stopped = true;
|
stopped = true;
|
||||||
notifyAll();
|
notifyAll();
|
||||||
}
|
}
|
||||||
|
@ -108,18 +104,15 @@ class RegistryEndpointsRefresher {
|
||||||
void refresh() throws IOException;
|
void refresh() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
RegistryEndpointsRefresher(Configuration conf, String intervalSecsConfigName,
|
private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs,
|
||||||
String minIntervalSecsConfigName, Refresher refresher) {
|
Refresher refresher) {
|
||||||
periodicRefreshMs = TimeUnit.SECONDS
|
this.periodicRefreshMs = periodicRefreshMs;
|
||||||
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
|
this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
|
||||||
minTimeBetweenRefreshesMs = TimeUnit.SECONDS
|
this.refresher = refresher;
|
||||||
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
|
|
||||||
Preconditions.checkArgument(periodicRefreshMs > 0);
|
|
||||||
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
|
|
||||||
thread = new Thread(this::mainLoop);
|
thread = new Thread(this::mainLoop);
|
||||||
thread.setName("Registry-endpoints-refresh-end-points");
|
thread.setName("Registry-endpoints-refresh-end-points");
|
||||||
thread.setDaemon(true);
|
thread.setDaemon(true);
|
||||||
this.refresher = refresher;
|
thread.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -130,4 +123,22 @@ class RegistryEndpointsRefresher {
|
||||||
refreshNow = true;
|
refreshNow = true;
|
||||||
notifyAll();
|
notifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link RegistryEndpointsRefresher}. If the interval secs configured via
|
||||||
|
* {@code intervalSecsConfigName} is less than zero, will return null here, which means disable
|
||||||
|
* refreshing of endpoints.
|
||||||
|
*/
|
||||||
|
static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName,
|
||||||
|
String minIntervalSecsConfigName, Refresher refresher) {
|
||||||
|
long periodicRefreshMs = TimeUnit.SECONDS
|
||||||
|
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
|
||||||
|
if (periodicRefreshMs <= 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
|
||||||
|
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
|
||||||
|
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
|
||||||
|
return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -74,18 +75,24 @@ public class TestRegistryEndpointsRefresher {
|
||||||
callTimestamps.add(EnvironmentEdgeManager.currentTime());
|
callTimestamps.add(EnvironmentEdgeManager.currentTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) {
|
private void createRefresher(long intervalSecs, long minIntervalSecs) {
|
||||||
conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
|
conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
|
||||||
conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
|
conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
|
||||||
refresher = new RegistryEndpointsRefresher(conf, INTERVAL_SECS_CONFIG_NAME,
|
refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
|
||||||
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
|
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
|
||||||
refresher.start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPeriodicMasterEndPointRefresh() throws IOException {
|
public void testDisableRefresh() {
|
||||||
|
conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
|
||||||
|
assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
|
||||||
|
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPeriodicEndpointRefresh() throws IOException {
|
||||||
// Refresh every 1 second.
|
// Refresh every 1 second.
|
||||||
createAndStartRefresher(1, 0);
|
createRefresher(1, 0);
|
||||||
// Wait for > 3 seconds to see that at least 3 refresh have been made.
|
// Wait for > 3 seconds to see that at least 3 refresh have been made.
|
||||||
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
|
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
|
||||||
}
|
}
|
||||||
|
@ -94,7 +101,7 @@ public class TestRegistryEndpointsRefresher {
|
||||||
public void testDurationBetweenRefreshes() throws IOException {
|
public void testDurationBetweenRefreshes() throws IOException {
|
||||||
// Disable periodic refresh
|
// Disable periodic refresh
|
||||||
// A minimum duration of 1s between refreshes
|
// A minimum duration of 1s between refreshes
|
||||||
createAndStartRefresher(Integer.MAX_VALUE, 1);
|
createRefresher(Integer.MAX_VALUE, 1);
|
||||||
// Issue a ton of manual refreshes.
|
// Issue a ton of manual refreshes.
|
||||||
for (int i = 0; i < 10000; i++) {
|
for (int i = 0; i < 10000; i++) {
|
||||||
refresher.refreshNow();
|
refresher.refreshNow();
|
||||||
|
|
Loading…
Reference in New Issue