HBASE-23304: RPCs needed for client meta information lookup (#904)
* HBASE-23304: RPCs needed for client meta information lookup This patch implements the RPCs needed for the meta information lookup during connection init. New tests added to cover the RPC code paths. HBASE-23305 builds on this to implement the client side logic. Fixed a bunch of checkstyle nits around the places the patch touches. Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
8571d389cf
commit
4f8fbba0c0
|
@ -376,7 +376,9 @@ public final class ProtobufUtil {
|
|||
* @see #toServerName(org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName)
|
||||
*/
|
||||
public static HBaseProtos.ServerName toServerName(final ServerName serverName) {
|
||||
if (serverName == null) return null;
|
||||
if (serverName == null) {
|
||||
return null;
|
||||
}
|
||||
HBaseProtos.ServerName.Builder builder =
|
||||
HBaseProtos.ServerName.newBuilder();
|
||||
builder.setHostName(serverName.getHostname());
|
||||
|
|
|
@ -1200,3 +1200,47 @@ service HbckService {
|
|||
rpc FixMeta(FixMetaRequest)
|
||||
returns(FixMetaResponse);
|
||||
}
|
||||
|
||||
/** Request and response to get the clusterID for this cluster */
|
||||
message GetClusterIdRequest {
|
||||
}
|
||||
message GetClusterIdResponse {
|
||||
/** Not set if cluster ID could not be determined. */
|
||||
optional string cluster_id = 1;
|
||||
}
|
||||
|
||||
/** Request and response to get the currently active master name for this cluster */
|
||||
message GetActiveMasterRequest {
|
||||
}
|
||||
message GetActiveMasterResponse {
|
||||
/** Not set if an active master could not be determined. */
|
||||
optional ServerName server_name = 1;
|
||||
}
|
||||
|
||||
/** Request and response to get the current list of meta region locations */
|
||||
message GetMetaRegionLocationsRequest {
|
||||
}
|
||||
message GetMetaRegionLocationsResponse {
|
||||
/** Not set if meta region locations could not be determined. */
|
||||
repeated RegionLocation meta_locations = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements all the RPCs needed by clients to look up cluster meta information needed for connection establishment.
|
||||
*/
|
||||
service ClientMetaService {
|
||||
/**
|
||||
* Get Cluster ID for this cluster.
|
||||
*/
|
||||
rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
|
||||
|
||||
/**
|
||||
* Get active master server name for this cluster.
|
||||
*/
|
||||
rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
|
||||
|
||||
/**
|
||||
* Get current meta replicas' region locations.
|
||||
*/
|
||||
rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.BindException;
|
||||
|
@ -30,6 +29,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
|
@ -116,11 +117,9 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
|
||||
|
@ -161,6 +160,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceReq
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
|
||||
|
@ -185,12 +185,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
|
||||
|
@ -351,9 +357,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.Snapshot
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
@SuppressWarnings("deprecation")
|
||||
public class MasterRpcServices extends RSRpcServices
|
||||
implements MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
|
||||
LockService.BlockingInterface, HbckService.BlockingInterface {
|
||||
public class MasterRpcServices extends RSRpcServices implements
|
||||
MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
|
||||
LockService.BlockingInterface, HbckService.BlockingInterface,
|
||||
ClientMetaService.BlockingInterface {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
|
||||
private static final Logger AUDITLOG =
|
||||
LoggerFactory.getLogger("SecurityLogger."+MasterRpcServices.class.getName());
|
||||
|
@ -362,7 +369,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
|
||||
/**
|
||||
* @return Subset of configuration to pass initializing regionservers: e.g.
|
||||
* the filesystem to use and root directory to use.
|
||||
* the filesystem to use and root directory to use.
|
||||
*/
|
||||
private RegionServerStartupResponse.Builder createConfigurationSubset() {
|
||||
RegionServerStartupResponse.Builder resp = addConfig(
|
||||
|
@ -488,15 +495,17 @@ public class MasterRpcServices extends RSRpcServices
|
|||
protected List<BlockingServiceAndInterface> getServices() {
|
||||
List<BlockingServiceAndInterface> bssi = new ArrayList<>(5);
|
||||
bssi.add(new BlockingServiceAndInterface(
|
||||
MasterService.newReflectiveBlockingService(this),
|
||||
MasterService.BlockingInterface.class));
|
||||
MasterService.newReflectiveBlockingService(this),
|
||||
MasterService.BlockingInterface.class));
|
||||
bssi.add(new BlockingServiceAndInterface(
|
||||
RegionServerStatusService.newReflectiveBlockingService(this),
|
||||
RegionServerStatusService.BlockingInterface.class));
|
||||
RegionServerStatusService.newReflectiveBlockingService(this),
|
||||
RegionServerStatusService.BlockingInterface.class));
|
||||
bssi.add(new BlockingServiceAndInterface(LockService.newReflectiveBlockingService(this),
|
||||
LockService.BlockingInterface.class));
|
||||
bssi.add(new BlockingServiceAndInterface(HbckService.newReflectiveBlockingService(this),
|
||||
HbckService.BlockingInterface.class));
|
||||
bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
|
||||
ClientMetaService.BlockingInterface.class));
|
||||
bssi.addAll(super.getServices());
|
||||
return bssi;
|
||||
}
|
||||
|
@ -623,7 +632,9 @@ public class MasterRpcServices extends RSRpcServices
|
|||
|
||||
final byte[] regionName = req.getRegion().getValue().toByteArray();
|
||||
final RegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName);
|
||||
if (regionInfo == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
|
||||
if (regionInfo == null) {
|
||||
throw new UnknownRegionException(Bytes.toStringBinary(regionName));
|
||||
}
|
||||
|
||||
final AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
|
||||
if (master.cpHost != null) {
|
||||
|
@ -668,7 +679,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
|
||||
@Override
|
||||
public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
|
||||
throws ServiceException {
|
||||
throws ServiceException {
|
||||
TableDescriptor tableDescriptor = ProtobufUtil.toTableDescriptor(req.getTableSchema());
|
||||
byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
|
||||
try {
|
||||
|
@ -1062,7 +1073,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
* Get list of TableDescriptors for requested tables.
|
||||
* @param c Unused (set to null).
|
||||
* @param req GetTableDescriptorsRequest that contains:
|
||||
* - tableNames: requested tables, or if empty, all are requested
|
||||
* - tableNames: requested tables, or if empty, all are requested.
|
||||
* @return GetTableDescriptorsResponse
|
||||
* @throws ServiceException
|
||||
*/
|
||||
|
@ -1206,9 +1217,9 @@ public class MasterRpcServices extends RSRpcServices
|
|||
/**
|
||||
* Checks if the specified snapshot is done.
|
||||
* @return true if the snapshot is in file system ready to use,
|
||||
* false if the snapshot is in the process of completing
|
||||
* false if the snapshot is in the process of completing
|
||||
* @throws ServiceException wrapping UnknownSnapshotException if invalid snapshot, or
|
||||
* a wrapped HBaseSnapshotException with progress failure reason.
|
||||
* a wrapped HBaseSnapshotException with progress failure reason.
|
||||
*/
|
||||
@Override
|
||||
public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
|
||||
|
@ -1450,7 +1461,9 @@ public class MasterRpcServices extends RSRpcServices
|
|||
|
||||
final byte[] regionName = request.getRegion().getValue().toByteArray();
|
||||
final RegionInfo hri = master.getAssignmentManager().getRegionInfo(regionName);
|
||||
if (hri == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
|
||||
if (hri == null) {
|
||||
throw new UnknownRegionException(Bytes.toStringBinary(regionName));
|
||||
}
|
||||
|
||||
if (master.cpHost != null) {
|
||||
master.cpHost.preRegionOffline(hri);
|
||||
|
@ -2311,8 +2324,8 @@ public class MasterRpcServices extends RSRpcServices
|
|||
report.getRegionSize(), now);
|
||||
}
|
||||
} else {
|
||||
LOG.debug(
|
||||
"Received region space usage report but HMaster is not ready to process it, skipping");
|
||||
LOG.debug("Received region space usage report but HMaster is not ready to process it, "
|
||||
+ "skipping");
|
||||
}
|
||||
return RegionSpaceUseReportResponse.newBuilder().build();
|
||||
} catch (Exception e) {
|
||||
|
@ -2348,8 +2361,8 @@ public class MasterRpcServices extends RSRpcServices
|
|||
}
|
||||
return builder.build();
|
||||
} else {
|
||||
LOG.debug(
|
||||
"Received space quota region size report but HMaster is not ready to process it, skipping");
|
||||
LOG.debug("Received space quota region size report but HMaster is not ready to process it,"
|
||||
+ "skipping");
|
||||
}
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
|
@ -2893,4 +2906,34 @@ public class MasterRpcServices extends RSRpcServices
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterIdResponse getClusterId(RpcController rpcController, GetClusterIdRequest request)
|
||||
throws ServiceException {
|
||||
GetClusterIdResponse.Builder resp = GetClusterIdResponse.newBuilder();
|
||||
String clusterId = master.getClusterId();
|
||||
if (clusterId != null) {
|
||||
resp.setClusterId(clusterId);
|
||||
}
|
||||
return resp.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
|
||||
GetActiveMasterRequest request) throws ServiceException {
|
||||
GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder();
|
||||
Optional<ServerName> serverName = master.getActiveMaster();
|
||||
serverName.ifPresent(name -> resp.setServerName(ProtobufUtil.toServerName(name)));
|
||||
return resp.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController,
|
||||
GetMetaRegionLocationsRequest request) throws ServiceException {
|
||||
GetMetaRegionLocationsResponse.Builder response = GetMetaRegionLocationsResponse.newBuilder();
|
||||
Optional<List<HRegionLocation>> metaLocations =
|
||||
master.getMetaRegionLocationCache().getMetaRegionLocations();
|
||||
metaLocations.ifPresent(hRegionLocations -> hRegionLocations.forEach(
|
||||
location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
|
||||
return response.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.StartMiniClusterOption;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
|
||||
|
||||
@Category({MediumTests.class, MasterTests.class})
|
||||
public class TestClientMetaServiceRPCs {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestClientMetaServiceRPCs.class);
|
||||
|
||||
// Total number of masters (active + stand by) for the purpose of this test.
|
||||
private static final int MASTER_COUNT = 3;
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static Configuration conf;
|
||||
private static int rpcTimeout;
|
||||
private static RpcClient rpcClient;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
// Start the mini cluster with stand-by masters.
|
||||
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
|
||||
builder.numMasters(MASTER_COUNT).numRegionServers(3);
|
||||
TEST_UTIL.startMiniCluster(builder.build());
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
rpcTimeout = (int) Math.min(Integer.MAX_VALUE, TimeUnit.MILLISECONDS.toNanos(
|
||||
conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
|
||||
rpcClient = RpcClientFactory.createClient(conf,
|
||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getClusterId());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
if (rpcClient != null) {
|
||||
rpcClient.close();
|
||||
}
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private static ClientMetaService.BlockingInterface getMasterStub(ServerName server)
|
||||
throws IOException {
|
||||
return ClientMetaService.newBlockingStub(
|
||||
rpcClient.createBlockingRpcChannel(server, User.getCurrent(), rpcTimeout));
|
||||
}
|
||||
|
||||
private static HBaseRpcController getRpcController() {
|
||||
return RpcControllerFactory.instantiate(conf).newController();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies the cluster ID from all running masters.
|
||||
*/
|
||||
@Test public void TestClusterID() throws Exception {
|
||||
HBaseRpcController rpcController = getRpcController();
|
||||
String clusterID = TEST_UTIL.getMiniHBaseCluster().getMaster().getClusterId();
|
||||
int rpcCount = 0;
|
||||
for (JVMClusterUtil.MasterThread masterThread:
|
||||
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
|
||||
ClientMetaService.BlockingInterface stub =
|
||||
getMasterStub(masterThread.getMaster().getServerName());
|
||||
GetClusterIdResponse resp =
|
||||
stub.getClusterId(rpcController, GetClusterIdRequest.getDefaultInstance());
|
||||
assertEquals(clusterID, resp.getClusterId());
|
||||
rpcCount++;
|
||||
}
|
||||
assertEquals(MASTER_COUNT, rpcCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies the active master ServerName as seen by all masters.
|
||||
*/
|
||||
@Test public void TestActiveMaster() throws Exception {
|
||||
HBaseRpcController rpcController = getRpcController();
|
||||
ServerName activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName();
|
||||
int rpcCount = 0;
|
||||
for (JVMClusterUtil.MasterThread masterThread:
|
||||
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
|
||||
ClientMetaService.BlockingInterface stub =
|
||||
getMasterStub(masterThread.getMaster().getServerName());
|
||||
GetActiveMasterResponse resp =
|
||||
stub.getActiveMaster(rpcController, GetActiveMasterRequest.getDefaultInstance());
|
||||
assertEquals(activeMaster, ProtobufUtil.toServerName(resp.getServerName()));
|
||||
rpcCount++;
|
||||
}
|
||||
assertEquals(MASTER_COUNT, rpcCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the meta region locations RPC returns consistent results across all masters.
|
||||
*/
|
||||
@Test public void TestMetaLocations() throws Exception {
|
||||
HBaseRpcController rpcController = getRpcController();
|
||||
List<HRegionLocation> metaLocations = TEST_UTIL.getMiniHBaseCluster().getMaster()
|
||||
.getMetaRegionLocationCache().getMetaRegionLocations().get();
|
||||
Collections.sort(metaLocations);
|
||||
int rpcCount = 0;
|
||||
for (JVMClusterUtil.MasterThread masterThread:
|
||||
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
|
||||
ClientMetaService.BlockingInterface stub =
|
||||
getMasterStub(masterThread.getMaster().getServerName());
|
||||
GetMetaRegionLocationsResponse resp = stub.getMetaRegionLocations(
|
||||
rpcController, GetMetaRegionLocationsRequest.getDefaultInstance());
|
||||
List<HRegionLocation> result = new ArrayList<>();
|
||||
resp.getMetaLocationsList().forEach(
|
||||
location -> result.add(ProtobufUtil.toRegionLocation(location)));
|
||||
Collections.sort(result);
|
||||
assertEquals(metaLocations, result);
|
||||
rpcCount++;
|
||||
}
|
||||
assertEquals(MASTER_COUNT, rpcCount);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue