HBASE-26180 Introduce a initial refresh interval for RpcConnectionRegistry (#3601)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
parent
dacd3404a4
commit
931a7a5a61
|
@ -88,8 +88,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
|
||||||
private final RegistryEndpointsRefresher registryEndpointRefresher;
|
private final RegistryEndpointsRefresher registryEndpointRefresher;
|
||||||
|
|
||||||
protected AbstractRpcBasedConnectionRegistry(Configuration conf,
|
protected AbstractRpcBasedConnectionRegistry(Configuration conf,
|
||||||
String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName,
|
String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
|
||||||
String minRefreshIntervalSecsConfigName) throws IOException {
|
String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
|
||||||
|
throws IOException {
|
||||||
this.hedgedReadFanOut =
|
this.hedgedReadFanOut =
|
||||||
Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
|
Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
|
||||||
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
|
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
|
||||||
|
@ -101,8 +102,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
|
||||||
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||||
populateStubs(getBootstrapNodes(conf));
|
populateStubs(getBootstrapNodes(conf));
|
||||||
// could return null here is refresh interval is less than zero
|
// could return null here is refresh interval is less than zero
|
||||||
registryEndpointRefresher = RegistryEndpointsRefresher.create(conf,
|
registryEndpointRefresher =
|
||||||
refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
|
RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
|
||||||
|
refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
|
protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
|
||||||
|
|
|
@ -59,6 +59,9 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
|
||||||
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
|
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
|
||||||
"hbase.client.master_registry.hedged.fanout";
|
"hbase.client.master_registry.hedged.fanout";
|
||||||
|
|
||||||
|
public static final String MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS =
|
||||||
|
"hbase.client.master_registry.initial_refresh_delay_secs";
|
||||||
|
|
||||||
public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
|
public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
|
||||||
"hbase.client.master_registry.refresh_interval_secs";
|
"hbase.client.master_registry.refresh_interval_secs";
|
||||||
|
|
||||||
|
@ -85,7 +88,7 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
MasterRegistry(Configuration conf) throws IOException {
|
MasterRegistry(Configuration conf) throws IOException {
|
||||||
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
|
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
|
||||||
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
|
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,7 @@ final class RegistryEndpointsRefresher {
|
||||||
|
|
||||||
private final Thread thread;
|
private final Thread thread;
|
||||||
private final Refresher refresher;
|
private final Refresher refresher;
|
||||||
|
private final long initialDelayMs;
|
||||||
private final long periodicRefreshMs;
|
private final long periodicRefreshMs;
|
||||||
private final long minTimeBetweenRefreshesMs;
|
private final long minTimeBetweenRefreshesMs;
|
||||||
|
|
||||||
|
@ -56,9 +57,20 @@ final class RegistryEndpointsRefresher {
|
||||||
notifyAll();
|
notifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private long getRefreshIntervalMs(boolean firstRefresh) {
|
||||||
|
if (refreshNow) {
|
||||||
|
return minTimeBetweenRefreshesMs;
|
||||||
|
}
|
||||||
|
if (firstRefresh) {
|
||||||
|
return initialDelayMs;
|
||||||
|
}
|
||||||
|
return periodicRefreshMs;
|
||||||
|
}
|
||||||
|
|
||||||
// The main loop for the refresh thread.
|
// The main loop for the refresh thread.
|
||||||
private void mainLoop() {
|
private void mainLoop() {
|
||||||
long lastRefreshTime = EnvironmentEdgeManager.currentTime();
|
long lastRefreshTime = EnvironmentEdgeManager.currentTime();
|
||||||
|
boolean firstRefresh = true;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
@ -68,9 +80,12 @@ final class RegistryEndpointsRefresher {
|
||||||
}
|
}
|
||||||
// if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
|
// if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
|
||||||
// otherwise wait until periodicRefreshMs elapsed
|
// otherwise wait until periodicRefreshMs elapsed
|
||||||
long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) -
|
long waitTime = getRefreshIntervalMs(firstRefresh) -
|
||||||
(EnvironmentEdgeManager.currentTime() - lastRefreshTime);
|
(EnvironmentEdgeManager.currentTime() - lastRefreshTime);
|
||||||
if (waitTime <= 0) {
|
if (waitTime <= 0) {
|
||||||
|
// we are going to refresh, reset this flag
|
||||||
|
firstRefresh = false;
|
||||||
|
refreshNow = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
@ -81,8 +96,6 @@ final class RegistryEndpointsRefresher {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// we are going to refresh, reset this flag
|
|
||||||
refreshNow = false;
|
|
||||||
}
|
}
|
||||||
LOG.debug("Attempting to refresh registry end points");
|
LOG.debug("Attempting to refresh registry end points");
|
||||||
try {
|
try {
|
||||||
|
@ -104,8 +117,9 @@ final class RegistryEndpointsRefresher {
|
||||||
void refresh() throws IOException;
|
void refresh() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs,
|
private RegistryEndpointsRefresher(long initialDelayMs, long periodicRefreshMs,
|
||||||
Refresher refresher) {
|
long minTimeBetweenRefreshesMs, Refresher refresher) {
|
||||||
|
this.initialDelayMs = initialDelayMs;
|
||||||
this.periodicRefreshMs = periodicRefreshMs;
|
this.periodicRefreshMs = periodicRefreshMs;
|
||||||
this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
|
this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
|
||||||
this.refresher = refresher;
|
this.refresher = refresher;
|
||||||
|
@ -129,16 +143,19 @@ final class RegistryEndpointsRefresher {
|
||||||
* {@code intervalSecsConfigName} is less than zero, will return null here, which means disable
|
* {@code intervalSecsConfigName} is less than zero, will return null here, which means disable
|
||||||
* refreshing of endpoints.
|
* refreshing of endpoints.
|
||||||
*/
|
*/
|
||||||
static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName,
|
static RegistryEndpointsRefresher create(Configuration conf, String initialDelaySecsConfigName,
|
||||||
String minIntervalSecsConfigName, Refresher refresher) {
|
String intervalSecsConfigName, String minIntervalSecsConfigName, Refresher refresher) {
|
||||||
long periodicRefreshMs = TimeUnit.SECONDS
|
long periodicRefreshMs = TimeUnit.SECONDS
|
||||||
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
|
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
|
||||||
if (periodicRefreshMs <= 0) {
|
if (periodicRefreshMs <= 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
long initialDelayMs = Math.max(1,
|
||||||
|
TimeUnit.SECONDS.toMillis(conf.getLong(initialDelaySecsConfigName, periodicRefreshMs / 10)));
|
||||||
long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
|
long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
|
||||||
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
|
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
|
||||||
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
|
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
|
||||||
return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher);
|
return new RegistryEndpointsRefresher(initialDelayMs, periodicRefreshMs,
|
||||||
|
minTimeBetweenRefreshesMs, refresher);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,17 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
|
||||||
/** Configuration key that controls the fan out of requests **/
|
/** Configuration key that controls the fan out of requests **/
|
||||||
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";
|
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is
|
||||||
|
* possible that different end users will configure the same machine which makes the machine over
|
||||||
|
* load. So we should have a shorter delay for the initial refresh, to let users quickly switch to
|
||||||
|
* the bootstrap nodes we want them to connect to.
|
||||||
|
* <p/>
|
||||||
|
* The default value for initial refresh delay is 1/10 of periodic refresh interval.
|
||||||
|
*/
|
||||||
|
public static final String INITIAL_REFRESH_DELAY_SECS =
|
||||||
|
"hbase.client.bootstrap.initial_refresh_delay_secs";
|
||||||
|
|
||||||
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
|
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
|
||||||
"hbase.client.bootstrap.refresh_interval_secs";
|
"hbase.client.bootstrap.refresh_interval_secs";
|
||||||
|
|
||||||
|
@ -62,7 +73,8 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
|
||||||
private static final char ADDRS_CONF_SEPARATOR = ',';
|
private static final char ADDRS_CONF_SEPARATOR = ',';
|
||||||
|
|
||||||
RpcConnectionRegistry(Configuration conf) throws IOException {
|
RpcConnectionRegistry(Configuration conf) throws IOException {
|
||||||
super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES);
|
super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
|
||||||
|
MIN_SECS_BETWEEN_REFRESHES);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -17,10 +17,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -46,6 +46,8 @@ public class TestRegistryEndpointsRefresher {
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);
|
HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);
|
||||||
|
|
||||||
|
private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
|
||||||
|
"hbase.test.registry.initial.delay.secs";
|
||||||
private static final String INTERVAL_SECS_CONFIG_NAME =
|
private static final String INTERVAL_SECS_CONFIG_NAME =
|
||||||
"hbase.test.registry.refresh.interval.secs";
|
"hbase.test.registry.refresh.interval.secs";
|
||||||
private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
|
private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
|
||||||
|
@ -75,33 +77,45 @@ public class TestRegistryEndpointsRefresher {
|
||||||
callTimestamps.add(EnvironmentEdgeManager.currentTime());
|
callTimestamps.add(EnvironmentEdgeManager.currentTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createRefresher(long intervalSecs, long minIntervalSecs) {
|
private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) {
|
||||||
|
conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs);
|
||||||
conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
|
conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
|
||||||
conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
|
conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
|
||||||
refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
|
refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
|
||||||
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
|
INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDisableRefresh() {
|
public void testDisableRefresh() {
|
||||||
conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
|
conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
|
||||||
assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
|
assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
|
||||||
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
|
INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPeriodicEndpointRefresh() throws IOException {
|
public void testInitialDelay() throws InterruptedException {
|
||||||
|
createRefresher(1, 10, 0);
|
||||||
|
// Wait for 2 seconds to see that at least 1 refresh have been made since the initial delay is 1
|
||||||
|
// seconds
|
||||||
|
Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1);
|
||||||
|
// Sleep more 5 seconds to make sure we have not made new calls since the interval is 10 seconds
|
||||||
|
Thread.sleep(5000);
|
||||||
|
assertEquals(1, refreshCallCounter.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPeriodicMasterEndPointRefresh() {
|
||||||
// Refresh every 1 second.
|
// Refresh every 1 second.
|
||||||
createRefresher(1, 0);
|
createRefresher(1, 1, 0);
|
||||||
// Wait for > 3 seconds to see that at least 3 refresh have been made.
|
// Wait for > 3 seconds to see that at least 3 refresh have been made.
|
||||||
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
|
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDurationBetweenRefreshes() throws IOException {
|
public void testDurationBetweenRefreshes() {
|
||||||
// Disable periodic refresh
|
// Disable periodic refresh
|
||||||
// A minimum duration of 1s between refreshes
|
// A minimum duration of 1s between refreshes
|
||||||
createRefresher(Integer.MAX_VALUE, 1);
|
createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1);
|
||||||
// Issue a ton of manual refreshes.
|
// Issue a ton of manual refreshes.
|
||||||
for (int i = 0; i < 10000; i++) {
|
for (int i = 0; i < 10000; i++) {
|
||||||
refresher.refreshNow();
|
refresher.refreshNow();
|
||||||
|
|
|
@ -69,6 +69,8 @@ public class TestRpcBasedRegistryHedgedReads {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);
|
||||||
|
|
||||||
private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
|
private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
|
||||||
|
private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
|
||||||
|
"hbase.test.refresh.initial.delay.secs";
|
||||||
private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
|
private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
|
||||||
"hbase.test.refresh.interval.secs";
|
"hbase.test.refresh.interval.secs";
|
||||||
private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
|
private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
|
||||||
|
@ -153,7 +155,8 @@ public class TestRpcBasedRegistryHedgedReads {
|
||||||
Configuration conf = UTIL.getConfiguration();
|
Configuration conf = UTIL.getConfiguration();
|
||||||
conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
|
conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
|
||||||
return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME,
|
return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME,
|
||||||
REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
|
INITIAL_DELAY_SECS_CONFIG_NAME, REFRESH_INTERVAL_SECS_CONFIG_NAME,
|
||||||
|
MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
|
protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
|
||||||
|
@ -173,6 +176,7 @@ public class TestRpcBasedRegistryHedgedReads {
|
||||||
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
|
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
|
||||||
RpcClient.class);
|
RpcClient.class);
|
||||||
// disable refresh, we do not need to refresh in this test
|
// disable refresh, we do not need to refresh in this test
|
||||||
|
conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
|
||||||
conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
|
conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
|
||||||
conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
|
conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
|
||||||
BOOTSTRAP_NODES = IntStream.range(0, 10)
|
BOOTSTRAP_NODES = IntStream.range(0, 10)
|
||||||
|
|
|
@ -58,6 +58,7 @@ public class TestRpcConnectionRegistry {
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
// allow refresh immediately so we will switch to use region servers soon.
|
// allow refresh immediately so we will switch to use region servers soon.
|
||||||
|
UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1);
|
||||||
UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
|
UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
|
||||||
UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
|
UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
|
||||||
UTIL.startMiniCluster(3);
|
UTIL.startMiniCluster(3);
|
||||||
|
@ -91,8 +92,8 @@ public class TestRpcConnectionRegistry {
|
||||||
@Test
|
@Test
|
||||||
public void testRegistryRPCs() throws Exception {
|
public void testRegistryRPCs() throws Exception {
|
||||||
HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
|
HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
|
||||||
// wait until we switch to use region servers
|
// sleep 3 seconds, since our initial delay is 1 second, we should have refreshed the endpoints
|
||||||
UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 3);
|
Thread.sleep(3000);
|
||||||
assertThat(registry.getParsedServers(),
|
assertThat(registry.getParsedServers(),
|
||||||
hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));
|
hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue