HBASE-23305: Implement master based registry for client connections

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Bharath Vissapragada 2020-09-03 19:30:13 -07:00
parent d866994635
commit ebe9e68274
No known key found for this signature in database
GPG Key ID: 18AE42A0B5A93FA7
21 changed files with 1665 additions and 198 deletions

View File

@ -942,7 +942,7 @@ class ConnectionManager {
protected String clusterId = null;
void retrieveClusterId() {
void retrieveClusterId() throws IOException {
if (clusterId != null) return;
this.clusterId = this.registry.getClusterId();
if (clusterId == null) {
@ -1592,7 +1592,7 @@ class ConnectionManager {
private Object makeStubNoRetries() throws IOException, ServiceException {
ServerName sn = registry.getActiveMaster();
if (sn == null) {
String msg = "ZooKeeper available but no active master location found";
String msg = "No active master location found";
LOG.info(msg);
throw new MasterNotRunningException(msg);
}
@ -2587,6 +2587,9 @@ class ConnectionManager {
if (this.closed) {
return;
}
if (this.registry != null) {
this.registry.close();
}
closeMaster();
shutdownPools();
if (this.metrics != null) {

View File

@ -33,7 +33,7 @@ interface ConnectionRegistry {
/**
* @param connection
*/
void init(Connection connection);
void init(Connection connection) throws IOException;
/**
* @return the currently active master, null if none exists.
@ -49,11 +49,16 @@ interface ConnectionRegistry {
/**
* @return Cluster id.
*/
String getClusterId();
String getClusterId() throws IOException;
/**
* @return Count of 'running' regionservers
* @throws IOException
*/
int getCurrentNrHRS() throws IOException;
/**
* Cleanup state, if any.
*/
void close();
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
@ -26,7 +27,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
class ConnectionRegistryFactory {
static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
/**
* @return The cluster registry implementation to use.
@ -34,8 +34,8 @@ class ConnectionRegistryFactory {
*/
static ConnectionRegistry getRegistry(final Connection connection)
throws IOException {
String registryClass = connection.getConfiguration().get(REGISTRY_IMPL_CONF_KEY,
ZooKeeperConnectionRegistry.class.getName());
String registryClass = connection.getConfiguration().get(HConstants.REGISTRY_IMPL_CONF_KEY,
ZKConnectionRegistry.class.getName());
ConnectionRegistry registry = null;
try {
registry = (ConnectionRegistry)Class.forName(registryClass).getDeclaredConstructor().newInstance();

View File

@ -0,0 +1,233 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.DNS.getMasterHostname;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNumLiveRSRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNumLiveRSResponse;
/**
* Master based registry implementation. Makes RPCs to the configured master addresses from config
* {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}. All the registry methods are
* blocking unlike implementations in other branches.
*/
@InterfaceAudience.Private
public class MasterRegistry implements ConnectionRegistry {
private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
private ImmutableMap<String, ClientMetaService.Interface> masterAddr2Stub;
// RPC client used to talk to the masters.
private RpcClient rpcClient;
private RpcControllerFactory rpcControllerFactory;
private int rpcTimeoutMs;
@Override
public void init(Connection connection) throws IOException {
Configuration conf = connection.getConfiguration();
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
// HBASE-25051: we pass cluster id as null here since we do not have a cluster id yet, we have
// to fetch this through the master registry...
// This is a problem as we will use the cluster id to determine the authentication method
rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
populateMasterStubs(parseMasterAddrs(conf));
}
private interface Callable <T extends Message> {
T call(ClientMetaService.Interface stub, RpcController controller) throws IOException;
}
private <T extends Message> T doCall(Callable<T> callable) throws MasterRegistryFetchException {
Exception lastException = null;
Set<String> masters = masterAddr2Stub.keySet();
List<ClientMetaService.Interface> stubs = new ArrayList<>(masterAddr2Stub.values());
Collections.shuffle(stubs, ThreadLocalRandom.current());
for (ClientMetaService.Interface stub: stubs) {
HBaseRpcController controller = rpcControllerFactory.newController();
try {
T resp = callable.call(stub, controller);
if (controller.failed()) {
lastException = controller.getFailed();
continue;
}
return resp;
} catch (Exception e) {
lastException = e;
}
}
// rpcs to all masters failed.
throw new MasterRegistryFetchException(masters, lastException);
}
@Override
public ServerName getActiveMaster() throws IOException {
GetActiveMasterResponse resp = doCall(new Callable<GetActiveMasterResponse>() {
@Override
public GetActiveMasterResponse call(
ClientMetaService.Interface stub, RpcController controller) throws IOException {
BlockingRpcCallback<GetActiveMasterResponse> cb = new BlockingRpcCallback<>();
stub.getActiveMaster(controller, GetActiveMasterRequest.getDefaultInstance(), cb);
return cb.get();
}
});
if (!resp.hasServerName() || resp.getServerName() == null) {
throw new HBaseIOException("No active master found");
}
return ProtobufUtil.toServerName(resp.getServerName());
}
@Override
public RegionLocations getMetaRegionLocations() throws IOException {
GetMetaRegionLocationsResponse resp = doCall(new Callable<GetMetaRegionLocationsResponse>() {
@Override
public GetMetaRegionLocationsResponse call(
ClientMetaService.Interface stub, RpcController controller) throws IOException {
BlockingRpcCallback<GetMetaRegionLocationsResponse> cb = new BlockingRpcCallback<>();
stub.getMetaRegionLocations(controller, GetMetaRegionLocationsRequest.getDefaultInstance(),
cb);
return cb.get();
}
});
List<HRegionLocation> result = new ArrayList<>();
for (HBaseProtos.RegionLocation loc: resp.getMetaLocationsList()) {
result.add(ProtobufUtil.toRegionLocation(loc));
}
return new RegionLocations(result);
}
@Override
public String getClusterId() throws IOException {
GetClusterIdResponse resp = doCall(new Callable<GetClusterIdResponse>() {
@Override
public GetClusterIdResponse call(ClientMetaService.Interface stub, RpcController controller)
throws IOException {
BlockingRpcCallback<GetClusterIdResponse> cb = new BlockingRpcCallback<>();
stub.getClusterId(controller, GetClusterIdRequest.getDefaultInstance(), cb);
return cb.get();
}
});
return resp.getClusterId();
}
@Override
public int getCurrentNrHRS() throws IOException {
GetNumLiveRSResponse resp = doCall(new Callable<GetNumLiveRSResponse>() {
@Override
public GetNumLiveRSResponse call(ClientMetaService.Interface stub, RpcController controller)
throws IOException {
BlockingRpcCallback<GetNumLiveRSResponse> cb = new BlockingRpcCallback<>();
stub.getNumLiveRS(controller, GetNumLiveRSRequest.getDefaultInstance(), cb);
return cb.get();
}
});
return resp.getNumRegionServers();
}
@Override
public void close() {
if (rpcClient != null) {
rpcClient.close();
}
}
/**
* Parses the list of master addresses from the provided configuration. Supported format is comma
* separated host[:port] values. If no port number if specified, default master port is assumed.
* @param conf Configuration to parse from.
*/
@InterfaceAudience.Private
public static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
Set<ServerName> masterAddrs = new HashSet<>();
String configuredMasters = getMasterAddr(conf);
for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
HostAndPort masterHostPort =
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
}
Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed");
return masterAddrs;
}
/**
* Builds the default master address end point if it is not specified in the configuration.
* <p/>
* Will be called in {@code HBaseTestingUtility}.
*/
@InterfaceAudience.Private
public static String getMasterAddr(Configuration conf) throws UnknownHostException {
String masterAddrFromConf = conf.get(HConstants.MASTER_ADDRS_KEY);
if (!Strings.isNullOrEmpty(masterAddrFromConf)) {
return masterAddrFromConf;
}
String hostname = getMasterHostname(conf);
int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
return String.format("%s:%d", hostname, port);
}
void populateMasterStubs(Set<ServerName> masters) throws IOException {
Preconditions.checkNotNull(masters);
ImmutableMap.Builder<String, ClientMetaService.Interface> builder = ImmutableMap.builder();
User user = User.getCurrent();
for (ServerName masterAddr : masters) {
builder.put(masterAddr.toString(), ClientMetaService.newStub(
rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
}
masterAddr2Stub = builder.build();
}
}

View File

@ -36,8 +36,8 @@ import org.apache.zookeeper.KeeperException;
/**
* A cluster registry that stores to zookeeper.
*/
class ZooKeeperConnectionRegistry implements ConnectionRegistry {
private static final Log LOG = LogFactory.getLog(ZooKeeperConnectionRegistry.class);
class ZKConnectionRegistry implements ConnectionRegistry {
private static final Log LOG = LogFactory.getLog(ZKConnectionRegistry.class);
// Needs an instance of hci to function. Set after construct this instance.
ConnectionManager.HConnectionImplementation hci;
@ -126,4 +126,8 @@ class ZooKeeperConnectionRegistry implements ConnectionRegistry {
throw new IOException("Unexpected ZooKeeper exception", ke);
}
}
@Override
public void close() {
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
@ -42,6 +43,8 @@ public class SecurityInfo {
new SecurityInfo("hbase.regionserver.kerberos.principal", Kind.HBASE_AUTH_TOKEN));
infos.put(MasterService.getDescriptor().getName(),
new SecurityInfo("hbase.master.kerberos.principal", Kind.HBASE_AUTH_TOKEN));
infos.put(ClientMetaService.getDescriptor().getName(),
new SecurityInfo("hbase.master.kerberos.principal", Kind.HBASE_AUTH_TOKEN));
infos.put(RegionServerStatusProtos.RegionServerStatusService.getDescriptor().getName(),
new SecurityInfo("hbase.master.kerberos.principal", Kind.HBASE_AUTH_TOKEN));
}

View File

@ -483,6 +483,10 @@ public class TestAsyncProcess {
public int getCurrentNrHRS() throws IOException {
return 1;
}
@Override
public void close() {
}
}
final AtomicInteger nbThreads = new AtomicInteger(0);
@ -492,7 +496,7 @@ public class TestAsyncProcess {
}
private static Configuration setupConf(Configuration conf) {
conf.setClass(ConnectionRegistryFactory.REGISTRY_IMPL_CONF_KEY, TestConnectionRegistry.class, ConnectionRegistry.class);
conf.setClass(HConstants.REGISTRY_IMPL_CONF_KEY, TestConnectionRegistry.class, ConnectionRegistry.class);
return conf;
}

View File

@ -138,6 +138,10 @@ public class TestClientNoCluster extends Configured implements Tool {
public int getCurrentNrHRS() throws IOException {
return 1;
}
@Override
public void close() {
}
}
/**

View File

@ -79,6 +79,7 @@ public final class HConstants {
Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT;
/** Just an array of bytes of the right size. */
public static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HFILEBLOCK_HEADER_SIZE];
public static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
//End HFileBlockConstants.
@ -176,7 +177,7 @@ public final class HConstants {
public static final String MASTER_INFO_PORT = "hbase.master.info.port";
/** Configuration key for the list of master host:ports **/
public static final String MASTER_ADDRS_KEY = "hbase.master.addrs";
public static final String MASTER_ADDRS_KEY = "hbase.masters";
public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT;

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.exceptions;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.PrettyPrinter;
/**
* Exception thrown when an master registry RPC fails in client. The exception includes the list of
* masters to which RPC was attempted and the last exception encountered. Prior exceptions are
* included in the logs.
*/
@InterfaceAudience.Private
public class MasterRegistryFetchException extends HBaseIOException {
private static final long serialVersionUID = 6992134872168185171L;
public MasterRegistryFetchException(Set<String> masters, Throwable failure) {
super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)),
failure);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util;
import java.lang.reflect.Method;
import java.net.UnknownHostException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
@ -66,4 +67,14 @@ public final class DNS {
return org.apache.hadoop.net.DNS.getDefaultHost(strInterface, nameserver);
}
}
public static String getMasterHostname(Configuration conf) throws UnknownHostException {
String hostname = conf.get("hbase.master.hostname", "");
if (hostname.isEmpty()) {
return Strings.domainNamePointerToHostName(getDefaultHost(
conf.get("hbase.master.dns.interface", "default"),
conf.get("hbase.master.dns.nameserver", "default")));
}
return hostname;
}
}

View File

@ -19,6 +19,11 @@
package org.apache.hadoop.hbase.util;
import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
@ -97,4 +102,18 @@ public class PrettyPrinter {
return sb.toString();
}
/**
* Pretty prints a collection of any type to a string. Relies on toString() implementation of the
* object type.
* @param collection collection to pretty print.
* @return Pretty printed string for the collection.
*/
public static String toString(Collection<?> collection) {
List<String> stringList = new ArrayList<>();
for (Object o: collection) {
stringList.add(Objects.toString(o));
}
return "[" + Joiner.on(',').join(stringList) + "]";
}
}

View File

@ -938,6 +938,13 @@ message GetMetaRegionLocationsResponse {
repeated RegionLocation meta_locations = 1;
}
/** Request and response to get the number of live region servers */
message GetNumLiveRSRequest {
}
message GetNumLiveRSResponse {
required int32 num_region_servers = 1;
}
/**
* Implements all the RPCs needed by clients to look up cluster meta information needed for connection establishment.
*/
@ -956,4 +963,9 @@ service ClientMetaService {
* Get current meta replicas' region locations.
*/
rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
/**
* Get number of live region servers.
*/
rpc GetNumLiveRS(GetNumLiveRSRequest) returns(GetNumLiveRSResponse);
}

View File

@ -2641,6 +2641,21 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
return masterFinishedInitializationTime;
}
/**
* @return number of live region servers tracked by this master.
* @throws KeeperException if there is an issue with zookeeper connection.
*/
public int getNumLiveRegionServers() throws KeeperException {
if (isActiveMaster()) {
return regionServerTracker.getOnlineServers().size();
}
// If the master is not active, we fall back to ZK to fetch the number of live region servers.
// This is an extra hop but that is okay since the ConnectionRegistry call that is serviced by
// this method is already deprecated and is not used in any active code paths. This method is
// here to only for the test code.
return ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode);
}
public int getNumWALFiles() {
return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
}

View File

@ -109,6 +109,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMetaRegionLoca
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNumLiveRSRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNumLiveRSResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
@ -1824,4 +1826,15 @@ public class MasterRpcServices extends RSRpcServices
return response.build();
}
@Override
public GetNumLiveRSResponse getNumLiveRS(RpcController rpcController, GetNumLiveRSRequest request)
throws ServiceException {
GetNumLiveRSResponse.Builder response = GetNumLiveRSResponse.newBuilder();
try {
response.setNumRegionServers(master.getNumLiveRegionServers());
} catch (KeeperException ke) {
throw new ServiceException(ke);
}
return response.build();
}
}

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.util;
import com.google.common.base.Preconditions;
import java.io.InterruptedIOException;
import java.io.IOException;
import java.lang.reflect.Constructor;
@ -26,6 +27,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
@ -144,6 +146,11 @@ public class JVMClusterUtil {
} catch (Exception e) {
throw new IOException(e);
}
// Needed if a master based registry is configured for internal cluster connections. Here, we
// just add the current master host port since we do not know other master addresses up front
// in mini cluster tests.
c.set(HConstants.MASTER_ADDRS_KEY,
Preconditions.checkNotNull(server.getServerName().getAddress()).toString());
return new JVMClusterUtil.MasterThread(server, index);
}

View File

@ -1110,6 +1110,9 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
Configuration c = new Configuration(this.conf);
this.hbaseCluster =
new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass);
// Populate the master address configuration from mini cluster configuration.
conf.set(HConstants.MASTER_ADDRS_KEY,
c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT));
// Don't leave here till we've done a successful scan of the hbase:meta
Table t = new HTable(c, TableName.META_TABLE_NAME);
ResultScanner s = t.getScanner(new Scan());

View File

@ -30,6 +30,7 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -117,40 +118,88 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Run tests that use the HBase clients; {@link HTable}.
* Sets up the HBase mini cluster once at start and runs through all client tests.
* Each creates a table named for the method and does its stuff against that.
*
* Parameterized to run with different registry implementations.
*/
@Category(LargeTests.class)
@SuppressWarnings ("deprecation")
@RunWith(Parameterized.class)
public class TestFromClientSide {
private static final Log LOG = LogFactory.getLog(TestFromClientSide.class);
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected static HBaseTestingUtility TEST_UTIL;
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
protected static int SLAVES = 3;
@Parameterized.Parameters
public static Collection parameters() {
return Arrays.asList(new Object[][] {
{ MasterRegistry.class },
{ ZKConnectionRegistry.class }
});
}
// To keep the child classes happy.
TestFromClientSide() {}
public TestFromClientSide(Class<? extends ConnectionRegistry> registry) throws Exception {
initialize(registry);
}
/**
* @throws java.lang.Exception
* JUnit does not provide an easy way to run a hook after each parameterized run. Without that
* there is no easy way to restart the test cluster after each parameterized run. Annotation
* BeforeParam does not work either because it runs before parameterization and hence does not
* have access to the test parameters (which is weird).
*
* This *hack* checks if the current instance of test cluster configuration has the passed
* parameterized configs. In such a case, we can just reuse the cluster for test and do not need
* to initialize from scratch. While this is a hack, it saves a ton of time for the full
* test and de-flakes it.
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
protected static boolean isSameParameterizedCluster(
Class<? extends ConnectionRegistry> registryImpl) {
if (TEST_UTIL == null) {
return false;
}
Configuration conf = TEST_UTIL.getConfiguration();
Class<? extends ConnectionRegistry> confClass = conf.getClass(HConstants.REGISTRY_IMPL_CONF_KEY,
ZKConnectionRegistry.class, ConnectionRegistry.class);
return confClass.getName().equals(registryImpl.getName());
}
public static void initialize(Class<? extends ConnectionRegistry> registry) throws Exception {
// initialize() is called for every unit test, however we only want to reset the cluster state
// at the end of every parameterized run.
if (isSameParameterizedCluster(registry)) {
return;
}
if (TEST_UTIL != null) {
// We reached end of a parameterized run, clean up.
TEST_UTIL.shutdownMiniCluster();
}
TEST_UTIL = new HBaseTestingUtility();
// Uncomment the following lines if more verbosity is needed for
// debugging (see HBASE-12285 for details).
//((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setClass(HConstants.REGISTRY_IMPL_CONF_KEY, registry, ConnectionRegistry.class);
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
MultiRowMutationEndpoint.class.getName());
conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
@ -167,22 +216,6 @@ public class TestFromClientSide {
TEST_UTIL.shutdownMiniCluster();
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
// Nothing to do.
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {
// Nothing to do.
}
/**
* Test append result when there are duplicate rpc request.
*/
@ -4461,6 +4494,12 @@ public class TestFromClientSide {
*/
@Test
public void testUnmanagedHConnectionReconnect() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
Class registryImpl = conf.getClass(
HConstants.REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
// This test does not make sense for MasterRegistry since it stops the only master in the
// cluster and starts a new master without populating the underlying config for the connection.
Assume.assumeFalse(registryImpl.equals(MasterRegistry.class));
final TableName tableName = TableName.valueOf("testUnmanagedHConnectionReconnect");
HTable t = createUnmangedHConnectionHTable(tableName);
Connection conn = t.getConnection();

View File

@ -17,13 +17,18 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import org.junit.runners.Parameterized;
/**
* Test all client operations with a coprocessor that
@ -31,12 +36,32 @@ import org.junit.experimental.categories.Category;
*/
@Category(LargeTests.class)
public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@Parameterized.Parameters
public static Collection parameters() {
return Arrays.asList(new Object[][] {
{ ZKConnectionRegistry.class }
});
}
public TestFromClientSideWithCoprocessor(Class registry) throws Exception {
initialize(registry);
}
public static void initialize(Class<? extends ConnectionRegistry> registry) throws Exception {
if (isSameParameterizedCluster(registry)) {
return;
}
if (TEST_UTIL != null) {
// We reached end of a parameterized run, clean up.
TEST_UTIL.shutdownMiniCluster();
}
TEST_UTIL = new HBaseTestingUtility();
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
MultiRowMutationEndpoint.class.getName(), NoOpScanPolicyObserver.class.getName());
conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
conf.setClass(HConstants.REGISTRY_IMPL_CONF_KEY, registry, ConnectionRegistry.class);
// We need more than one region server in this test
TEST_UTIL.startMiniCluster(SLAVES);
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
import static org.junit.Assert.assertEquals;
import com.google.common.base.Joiner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestMasterRegistry {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final int META_REPLICA_COUNT = 3;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, META_REPLICA_COUNT);
TEST_UTIL.startMiniCluster(3, 3);
}
@AfterClass
public static void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Generates a string of dummy master addresses in host:port format. Every other hostname won't
* have a port number.
*/
private static String generateDummyMastersList(int size) {
List<String> masters = new ArrayList<>();
for (int i = 0; i < size; i++) {
masters.add(" localhost" + (i % 2 == 0 ? ":" + (1000 + i) : ""));
}
return Joiner.on(",").join(masters);
}
/**
* Makes sure the master registry parses the master end points in the configuration correctly.
*/
@Test
public void testMasterAddressParsing() throws IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
int numMasters = 10;
conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
List<ServerName> parsedMasters = new ArrayList<>(MasterRegistry.parseMasterAddrs(conf));
// Half of them would be without a port, duplicates are removed.
assertEquals(numMasters / 2 + 1, parsedMasters.size());
// Sort in the increasing order of port numbers.
Collections.sort(parsedMasters, new Comparator<ServerName>() {
@Override
public int compare(ServerName sn1, ServerName sn2) {
return sn1.getPort() - sn2.getPort();
}
});
for (int i = 0; i < parsedMasters.size(); i++) {
ServerName sn = parsedMasters.get(i);
assertEquals("localhost", sn.getHostname());
if (i == parsedMasters.size() - 1) {
// Last entry should be the one with default port.
assertEquals(HConstants.DEFAULT_MASTER_PORT, sn.getPort());
} else {
assertEquals(1000 + (2 * i), sn.getPort());
}
}
}
@Test
public void testRegistryRPCs() throws Exception {
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
final MasterRegistry registry = new MasterRegistry();
try {
registry.init(TEST_UTIL.getConnection());
// Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
// because not all replicas had made it up before test started.
TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return registry.getMetaRegionLocations().size() == META_REPLICA_COUNT;
}
});
assertEquals(registry.getClusterId(), activeMaster.getClusterId());
assertEquals(registry.getActiveMaster(), activeMaster.getServerName());
List<HRegionLocation> metaLocations =
Arrays.asList(registry.getMetaRegionLocations().getRegionLocations());
List<HRegionLocation> actualMetaLocations =
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations();
Collections.sort(metaLocations);
Collections.sort(actualMetaLocations);
assertEquals(actualMetaLocations, metaLocations);
int numRs = registry.getCurrentNrHRS();
assertEquals(TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size(), numRs);
} finally {
registry.close();
}
}
}