HBASE-26172 Deprecated MasterRegistry (#3566)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org> Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
f2e2140b57
commit
c8d9d4df80
|
@ -49,8 +49,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMaste
|
||||||
* {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
|
* {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
|
||||||
* it(the default value is {@link AbstractRpcBasedConnectionRegistry#HEDGED_REQS_FANOUT_DEFAULT}).
|
* it(the default value is {@link AbstractRpcBasedConnectionRegistry#HEDGED_REQS_FANOUT_DEFAULT}).
|
||||||
* <p/>
|
* <p/>
|
||||||
* TODO: Handle changes to the configuration dynamically without having to restart the client.
|
* @deprecated Since 2.5.0, will be removed in 4.0.0. Use {@link RpcConnectionRegistry} instead.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
|
public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
|
||||||
|
|
||||||
|
|
|
@ -39,12 +39,8 @@ class RegistryEndpointsRefresher {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);
|
private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);
|
||||||
|
|
||||||
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
|
|
||||||
"hbase.client.rpc_registry.refresh_interval_secs";
|
|
||||||
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
|
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
|
||||||
|
|
||||||
public static final String MIN_SECS_BETWEEN_REFRESHES =
|
|
||||||
"hbase.client.rpc_registry.min_secs_between_refreshes";
|
|
||||||
private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
|
private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
|
||||||
|
|
||||||
private final Thread thread;
|
private final Thread thread;
|
||||||
|
|
|
@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBoots
|
||||||
public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
|
public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
|
||||||
|
|
||||||
/** Configuration key that controls the fan out of requests **/
|
/** Configuration key that controls the fan out of requests **/
|
||||||
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.rpc_registry.hedged.fanout";
|
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";
|
||||||
|
|
||||||
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
|
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
|
||||||
"hbase.client.bootstrap.refresh_interval_secs";
|
"hbase.client.bootstrap.refresh_interval_secs";
|
||||||
|
|
|
@ -52,13 +52,13 @@ public class TestRegistryEndpointsRefresher {
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private RegistryEndpointsRefresher refresher;
|
private RegistryEndpointsRefresher refresher;
|
||||||
private AtomicInteger getMastersCallCounter;
|
private AtomicInteger refreshCallCounter;
|
||||||
private CopyOnWriteArrayList<Long> callTimestamps;
|
private CopyOnWriteArrayList<Long> callTimestamps;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
conf = HBaseConfiguration.create();
|
conf = HBaseConfiguration.create();
|
||||||
getMastersCallCounter = new AtomicInteger(0);
|
refreshCallCounter = new AtomicInteger(0);
|
||||||
callTimestamps = new CopyOnWriteArrayList<>();
|
callTimestamps = new CopyOnWriteArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,7 +70,7 @@ public class TestRegistryEndpointsRefresher {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void refresh() {
|
private void refresh() {
|
||||||
getMastersCallCounter.incrementAndGet();
|
refreshCallCounter.incrementAndGet();
|
||||||
callTimestamps.add(EnvironmentEdgeManager.currentTime());
|
callTimestamps.add(EnvironmentEdgeManager.currentTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,8 +86,8 @@ public class TestRegistryEndpointsRefresher {
|
||||||
public void testPeriodicMasterEndPointRefresh() throws IOException {
|
public void testPeriodicMasterEndPointRefresh() throws IOException {
|
||||||
// Refresh every 1 second.
|
// Refresh every 1 second.
|
||||||
createAndStartRefresher(1, 0);
|
createAndStartRefresher(1, 0);
|
||||||
// Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made.
|
// Wait for > 3 seconds to see that at least 3 refresh have been made.
|
||||||
Waiter.waitFor(conf, 5000, () -> getMastersCallCounter.get() > 3);
|
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -101,10 +101,10 @@ public class TestRegistryEndpointsRefresher {
|
||||||
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
|
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
// Overall wait time is 10000 ms, so the number of requests should be <=10
|
// Overall wait time is 10000 ms, so the number of requests should be <=10
|
||||||
// Actual calls to getMasters() should be much lower than the refresh count.
|
// Actual calls to refresh should be much lower than the refresh count.
|
||||||
assertTrue(String.valueOf(getMastersCallCounter.get()), getMastersCallCounter.get() <= 20);
|
assertTrue(String.valueOf(refreshCallCounter.get()), refreshCallCounter.get() <= 20);
|
||||||
assertTrue(callTimestamps.size() > 0);
|
assertTrue(callTimestamps.size() > 0);
|
||||||
// Verify that the delta between subsequent RPCs is at least 1sec as configured.
|
// Verify that the delta between subsequent refresh is at least 1sec as configured.
|
||||||
for (int i = 1; i < callTimestamps.size() - 1; i++) {
|
for (int i = 1; i < callTimestamps.size() - 1; i++) {
|
||||||
long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
|
long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
|
||||||
// Few ms cushion to account for any env jitter.
|
// Few ms cushion to account for any env jitter.
|
|
@ -2700,10 +2700,21 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<ServerName> getBackupMasters() {
|
@Override
|
||||||
|
public Optional<ServerName> getActiveMaster() {
|
||||||
|
return activeMasterManager.getActiveMasterServerName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ServerName> getBackupMasters() {
|
||||||
return activeMasterManager.getBackupMasters();
|
return activeMasterManager.getBackupMasters();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ServerName> getRegionServers() {
|
||||||
|
return serverManager.getOnlineServersList();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The set of loaded coprocessors is stored in a static set. Since it's
|
* The set of loaded coprocessors is stored in a static set. Since it's
|
||||||
* statically allocated, it does not require that HMaster's cpHost be
|
* statically allocated, it does not require that HMaster's cpHost be
|
||||||
|
@ -3848,10 +3859,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
return cachedClusterId.getFromCacheOrFetch();
|
return cachedClusterId.getFromCacheOrFetch();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<ServerName> getActiveMaster() {
|
|
||||||
return activeMasterManager.getActiveMasterServerName();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runReplicationBarrierCleaner() {
|
public void runReplicationBarrierCleaner() {
|
||||||
ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
|
ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
|
||||||
|
|
|
@ -33,7 +33,6 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -372,15 +371,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
|
||||||
|
@ -408,8 +398,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public class MasterRpcServices extends RSRpcServices implements
|
public class MasterRpcServices extends RSRpcServices implements
|
||||||
MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
|
MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
|
||||||
LockService.BlockingInterface, HbckService.BlockingInterface,
|
LockService.BlockingInterface, HbckService.BlockingInterface {
|
||||||
ClientMetaService.BlockingInterface {
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
|
private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
|
||||||
private static final Logger AUDITLOG =
|
private static final Logger AUDITLOG =
|
||||||
|
@ -3003,58 +2992,6 @@ public class MasterRpcServices extends RSRpcServices implements
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Override this method since for backup master we will not set the clusterId field, which means
|
|
||||||
// we need to find another way to get cluster id for backup masters.
|
|
||||||
@Override
|
|
||||||
public GetClusterIdResponse getClusterId(RpcController rpcController, GetClusterIdRequest request)
|
|
||||||
throws ServiceException {
|
|
||||||
GetClusterIdResponse.Builder resp = GetClusterIdResponse.newBuilder();
|
|
||||||
String clusterId = master.getClusterId();
|
|
||||||
if (clusterId != null) {
|
|
||||||
resp.setClusterId(clusterId);
|
|
||||||
}
|
|
||||||
return resp.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Override this method since we use ActiveMasterManager to get active master on HMaster while in
|
|
||||||
// HRegionServer we use MasterAddressTracker
|
|
||||||
@Override
|
|
||||||
public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
|
|
||||||
GetActiveMasterRequest request) throws ServiceException {
|
|
||||||
GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder();
|
|
||||||
Optional<ServerName> serverName = master.getActiveMaster();
|
|
||||||
serverName.ifPresent(name -> resp.setServerName(ProtobufUtil.toServerName(name)));
|
|
||||||
return resp.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Override this method since we use ActiveMasterManager to get backup masters on HMaster while in
|
|
||||||
// HRegionServer we use MasterAddressTracker
|
|
||||||
@Override
|
|
||||||
public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
|
|
||||||
throws ServiceException {
|
|
||||||
GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
|
|
||||||
// Active master
|
|
||||||
Optional<ServerName> serverName = master.getActiveMaster();
|
|
||||||
serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
|
|
||||||
.setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
|
|
||||||
// Backup masters
|
|
||||||
for (ServerName backupMaster : master.getBackupMasters()) {
|
|
||||||
resp.addMasterServers(GetMastersResponseEntry.newBuilder()
|
|
||||||
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
|
|
||||||
}
|
|
||||||
return resp.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
|
|
||||||
GetBootstrapNodesRequest request) throws ServiceException {
|
|
||||||
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
|
|
||||||
for (ServerName sn : master.getServerManager().getOnlineServers().keySet()) {
|
|
||||||
builder.addServerName(ProtobufUtil.toServerName(sn));
|
|
||||||
}
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetRSGroupInfoResponse getRSGroupInfo(RpcController controller,
|
public GetRSGroupInfoResponse getRSGroupInfo(RpcController controller,
|
||||||
GetRSGroupInfoRequest request) throws ServiceException {
|
GetRSGroupInfoRequest request) throws ServiceException {
|
||||||
|
|
|
@ -4007,11 +4007,19 @@ public class HRegionServer extends Thread implements
|
||||||
return this.retryPauseTime;
|
return this.retryPauseTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Optional<ServerName> getActiveMaster() {
|
||||||
|
return Optional.ofNullable(masterAddressTracker.getMasterAddress());
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ServerName> getBackupMasters() {
|
||||||
|
return masterAddressTracker.getBackupMasters();
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ServerName> getRegionServers() {
|
||||||
|
return regionServerAddressTracker.getRegionServers();
|
||||||
|
}
|
||||||
|
|
||||||
public MetaRegionLocationCache getMetaRegionLocationCache() {
|
public MetaRegionLocationCache getMetaRegionLocationCache() {
|
||||||
return this.metaRegionLocationCache;
|
return this.metaRegionLocationCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
RegionServerAddressTracker getRegionServerAddressTracker() {
|
|
||||||
return regionServerAddressTracker;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -308,14 +308,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
*/
|
*/
|
||||||
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
|
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Whether to reject rows with size > threshold defined by
|
* Whether to reject rows with size > threshold defined by
|
||||||
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
|
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
|
||||||
*/
|
*/
|
||||||
private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
|
private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
|
||||||
"hbase.rpc.rows.size.threshold.reject";
|
"hbase.rpc.rows.size.threshold.reject";
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
|
* Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
|
||||||
*/
|
*/
|
||||||
private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;
|
private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;
|
||||||
|
@ -4092,31 +4092,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
public GetActiveMasterResponse getActiveMaster(RpcController controller,
|
public GetActiveMasterResponse getActiveMaster(RpcController controller,
|
||||||
GetActiveMasterRequest request) throws ServiceException {
|
GetActiveMasterRequest request) throws ServiceException {
|
||||||
GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
|
GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
|
||||||
ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
|
regionServer.getActiveMaster()
|
||||||
if (activeMaster != null) {
|
.ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name)));
|
||||||
builder.setServerName(ProtobufUtil.toServerName(activeMaster));
|
|
||||||
}
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
|
public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
try {
|
|
||||||
GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
|
GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
|
||||||
ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress();
|
regionServer.getActiveMaster()
|
||||||
if (activeMaster != null) {
|
.ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
|
||||||
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
|
.setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)));
|
||||||
.setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true));
|
regionServer.getBackupMasters()
|
||||||
}
|
.forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
|
||||||
for (ServerName backupMaster : regionServer.getMasterAddressTracker().getBackupMasters()) {
|
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)));
|
||||||
builder.addMasterServers(GetMastersResponseEntry.newBuilder()
|
|
||||||
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false));
|
|
||||||
}
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ServiceException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -4131,11 +4122,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
|
public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
|
||||||
GetBootstrapNodesRequest request) throws ServiceException {
|
GetBootstrapNodesRequest request) throws ServiceException {
|
||||||
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
|
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
|
||||||
regionServer.getRegionServerAddressTracker().getRegionServers().stream()
|
regionServer.getRegionServers().stream().map(ProtobufUtil::toServerName)
|
||||||
.map(ProtobufUtil::toServerName).forEach(builder::addServerName);
|
.forEach(builder::addServerName);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,14 +18,12 @@
|
||||||
package org.apache.hadoop.hbase.master;
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
@ -45,6 +43,8 @@ import org.junit.experimental.categories.Category;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
|
||||||
@Category({MasterTests.class, MediumTests.class})
|
@Category({MasterTests.class, MediumTests.class})
|
||||||
public class TestSplitRegionWhileRSCrash {
|
public class TestSplitRegionWhileRSCrash {
|
||||||
|
|
||||||
|
@ -52,45 +52,40 @@ public class TestSplitRegionWhileRSCrash {
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestSplitRegionWhileRSCrash.class);
|
HBaseClassTestRule.forClass(TestSplitRegionWhileRSCrash.class);
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory.getLogger(TestSplitRegionWhileRSCrash.class);
|
||||||
.getLogger(TestSplitRegionWhileRSCrash.class);
|
|
||||||
|
|
||||||
protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
|
protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
|
||||||
private static TableName TABLE_NAME = TableName.valueOf("test");
|
private static TableName TABLE_NAME = TableName.valueOf("test");
|
||||||
private static Admin admin;
|
private static Admin ADMIN;
|
||||||
private static byte[] CF = Bytes.toBytes("cf");
|
private static byte[] CF = Bytes.toBytes("cf");
|
||||||
private static CountDownLatch mergeCommitArrive = new CountDownLatch(1);
|
|
||||||
private static Table TABLE;
|
private static Table TABLE;
|
||||||
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setupCluster() throws Exception {
|
public static void setupCluster() throws Exception {
|
||||||
UTIL.startMiniCluster(1);
|
UTIL.startMiniCluster(1);
|
||||||
admin = UTIL.getAdmin();
|
ADMIN = UTIL.getAdmin();
|
||||||
TABLE = UTIL.createTable(TABLE_NAME, CF);
|
TABLE = UTIL.createTable(TABLE_NAME, CF);
|
||||||
UTIL.waitTableAvailable(TABLE_NAME);
|
UTIL.waitTableAvailable(TABLE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void cleanupTest() throws Exception {
|
public static void cleanupTest() throws Exception {
|
||||||
try {
|
Closeables.close(TABLE, true);
|
||||||
UTIL.shutdownMiniCluster();
|
UTIL.shutdownMiniCluster();
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.warn("failure shutting down cluster", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() throws Exception {
|
public void test() throws Exception {
|
||||||
MasterProcedureEnv env = UTIL.getMiniHBaseCluster().getMaster()
|
MasterProcedureEnv env =
|
||||||
.getMasterProcedureExecutor().getEnvironment();
|
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment();
|
||||||
final ProcedureExecutor<MasterProcedureEnv> executor = UTIL.getMiniHBaseCluster()
|
final ProcedureExecutor<MasterProcedureEnv> executor =
|
||||||
.getMaster().getMasterProcedureExecutor();
|
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||||
List<RegionInfo> regionInfos = admin.getRegions(TABLE_NAME);
|
List<RegionInfo> regionInfos = ADMIN.getRegions(TABLE_NAME);
|
||||||
// Since a flush request will be sent while initializing SplitTableRegionProcedure
|
// Since a flush request will be sent while initializing SplitTableRegionProcedure
|
||||||
// Create SplitTableRegionProcedure first before put data
|
// Create SplitTableRegionProcedure first before put data
|
||||||
SplitTableRegionProcedure splitProcedure = new SplitTableRegionProcedure(
|
SplitTableRegionProcedure splitProcedure =
|
||||||
env, regionInfos.get(0), Bytes.toBytes("row5"));
|
new SplitTableRegionProcedure(env, regionInfos.get(0), Bytes.toBytes("row5"));
|
||||||
// write some rows to the table
|
// write some rows to the table
|
||||||
LOG.info("Begin to put data");
|
LOG.info("Begin to put data");
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
|
@ -101,19 +96,18 @@ public class TestSplitRegionWhileRSCrash {
|
||||||
}
|
}
|
||||||
executor.submitProcedure(splitProcedure);
|
executor.submitProcedure(splitProcedure);
|
||||||
LOG.info("SplitProcedure submitted");
|
LOG.info("SplitProcedure submitted");
|
||||||
UTIL.waitFor(30000, () -> executor.getProcedures().stream()
|
UTIL.waitFor(30000,
|
||||||
.filter(p -> p instanceof TransitRegionStateProcedure)
|
() -> executor.getProcedures().stream().filter(p -> p instanceof TransitRegionStateProcedure)
|
||||||
.map(p -> (TransitRegionStateProcedure) p)
|
.map(p -> (TransitRegionStateProcedure) p)
|
||||||
.anyMatch(p -> TABLE_NAME.equals(p.getTableName())));
|
.anyMatch(p -> TABLE_NAME.equals(p.getTableName())));
|
||||||
UTIL.getMiniHBaseCluster().killRegionServer(
|
UTIL.getMiniHBaseCluster()
|
||||||
UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName());
|
.killRegionServer(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName());
|
||||||
UTIL.getMiniHBaseCluster().startRegionServer();
|
UTIL.getMiniHBaseCluster().startRegionServer();
|
||||||
UTIL.waitUntilNoRegionsInTransition();
|
UTIL.waitUntilNoRegionsInTransition();
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
ResultScanner results = TABLE.getScanner(scan);
|
ResultScanner results = TABLE.getScanner(scan);
|
||||||
int count = 0;
|
int count = 0;
|
||||||
Result result = null;
|
while (results.next() != null) {
|
||||||
while ((result = results.next()) != null) {
|
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
Assert.assertEquals("There should be 10 rows!", 10, count);
|
Assert.assertEquals("There should be 10 rows!", 10, count);
|
||||||
|
|
|
@ -307,7 +307,7 @@ public class MasterAddressTracker extends ZKNodeTracker {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<ServerName> getBackupMasters() throws InterruptedIOException {
|
public List<ServerName> getBackupMasters() {
|
||||||
return backupMasters;
|
return backupMasters;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue