HBASE-23647: Make MasterRegistry the default impl. (#1039)

Signed-off-by: Stack <stack@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
(cherry picked from commit 229b8aaaf3)
This commit is contained in:
Bharath Vissapragada 2020-01-27 12:36:09 -08:00
parent be30d43a6c
commit 69e3e0e2ef
54 changed files with 559 additions and 175 deletions

View File

@ -36,7 +36,7 @@ final class ConnectionRegistryFactory {
*/ */
static ConnectionRegistry getRegistry(Configuration conf) { static ConnectionRegistry getRegistry(Configuration conf) {
Class<? extends ConnectionRegistry> clazz = conf.getClass( Class<? extends ConnectionRegistry> clazz = conf.getClass(
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class, CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, MasterRegistry.class,
ConnectionRegistry.class); ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf); return ReflectionUtils.newInstance(clazz, conf);
} }

View File

@ -90,6 +90,7 @@ public class MasterRegistry implements ConnectionRegistry {
} else { } else {
finalConf = conf; finalConf = conf;
} }
finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT));
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
masterServers = new HashSet<>(); masterServers = new HashSet<>();
@ -146,12 +147,13 @@ public class MasterRegistry implements ConnectionRegistry {
if (rpcResult == null) { if (rpcResult == null) {
future.completeExceptionally( future.completeExceptionally(
new MasterRegistryFetchException(masterServers, hrc.getFailed())); new MasterRegistryFetchException(masterServers, hrc.getFailed()));
return;
} }
if (!isValidResp.test(rpcResult)) { if (!isValidResp.test(rpcResult)) {
// Rpc returned ok, but result was malformed. // Rpc returned ok, but result was malformed.
future.completeExceptionally(new IOException( future.completeExceptionally(new IOException(
String.format("Invalid result for request %s. Will be retried", debug))); String.format("Invalid result for request %s. Will be retried", debug)));
return;
} }
future.complete(transformResult.apply(rpcResult)); future.complete(transformResult.apply(rpcResult));
}; };

View File

@ -580,7 +580,6 @@ public final class ReplicationPeerConfigUtil {
compound.addStringMap(peerConfig.getConfiguration()); compound.addStringMap(peerConfig.getConfiguration());
return compound; return compound;
} }
return otherConf; return otherConf;
} }
} }

View File

@ -49,6 +49,8 @@ public class SecurityInfo {
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN)); new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.HbckService.getDescriptor().getName(), infos.put(MasterProtos.HbckService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN)); new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
// NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE // NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
// new Service will not be found when all is Kerberized!!!! // new Service will not be found when all is Kerberized!!!!
} }

View File

@ -294,14 +294,21 @@ public class HBaseConfiguration extends Configuration {
* used to communicate with distant clusters * used to communicate with distant clusters
* @param conf configuration object to configure * @param conf configuration object to configure
* @param key string that contains the 3 required configuratins * @param key string that contains the 3 required configuratins
* @throws IOException
*/ */
private static void applyClusterKeyToConf(Configuration conf, String key) private static void applyClusterKeyToConf(Configuration conf, String key)
throws IOException{ throws IOException {
ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key); ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString()); conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString());
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort()); conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort());
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent()); conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent());
// Without the right registry, the above configs are useless. Also, we don't use setClass()
// here because the ConnectionRegistry* classes are not resolvable from this module.
// This will be broken if ZkConnectionRegistry class gets renamed or moved. Is there a better
// way?
LOG.info("Overriding client registry implementation to {}",
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
} }
/** /**

View File

@ -188,6 +188,10 @@ public final class HConstants {
public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT; public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT;
/** Full class name of the Zookeeper based connection registry implementation */
public static final String ZK_CONNECTION_REGISTRY_CLASS =
"org.apache.hadoop.hbase.client.ZKConnectionRegistry";
/** Configuration to enable hedged reads on master registry **/ /** Configuration to enable hedged reads on master registry **/
public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY = public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
"hbase.client.master_registry.enable_hedged_reads"; "hbase.client.master_registry.enable_hedged_reads";

View File

@ -114,7 +114,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator) private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
throws Exception { throws Exception {
LOG.debug("mimicSyncUpAfterBulkLoad"); LOG.debug("mimicSyncUpAfterBulkLoad");
UTIL2.shutdownMiniHBaseCluster(); shutDownTargetHBaseCluster();
loadAndReplicateHFiles(false, randomHFileRangeListIterator); loadAndReplicateHFiles(false, randomHFileRangeListIterator);
@ -126,8 +126,8 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406, assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
rowCount_ht2Source); rowCount_ht2Source);
UTIL1.shutdownMiniHBaseCluster(); shutDownSourceHBaseCluster();
UTIL2.restartHBaseCluster(1); restartTargetHBaseCluster(1);
Thread.sleep(SLEEP_TIME); Thread.sleep(SLEEP_TIME);
@ -148,7 +148,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
if (i == NB_RETRIES - 1) { if (i == NB_RETRIES - 1) {
if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) { if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) {
// syncUP still failed. Let's look at the source in case anything wrong there // syncUP still failed. Let's look at the source in case anything wrong there
UTIL1.restartHBaseCluster(1); restartSourceHBaseCluster(1);
rowCount_ht1Source = UTIL1.countRows(ht1Source); rowCount_ht1Source = UTIL1.countRows(ht1Source);
LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source); LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
rowCount_ht2Source = UTIL1.countRows(ht2Source); rowCount_ht2Source = UTIL1.countRows(ht2Source);

View File

@ -91,7 +91,7 @@ public class LocalHBaseCluster {
*/ */
public LocalHBaseCluster(final Configuration conf, final int noRegionServers) public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
throws IOException { throws IOException {
this(conf, 1, noRegionServers, getMasterImplementation(conf), this(conf, 1, 0, noRegionServers, getMasterImplementation(conf),
getRegionServerImplementation(conf)); getRegionServerImplementation(conf));
} }
@ -106,7 +106,7 @@ public class LocalHBaseCluster {
public LocalHBaseCluster(final Configuration conf, final int noMasters, public LocalHBaseCluster(final Configuration conf, final int noMasters,
final int noRegionServers) final int noRegionServers)
throws IOException { throws IOException {
this(conf, noMasters, noRegionServers, getMasterImplementation(conf), this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf),
getRegionServerImplementation(conf)); getRegionServerImplementation(conf));
} }
@ -122,6 +122,12 @@ public class LocalHBaseCluster {
HMaster.class); HMaster.class);
} }
public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers,
final Class<? extends HMaster> masterClass,
final Class<? extends HRegionServer> regionServerClass) throws IOException {
this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass);
}
/** /**
* Constructor. * Constructor.
* @param conf Configuration to use. Post construction has the master's * @param conf Configuration to use. Post construction has the master's
@ -134,9 +140,9 @@ public class LocalHBaseCluster {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public LocalHBaseCluster(final Configuration conf, final int noMasters, public LocalHBaseCluster(final Configuration conf, final int noMasters,
final int noRegionServers, final Class<? extends HMaster> masterClass, final int noAlwaysStandByMasters, final int noRegionServers,
final Class<? extends HRegionServer> regionServerClass) final Class<? extends HMaster> masterClass,
throws IOException { final Class<? extends HRegionServer> regionServerClass) throws IOException {
this.conf = conf; this.conf = conf;
// When active, if a port selection is default then we switch to random // When active, if a port selection is default then we switch to random
@ -170,24 +176,22 @@ public class LocalHBaseCluster {
this.masterClass = (Class<? extends HMaster>) this.masterClass = (Class<? extends HMaster>)
conf.getClass(HConstants.MASTER_IMPL, masterClass); conf.getClass(HConstants.MASTER_IMPL, masterClass);
// Start the HMasters. // Start the HMasters.
for (int i = 0; i < noMasters; i++) { int i;
for (i = 0; i < noMasters; i++) {
addMaster(new Configuration(conf), i); addMaster(new Configuration(conf), i);
} }
for (int j = 0; j < noAlwaysStandByMasters; j++) {
// Populate the master address host ports in the config. This is needed if a master based Configuration c = new Configuration(conf);
// registry is configured for client metadata services (HBASE-18095) c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster");
List<String> masterHostPorts = new ArrayList<>(); addMaster(c, i + j);
getMasters().forEach(masterThread -> }
masterHostPorts.add(masterThread.getMaster().getServerName().getAddress().toString()));
conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
// Start the HRegionServers. // Start the HRegionServers.
this.regionServerClass = this.regionServerClass =
(Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL, (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
regionServerClass); regionServerClass);
for (int i = 0; i < noRegionServers; i++) { for (int j = 0; j < noRegionServers; j++) {
addRegionServer(new Configuration(conf), i); addRegionServer(new Configuration(conf), j);
} }
} }
@ -233,8 +237,13 @@ public class LocalHBaseCluster {
// its Connection instance rather than share (see HBASE_INSTANCES down in // its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager. // the guts of ConnectionManager.
JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
(Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index); (Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
this.masterThreads.add(mt); this.masterThreads.add(mt);
// Refresh the master address config.
List<String> masterHostPorts = new ArrayList<>();
getMasters().forEach(masterThread ->
masterHostPorts.add(masterThread.getMaster().getServerName().getAddress().toString()));
conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
return mt; return mt;
} }

View File

@ -56,15 +56,15 @@ public class ActiveMasterManager extends ZKListener {
final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false); final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
final AtomicBoolean clusterShutDown = new AtomicBoolean(false); final AtomicBoolean clusterShutDown = new AtomicBoolean(false);
// This server's information. // This server's information. Package-private for child implementations.
private final ServerName sn; int infoPort;
private int infoPort; final ServerName sn;
private final Server master; final Server master;
// Active master's server name. Invalidated anytime active master changes (based on ZK // Active master's server name. Invalidated anytime active master changes (based on ZK
// notifications) and lazily fetched on-demand. // notifications) and lazily fetched on-demand.
// ServerName is immutable, so we don't need heavy synchronization around it. // ServerName is immutable, so we don't need heavy synchronization around it.
private volatile ServerName activeMasterServerName; volatile ServerName activeMasterServerName;
/** /**
* @param watcher ZK watcher * @param watcher ZK watcher

View File

@ -568,7 +568,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// Some unit tests don't need a cluster, so no zookeeper at all // Some unit tests don't need a cluster, so no zookeeper at all
if (!conf.getBoolean("hbase.testing.nocluster", false)) { if (!conf.getBoolean("hbase.testing.nocluster", false)) {
this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper); this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
} else { } else {
this.metaRegionLocationCache = null; this.metaRegionLocationCache = null;
this.activeMasterManager = null; this.activeMasterManager = null;
@ -582,6 +582,15 @@ public class HMaster extends HRegionServer implements MasterServices {
} }
} }
/**
* Protected to have custom implementations in tests override the default ActiveMaster
* implementation.
*/
protected ActiveMasterManager createActiveMasterManager(
ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
return new ActiveMasterManager(zk, sn, server);
}
@Override @Override
protected String getUseThisHostnameInstead(Configuration conf) { protected String getUseThisHostnameInstead(Configuration conf) {
return conf.get(MASTER_HOSTNAME_KEY); return conf.get(MASTER_HOSTNAME_KEY);

View File

@ -799,15 +799,17 @@ public class HRegionServer extends HasThread implements
return true; return true;
} }
/**
* Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to
* the local server; i.e. a short-circuit Connection. Safe to use going to local or remote
* server. Create this instance in a method can be intercepted and mocked in tests.
* @throws IOException
*/
@VisibleForTesting
protected ClusterConnection createClusterConnection() throws IOException { protected ClusterConnection createClusterConnection() throws IOException {
Configuration conf = this.conf; Configuration conf = this.conf;
// We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
// - Decouples RS and master life cycles. RegionServers can continue be up independent of
// masters' availability.
// - Configuration management for region servers (cluster internal) is much simpler when adding
// new masters or removing existing masters, since only clients' config needs to be updated.
// - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
// other internal connections too.
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) { if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
// Use server ZK cluster for server-issued connections, so we clone // Use server ZK cluster for server-issued connections, so we clone
// the conf and unset the client ZK related properties // the conf and unset the client ZK related properties

View File

@ -40,6 +40,8 @@ public class HBasePolicyProvider extends PolicyProvider {
new Service("security.client.protocol.acl", AdminService.BlockingInterface.class), new Service("security.client.protocol.acl", AdminService.BlockingInterface.class),
new Service("security.client.protocol.acl", new Service("security.client.protocol.acl",
MasterProtos.HbckService.BlockingInterface.class), MasterProtos.HbckService.BlockingInterface.class),
new Service("security.client.protocol.acl",
MasterProtos.ClientMetaService.BlockingInterface.class),
new Service("security.admin.protocol.acl", MasterService.BlockingInterface.class), new Service("security.admin.protocol.acl", MasterService.BlockingInterface.class),
new Service("security.masterregion.protocol.acl", new Service("security.masterregion.protocol.acl",
RegionServerStatusService.BlockingInterface.class) RegionServerStatusService.BlockingInterface.class)

View File

@ -153,6 +153,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.impl.Log4jLoggerAdapter; import org.slf4j.impl.Log4jLoggerAdapter;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/** /**
@ -1142,8 +1143,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
Configuration c = new Configuration(this.conf); Configuration c = new Configuration(this.conf);
TraceUtil.initTracer(c); TraceUtil.initTracer(c);
this.hbaseCluster = this.hbaseCluster =
new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(), new MiniHBaseCluster(c, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass()); option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
option.getRsClass());
// Populate the master address configuration from mini cluster configuration. // Populate the master address configuration from mini cluster configuration.
conf.set(HConstants.MASTER_ADDRS_KEY, conf.set(HConstants.MASTER_ADDRS_KEY,
c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT)); c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT));
@ -1259,6 +1261,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
StartMiniClusterOption option = StartMiniClusterOption option =
StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build(); StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
restartHBaseCluster(option); restartHBaseCluster(option);
invalidateConnection();
} }
public void restartHBaseCluster(StartMiniClusterOption option) public void restartHBaseCluster(StartMiniClusterOption option)
@ -1272,8 +1275,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
this.connection = null; this.connection = null;
} }
this.hbaseCluster = this.hbaseCluster =
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(), new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass()); option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
option.getRsClass());
// Don't leave here till we've done a successful scan of the hbase:meta // Don't leave here till we've done a successful scan of the hbase:meta
Connection conn = ConnectionFactory.createConnection(this.conf); Connection conn = ConnectionFactory.createConnection(this.conf);
Table t = conn.getTable(TableName.META_TABLE_NAME); Table t = conn.getTable(TableName.META_TABLE_NAME);
@ -3096,9 +3100,34 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
return hbaseCluster; return hbaseCluster;
} }
public void closeConnection() throws IOException {
Closeables.close(hbaseAdmin, true);
Closeables.close(connection, true);
this.hbaseAdmin = null;
this.connection = null;
}
/**
* Resets the connections so that the next time getConnection() is called, a new connection is
* created. This is needed in cases where the entire cluster / all the masters are shutdown and
* the connection is not valid anymore.
* TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
* written, not all start() stop() calls go through this class. Most tests directly operate on
* the underlying mini/local hbase cluster. That makes it difficult for this wrapper class to
* maintain the connection state automatically. Cleaning this is a much bigger refactor.
*/
public void invalidateConnection() throws IOException {
closeConnection();
// Update the master addresses if they changed.
final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
masterConfigBefore, masterConfAfter);
conf.set(HConstants.MASTER_ADDRS_KEY,
getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
}
/** /**
* Get a Connection to the cluster.
* Not thread-safe (This class needs a lot of work to make it thread-safe).
* @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster. * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
* @throws IOException * @throws IOException
*/ */

View File

@ -65,6 +65,13 @@ public class MiniClusterRule extends ExternalResource {
this.miniClusterOptions = miniClusterOptions; this.miniClusterOptions = miniClusterOptions;
} }
/**
* @return the underlying instance of {@link HBaseTestingUtility}
*/
public HBaseTestingUtility getTestingUtility() {
return testingUtility;
}
/** /**
* Create a {@link AsyncConnection} to the managed {@link MiniHBaseCluster}. It's up to the caller * Create a {@link AsyncConnection} to the managed {@link MiniHBaseCluster}. It's up to the caller
* to {@link AsyncConnection#close() close()} the connection when finished. * to {@link AsyncConnection#close() close()} the connection when finished.

View File

@ -92,7 +92,7 @@ public class MiniHBaseCluster extends HBaseCluster {
Class<? extends HMaster> masterClass, Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException { throws IOException, InterruptedException {
this(conf, numMasters, numRegionServers, null, masterClass, regionserverClass); this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
} }
/** /**
@ -103,9 +103,8 @@ public class MiniHBaseCluster extends HBaseCluster {
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
List<Integer> rsPorts, int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException { throws IOException, InterruptedException {
super(conf); super(conf);
@ -113,8 +112,9 @@ public class MiniHBaseCluster extends HBaseCluster {
// Hadoop 2 // Hadoop 2
CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
init(numMasters, numRegionServers, rsPorts, masterClass, regionserverClass); init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
this.initialClusterStatus = getClusterStatus(); regionserverClass);
this.initialClusterStatus = getClusterMetrics();
} }
public Configuration getConfiguration() { public Configuration getConfiguration() {
@ -229,8 +229,8 @@ public class MiniHBaseCluster extends HBaseCluster {
} }
} }
private void init(final int nMasterNodes, final int nRegionNodes, List<Integer> rsPorts, private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
Class<? extends HMaster> masterClass, final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException { throws IOException, InterruptedException {
try { try {
@ -242,7 +242,7 @@ public class MiniHBaseCluster extends HBaseCluster {
} }
// start up a LocalHBaseCluster // start up a LocalHBaseCluster
hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0, hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0,
masterClass, regionserverClass); masterClass, regionserverClass);
// manually add the regionservers as other users // manually add the regionservers as other users
@ -557,6 +557,8 @@ public class MiniHBaseCluster extends HBaseCluster {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new IOException("Interrupted adding master to cluster", ie); throw new IOException("Interrupted adding master to cluster", ie);
} }
conf.set(HConstants.MASTER_ADDRS_KEY,
hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY));
return t; return t;
} }

View File

@ -46,6 +46,14 @@ public final class StartMiniClusterOption {
* can find the active/primary master with {@link MiniHBaseCluster#getMaster()}. * can find the active/primary master with {@link MiniHBaseCluster#getMaster()}.
*/ */
private final int numMasters; private final int numMasters;
/**
* Number of masters that always remain standby. These set of masters never transition to active
* even if an active master does not exist. These are needed for testing scenarios where there are
* no active masters in the cluster but the cluster connection (backed by master registry) should
* still work.
*/
private final int numAlwaysStandByMasters;
/** /**
* The class to use as HMaster, or null for default. * The class to use as HMaster, or null for default.
*/ */
@ -99,11 +107,12 @@ public final class StartMiniClusterOption {
/** /**
* Private constructor. Use {@link Builder#build()}. * Private constructor. Use {@link Builder#build()}.
*/ */
private StartMiniClusterOption(int numMasters, Class<? extends HMaster> masterClass, private StartMiniClusterOption(int numMasters, int numAlwaysStandByMasters,
int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass, int numRegionServers, List<Integer> rsPorts,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, int numDataNodes, Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass, int numDataNodes,
String[] dataNodeHosts, int numZkServers, boolean createRootDir, boolean createWALDir) { String[] dataNodeHosts, int numZkServers, boolean createRootDir, boolean createWALDir) {
this.numMasters = numMasters; this.numMasters = numMasters;
this.numAlwaysStandByMasters = numAlwaysStandByMasters;
this.masterClass = masterClass; this.masterClass = masterClass;
this.numRegionServers = numRegionServers; this.numRegionServers = numRegionServers;
this.rsPorts = rsPorts; this.rsPorts = rsPorts;
@ -119,6 +128,10 @@ public final class StartMiniClusterOption {
return numMasters; return numMasters;
} }
public int getNumAlwaysStandByMasters() {
return numAlwaysStandByMasters;
}
public Class<? extends HMaster> getMasterClass() { public Class<? extends HMaster> getMasterClass() {
return masterClass; return masterClass;
} }
@ -179,6 +192,7 @@ public final class StartMiniClusterOption {
*/ */
public static final class Builder { public static final class Builder {
private int numMasters = 1; private int numMasters = 1;
private int numAlwaysStandByMasters = 0;
private Class<? extends HMaster> masterClass = null; private Class<? extends HMaster> masterClass = null;
private int numRegionServers = 1; private int numRegionServers = 1;
private List<Integer> rsPorts = null; private List<Integer> rsPorts = null;
@ -196,8 +210,9 @@ public final class StartMiniClusterOption {
if (dataNodeHosts != null && dataNodeHosts.length != 0) { if (dataNodeHosts != null && dataNodeHosts.length != 0) {
numDataNodes = dataNodeHosts.length; numDataNodes = dataNodeHosts.length;
} }
return new StartMiniClusterOption(numMasters, masterClass, numRegionServers, rsPorts, rsClass, return new StartMiniClusterOption(numMasters,numAlwaysStandByMasters, masterClass,
numDataNodes, dataNodeHosts, numZkServers, createRootDir, createWALDir); numRegionServers, rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers,
createRootDir, createWALDir);
} }
public Builder numMasters(int numMasters) { public Builder numMasters(int numMasters) {
@ -205,6 +220,11 @@ public final class StartMiniClusterOption {
return this; return this;
} }
public Builder numAlwaysStandByMasters(int numAlwaysStandByMasters) {
this.numAlwaysStandByMasters = numAlwaysStandByMasters;
return this;
}
public Builder masterClass(Class<? extends HMaster> masterClass) { public Builder masterClass(Class<? extends HMaster> masterClass) {
this.masterClass = masterClass; this.masterClass = masterClass;
return this; return this;

View File

@ -60,8 +60,7 @@ public abstract class AbstractTestRegionLocator {
UTIL.waitTableAvailable(TABLE_NAME); UTIL.waitTableAvailable(TABLE_NAME);
try (ConnectionRegistry registry = try (ConnectionRegistry registry =
ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration())) { ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration())) {
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(UTIL.getConfiguration(), RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
registry, REGION_REPLICATION);
} }
UTIL.getAdmin().balancerSwitch(false, true); UTIL.getAdmin().balancerSwitch(false, true);
} }

View File

@ -20,13 +20,13 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
@ -43,24 +43,32 @@ public final class RegionReplicaTestHelper {
} }
// waits for all replicas to have region location // waits for all replicas to have region location
static void waitUntilAllMetaReplicasHavingRegionLocation(Configuration conf, static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtility util,
ConnectionRegistry registry, int regionReplication) throws IOException { ConnectionRegistry registry) {
Configuration conf = util.getConfiguration();
int regionReplicaCount = util.getConfiguration().getInt(HConstants.META_REPLICAS_NUM,
HConstants.DEFAULT_META_REPLICA_NUM);
Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true, Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
new ExplainingPredicate<IOException>() { new ExplainingPredicate<IOException>() {
@Override @Override
public String explainFailure() throws IOException { public String explainFailure() {
return "Not all meta replicas get assigned"; return "Not all meta replicas get assigned";
} }
@Override @Override
public boolean evaluate() throws IOException { public boolean evaluate() {
try { try {
RegionLocations locs = registry.getMetaRegionLocations().get(); RegionLocations locs = registry.getMetaRegionLocations().get();
if (locs.size() < regionReplication) { if (locs.size() < regionReplicaCount) {
return false; return false;
} }
for (int i = 0; i < regionReplication; i++) { for (int i = 0; i < regionReplicaCount; i++) {
if (locs.getRegionLocation(i) == null) { HRegionLocation loc = locs.getRegionLocation(i);
// Wait until the replica is served by a region server. There could be delay between
// the replica being available to the connection and region server opening it.
Optional<ServerName> rsCarryingReplica =
getRSCarryingReplica(util, loc.getRegion().getTable(), i);
if (!rsCarryingReplica.isPresent()) {
return false; return false;
} }
} }

View File

@ -30,6 +30,7 @@ import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After; import org.junit.After;
@ -84,7 +85,9 @@ public abstract class TestAsyncAdminBase {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
TEST_UTIL.startMiniCluster(2); StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).
numMasters(2).build();
TEST_UTIL.startMiniCluster(option);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
} }

View File

@ -48,8 +48,6 @@ public class TestAsyncAdminMasterSwitch extends TestAsyncAdminBase {
assertEquals(TEST_UTIL.getHBaseCluster().getRegionServerThreads().size(), assertEquals(TEST_UTIL.getHBaseCluster().getRegionServerThreads().size(),
admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.SERVERS_NAME)).join() admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.SERVERS_NAME)).join()
.getServersName().size()); .getServersName().size());
// stop the old master, and start a new one
TEST_UTIL.getMiniHBaseCluster().startMaster();
TEST_UTIL.getMiniHBaseCluster().stopMaster(0).join(); TEST_UTIL.getMiniHBaseCluster().stopMaster(0).join();
assertTrue(TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(30000)); assertTrue(TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(30000));
// make sure that we could still call master // make sure that we could still call master

View File

@ -56,7 +56,7 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase {
try (ConnectionRegistry registry = try (ConnectionRegistry registry =
ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) { ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) {
RegionReplicaTestHelper RegionReplicaTestHelper
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), registry, 3); .waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry);
} }
} }

View File

@ -54,8 +54,7 @@ public class TestAsyncMetaRegionLocator {
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
TEST_UTIL.waitUntilNoRegionsInTransition(); TEST_UTIL.waitUntilNoRegionsInTransition();
REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
RegionReplicaTestHelper RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3);
TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.getAdmin().balancerSwitch(false, true);
LOCATOR = new AsyncMetaRegionLocator(REGISTRY); LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
} }

View File

@ -92,7 +92,7 @@ public class TestAsyncTableUseMetaReplicas {
FailPrimaryMetaScanCp.class.getName()); FailPrimaryMetaScanCp.class.getName());
UTIL.startMiniCluster(3); UTIL.startMiniCluster(3);
try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf)) { try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf)) {
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(conf, registry, 3); RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);
} }
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));

View File

@ -19,12 +19,13 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient; import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
import org.apache.hadoop.hbase.ipc.BlockingRpcClient; import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
@ -44,10 +46,12 @@ import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
@ -147,6 +151,19 @@ public class TestClientTimeouts {
User ticket, int rpcTimeout) throws UnknownHostException { User ticket, int rpcTimeout) throws UnknownHostException {
return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout); return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
} }
@Override
public RpcChannel createRpcChannel(ServerName sn, User ticket, int rpcTimeout)
throws UnknownHostException {
return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
}
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
Preconditions.checkArgument(sns != null && sns.size() == 1);
return new RandomTimeoutRpcChannel(this, (ServerName)sns.toArray()[0], user, rpcTimeout);
}
} }
/** /**
@ -177,4 +194,27 @@ public class TestClientTimeouts {
return super.callBlockingMethod(md, controller, param, returnType); return super.callBlockingMethod(md, controller, param, returnType);
} }
} }
private static class RandomTimeoutRpcChannel extends AbstractRpcClient.RpcChannelImplementation {
RandomTimeoutRpcChannel(AbstractRpcClient<?> rpcClient, ServerName sn, User ticket,
int rpcTimeout) throws UnknownHostException {
super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
}
@Override
public void callMethod(MethodDescriptor md, RpcController controller, Message param,
Message returnType, RpcCallback<Message> done) {
RandomTimeoutBlockingRpcChannel.invokations.getAndIncrement();
if (ThreadLocalRandom.current().nextFloat() < RandomTimeoutBlockingRpcChannel.CHANCE_OF_TIMEOUT) {
// throw a ServiceException, because that is the only exception type that
// {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
// "actual" type, this may not properly mimic the underlying RpcEngine.
((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout"));
done.run(null);
return;
}
super.callMethod(md, controller, param, returnType, done);
}
}
} }

View File

@ -43,6 +43,7 @@ public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
@Parameterized.Parameters @Parameterized.Parameters
public static Collection parameters() { public static Collection parameters() {
return Arrays.asList(new Object[][] { return Arrays.asList(new Object[][] {
{ MasterRegistry.class, 1},
{ ZKConnectionRegistry.class, 1} { ZKConnectionRegistry.class, 1}
}); });
} }

View File

@ -60,8 +60,7 @@ public class TestMetaRegionLocationCache {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation( RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
TEST_UTIL.getConfiguration(), REGISTRY, 3);
TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.getAdmin().balancerSwitch(false, true);
} }
@ -123,8 +122,7 @@ public class TestMetaRegionLocationCache {
for (HRegionLocation location: currentMetaLocs) { for (HRegionLocation location: currentMetaLocs) {
RegionReplicaTestHelper.moveRegion(TEST_UTIL, location); RegionReplicaTestHelper.moveRegion(TEST_UTIL, location);
} }
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation( RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
TEST_UTIL.getConfiguration(), REGISTRY, 3);
for (JVMClusterUtil.MasterThread masterThread: for (JVMClusterUtil.MasterThread masterThread:
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) { TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
verifyCachedMetaLocations(masterThread.getMaster()); verifyCachedMetaLocations(masterThread.getMaster());

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@ -85,7 +86,9 @@ public class TestMetaWithReplicas {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.getConfiguration().setInt( TEST_UTIL.getConfiguration().setInt(
StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1000); StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1000);
TEST_UTIL.startMiniCluster(REGIONSERVERS_COUNT); StartMiniClusterOption option = StartMiniClusterOption.builder().
numAlwaysStandByMasters(1).numMasters(1).numRegionServers(REGIONSERVERS_COUNT).build();
TEST_UTIL.startMiniCluster(option);
AssignmentManager am = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager(); AssignmentManager am = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
Set<ServerName> sns = new HashSet<ServerName>(); Set<ServerName> sns = new HashSet<ServerName>();
ServerName hbaseMetaServerName = ServerName hbaseMetaServerName =

View File

@ -616,11 +616,12 @@ public class TestReplicaWithCluster {
} }
@Test @Test
public void testReplicaGetWithRpcClientImpl() throws IOException { public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true); HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
HTU.getConfiguration().set("hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.RpcClientImpl"); HTU.getConfiguration().set(
"hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
// Create table then get the single region for our new table. // Create table then get the single region for our new table.
HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithRpcClientImpl"); HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithAsyncRpcClientImpl");
hdt.setRegionReplication(NB_SERVERS); hdt.setRegionReplication(NB_SERVERS);
hdt.addCoprocessor(SlowMeCopro.class.getName()); hdt.addCoprocessor(SlowMeCopro.class.getName());

View File

@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@ -108,7 +108,6 @@ public class TestReplicasClient {
new AtomicReference<>(new CountDownLatch(0)); new AtomicReference<>(new CountDownLatch(0));
private static final AtomicReference<CountDownLatch> secondaryCdl = private static final AtomicReference<CountDownLatch> secondaryCdl =
new AtomicReference<>(new CountDownLatch(0)); new AtomicReference<>(new CountDownLatch(0));
Random r = new Random();
public SlowMeCopro() { public SlowMeCopro() {
} }
@ -197,7 +196,9 @@ public class TestReplicasClient {
HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration()); ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
HTU.startMiniCluster(NB_SERVERS); StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1).
numAlwaysStandByMasters(1).numMasters(1).build();
HTU.startMiniCluster(option);
// Create table then get the single region for our new table. // Create table then get the single region for our new table.
HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName()); HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());

View File

@ -86,7 +86,7 @@ public class TestZKConnectionRegistry {
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(), assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
REGISTRY.getActiveMaster().get()); REGISTRY.getActiveMaster().get());
RegionReplicaTestHelper RegionReplicaTestHelper
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3); .waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY);
RegionLocations locs = REGISTRY.getMetaRegionLocations().get(); RegionLocations locs = REGISTRY.getMetaRegionLocations().get();
assertEquals(3, locs.getRegionLocations().length); assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> { IntStream.range(0, 3).forEach(i -> {

View File

@ -24,12 +24,15 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -37,6 +40,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -48,6 +52,8 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
@Category(MediumTests.class) @Category(MediumTests.class)
public class TestRpcClientLeaks { public class TestRpcClientLeaks {
@ -63,6 +69,9 @@ public class TestRpcClientLeaks {
public static class MyRpcClientImpl extends BlockingRpcClient { public static class MyRpcClientImpl extends BlockingRpcClient {
// Exceptions thrown only when this is set to false.
private static boolean throwException = false;
public MyRpcClientImpl(Configuration conf) { public MyRpcClientImpl(Configuration conf) {
super(conf); super(conf);
} }
@ -78,12 +87,26 @@ public class TestRpcClientLeaks {
@Override @Override
protected synchronized void setupConnection() throws IOException { protected synchronized void setupConnection() throws IOException {
super.setupConnection(); super.setupConnection();
if (throwException) {
SAVED_SOCKETS.add(socket); SAVED_SOCKETS.add(socket);
throw new IOException( throw new IOException(
"Sample exception for verifying socket closure in case of exceptions."); "Sample exception for verifying socket closure in case of exceptions.");
} }
}
}; };
} }
// To keep the registry paths happy.
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
Preconditions.checkState(sns != null && sns.size() == 1);
return super.createRpcChannel((ServerName)sns.toArray()[0], user, rpcTimeout);
}
public static void enableThrowExceptions() {
throwException = true;
}
} }
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@ -110,6 +133,7 @@ public class TestRpcClientLeaks {
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
try (Connection connection = ConnectionFactory.createConnection(conf); try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(name.getMethodName()))) { Table table = connection.getTable(TableName.valueOf(name.getMethodName()))) {
MyRpcClientImpl.enableThrowExceptions();
table.get(new Get(Bytes.toBytes("asd"))); table.get(new Get(Bytes.toBytes("asd")));
fail("Should fail because the injected error"); fail("Should fail because the injected error");
} catch (RetriesExhaustedException e) { } catch (RetriesExhaustedException e) {

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of HMaster that always runs as a stand by and never transitions to active.
*/
public class AlwaysStandByHMaster extends HMaster {
/**
* An implementation of ActiveMasterManager that never transitions it's master to active state. It
* always remains as a stand by master. With the master registry implementation (HBASE-18095) it
* is expected to have at least one active / standby master always running at any point in time
* since they serve as the gateway for client connections.
*
* With this implementation, tests can simulate the scenario of not having an active master yet
* the client connections to the cluster succeed.
*/
private static class AlwaysStandByMasterManager extends ActiveMasterManager {
private static final Logger LOG =
LoggerFactory.getLogger(AlwaysStandByMasterManager.class);
AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master) {
super(watcher, sn, master);
}
/**
* An implementation that never transitions to an active master.
*/
boolean blockUntilBecomingActiveMaster(int checkInterval, MonitoredTask startupStatus) {
while (!(master.isAborted() || master.isStopped())) {
startupStatus.setStatus("Forever looping to stay as a standby master.");
try {
activeMasterServerName = null;
try {
if (MasterAddressTracker.getMasterAddress(watcher) != null) {
clusterHasActiveMaster.set(true);
}
Threads.sleepWithoutInterrupt(100);
} catch (IOException e) {
// pass, we will get notified when some other active master creates the znode.
}
} catch (KeeperException e) {
master.abort("Received an unexpected KeeperException, aborting", e);
return false;
}
synchronized (this.clusterHasActiveMaster) {
while (clusterHasActiveMaster.get() && !master.isStopped()) {
try {
clusterHasActiveMaster.wait(checkInterval);
} catch (InterruptedException e) {
// We expect to be interrupted when a master dies,
// will fall out if so
LOG.debug("Interrupted waiting for master to die", e);
}
}
if (clusterShutDown.get()) {
this.master.stop(
"Cluster went down before this master became active");
}
}
}
return false;
}
}
public AlwaysStandByHMaster(Configuration conf) throws IOException {
super(conf);
}
protected ActiveMasterManager createActiveMasterManager(
ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
return new AlwaysStandByMasterManager(zk, sn, server);
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniClusterRule;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({MediumTests.class, MasterTests.class})
public class TestAlwaysStandByHMaster {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAlwaysStandByHMaster.class);
private static final StartMiniClusterOption OPTION = StartMiniClusterOption.builder().
numAlwaysStandByMasters(1).numMasters(1).numRegionServers(1).build();
@ClassRule
public static final MiniClusterRule miniClusterRule = new MiniClusterRule(OPTION);
/**
* Tests that the AlwaysStandByHMaster does not transition to active state even if no active
* master exists.
*/
@Test public void testAlwaysStandBy() throws Exception {
HBaseTestingUtility testUtil = miniClusterRule.getTestingUtility();
// Make sure there is an active master.
assertNotNull(testUtil.getMiniHBaseCluster().getMaster());
assertEquals(2, testUtil.getMiniHBaseCluster().getMasterThreads().size());
// Kill the only active master.
testUtil.getMiniHBaseCluster().stopMaster(0).join();
// Wait for 5s to make sure the always standby doesn't transition to active state.
assertFalse(testUtil.getMiniHBaseCluster().waitForActiveAndReadyMaster(5000));
// Add a new master.
HMaster newActive = testUtil.getMiniHBaseCluster().startMaster().getMaster();
assertTrue(testUtil.getMiniHBaseCluster().waitForActiveAndReadyMaster(5000));
// Newly added master should be the active.
assertEquals(newActive.getServerName(),
testUtil.getMiniHBaseCluster().getMaster().getServerName());
}
}

View File

@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -86,6 +87,8 @@ public class TestMasterOperationsForRegionReplicas {
private static Connection CONNECTION = null; private static Connection CONNECTION = null;
private static Admin ADMIN; private static Admin ADMIN;
private static int numSlaves = 2; private static int numSlaves = 2;
private final static StartMiniClusterOption option = StartMiniClusterOption.builder().
numRegionServers(numSlaves).numMasters(1).numAlwaysStandByMasters(1).build();
private static Configuration conf; private static Configuration conf;
@Rule @Rule
@ -95,16 +98,21 @@ public class TestMasterOperationsForRegionReplicas {
public static void setupBeforeClass() throws Exception { public static void setupBeforeClass() throws Exception {
conf = TEST_UTIL.getConfiguration(); conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
TEST_UTIL.startMiniCluster(numSlaves); TEST_UTIL.startMiniCluster(option);
TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.getAdmin().balancerSwitch(false, true);
CONNECTION = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); resetConnections();
ADMIN = CONNECTION.getAdmin();
while (ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics() while (ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
.size() < numSlaves) { .size() < numSlaves) {
Thread.sleep(100); Thread.sleep(100);
} }
} }
private static void resetConnections() throws IOException {
IOUtils.closeQuietly(ADMIN, CONNECTION);
CONNECTION = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
ADMIN = CONNECTION.getAdmin();
}
@AfterClass @AfterClass
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
Closeables.close(ADMIN, true); Closeables.close(ADMIN, true);
@ -199,6 +207,7 @@ public class TestMasterOperationsForRegionReplicas {
TEST_UTIL.startMiniHBaseCluster(option); TEST_UTIL.startMiniHBaseCluster(option);
TEST_UTIL.waitUntilAllRegionsAssigned(tableName); TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
TEST_UTIL.waitUntilNoRegionsInTransition(); TEST_UTIL.waitUntilNoRegionsInTransition();
resetConnections();
validateFromSnapshotFromMeta(TEST_UTIL, tableName, numRegions, numReplica, validateFromSnapshotFromMeta(TEST_UTIL, tableName, numRegions, numReplica,
ADMIN.getConnection()); ADMIN.getConnection());
@ -208,6 +217,7 @@ public class TestMasterOperationsForRegionReplicas {
TEST_UTIL.startMiniHBaseCluster(); TEST_UTIL.startMiniHBaseCluster();
TEST_UTIL.waitUntilAllRegionsAssigned(tableName); TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
TEST_UTIL.waitUntilNoRegionsInTransition(); TEST_UTIL.waitUntilNoRegionsInTransition();
resetConnections();
validateSingleRegionServerAssignment(ADMIN.getConnection(), numRegions, numReplica); validateSingleRegionServerAssignment(ADMIN.getConnection(), numRegions, numReplica);
for (int i = 1; i < numSlaves; i++) { // restore the cluster for (int i = 1; i < numSlaves; i++) { // restore the cluster
TEST_UTIL.getMiniHBaseCluster().startRegionServer(); TEST_UTIL.getMiniHBaseCluster().startRegionServer();

View File

@ -113,6 +113,7 @@ public class TestRegionMoveAndAbandon {
// Start up everything again // Start up everything again
LOG.info("Starting cluster"); LOG.info("Starting cluster");
UTIL.getMiniHBaseCluster().startMaster(); UTIL.getMiniHBaseCluster().startMaster();
UTIL.invalidateConnection();
UTIL.ensureSomeRegionServersAvailable(2); UTIL.ensureSomeRegionServersAvailable(2);
UTIL.waitFor(30_000, new Waiter.Predicate<Exception>() { UTIL.waitFor(30_000, new Waiter.Predicate<Exception>() {

View File

@ -141,6 +141,7 @@ public class TestRegionSplit {
JVMClusterUtil.MasterThread t = UTIL.getHBaseCluster().startMaster(); JVMClusterUtil.MasterThread t = UTIL.getHBaseCluster().startMaster();
Thread.sleep(500); Thread.sleep(500);
UTIL.invalidateConnection();
// enable table // enable table
UTIL.getAdmin().enableTable(tableName); UTIL.getAdmin().enableTable(tableName);
Thread.sleep(500); Thread.sleep(500);

View File

@ -32,7 +32,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
@ -115,7 +115,8 @@ public class TestNamespaceAuditor {
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
conf.setClass("hbase.coprocessor.regionserver.classes", CPRegionServerObserver.class, conf.setClass("hbase.coprocessor.regionserver.classes", CPRegionServerObserver.class,
RegionServerObserver.class); RegionServerObserver.class);
UTIL.startMiniCluster(); StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(2).build();
UTIL.startMiniCluster(option);
waitForQuotaInitialize(UTIL); waitForQuotaInitialize(UTIL);
ADMIN = UTIL.getAdmin(); ADMIN = UTIL.getAdmin();
} }
@ -479,17 +480,6 @@ public class TestNamespaceAuditor {
return getQuotaManager().getState(namespace); return getQuotaManager().getState(namespace);
} }
byte[] getSplitKey(byte[] startKey, byte[] endKey) {
String skey = Bytes.toString(startKey);
int key;
if (StringUtils.isBlank(skey)) {
key = Integer.parseInt(Bytes.toString(endKey))/2 ;
} else {
key = (int) (Integer.parseInt(skey) * 1.5);
}
return Bytes.toBytes("" + key);
}
public static class CustomObserver implements RegionCoprocessor, RegionObserver { public static class CustomObserver implements RegionCoprocessor, RegionObserver {
volatile CountDownLatch postCompact; volatile CountDownLatch postCompact;
@ -541,7 +531,7 @@ public class TestNamespaceAuditor {
} }
}); });
NamespaceTableAndRegionInfo before = getNamespaceState(nsp1); NamespaceTableAndRegionInfo before = getNamespaceState(nsp1);
restartMaster(); killActiveMaster();
NamespaceTableAndRegionInfo after = getNamespaceState(nsp1); NamespaceTableAndRegionInfo after = getNamespaceState(nsp1);
assertEquals("Expected: " + before.getTables() + " Found: " + after.getTables(), before assertEquals("Expected: " + before.getTables() + " Found: " + after.getTables(), before
.getTables().size(), after.getTables().size()); .getTables().size(), after.getTables().size());
@ -561,10 +551,9 @@ public class TestNamespaceAuditor {
}); });
} }
private void restartMaster() throws Exception { private void killActiveMaster() throws Exception {
UTIL.getHBaseCluster().getMaster(0).stop("Stopping to start again"); UTIL.getHBaseCluster().getMaster(0).stop("Stopping to start again");
UTIL.getHBaseCluster().waitOnMaster(0); UTIL.getHBaseCluster().waitOnMaster(0);
UTIL.getHBaseCluster().startMaster();
waitForQuotaInitialize(UTIL); waitForQuotaInitialize(UTIL);
} }

View File

@ -95,9 +95,8 @@ public class TestRSKilledWhenInitializing {
TEST_UTIL.startMiniDFSCluster(3); TEST_UTIL.startMiniDFSCluster(3);
TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniZKCluster();
TEST_UTIL.createRootDir(); TEST_UTIL.createRootDir();
final LocalHBaseCluster cluster = final LocalHBaseCluster cluster = new LocalHBaseCluster(conf, NUM_MASTERS, NUM_RS,
new LocalHBaseCluster(conf, NUM_MASTERS, NUM_RS, HMaster.class, HMaster.class, RegisterAndDieRegionServer.class);
RegisterAndDieRegionServer.class);
final MasterThread master = startMaster(cluster.getMasters().get(0)); final MasterThread master = startMaster(cluster.getMasters().get(0));
try { try {
// Master is up waiting on RegionServers to check in. Now start RegionServers. // Master is up waiting on RegionServers to check in. Now start RegionServers.

View File

@ -144,7 +144,9 @@ public class TestRegionServerNoMaster {
@AfterClass @AfterClass
public static void afterClass() throws Exception { public static void afterClass() throws Exception {
HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
if (table != null) {
table.close(); table.close();
}
HTU.shutdownMiniCluster(); HTU.shutdownMiniCluster();
} }

View File

@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -141,7 +142,8 @@ public class TestSplitTransactionOnCluster {
@BeforeClass public static void before() throws Exception { @BeforeClass public static void before() throws Exception {
TESTING_UTIL.getConfiguration().setInt(HConstants.HBASE_BALANCER_PERIOD, 60000); TESTING_UTIL.getConfiguration().setInt(HConstants.HBASE_BALANCER_PERIOD, 60000);
StartMiniClusterOption option = StartMiniClusterOption.builder() StartMiniClusterOption option = StartMiniClusterOption.builder()
.masterClass(MyMaster.class).numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build(); .masterClass(MyMaster.class).numRegionServers(NB_SERVERS).
numDataNodes(NB_SERVERS).build();
TESTING_UTIL.startMiniCluster(option); TESTING_UTIL.startMiniCluster(option);
} }
@ -811,6 +813,10 @@ public class TestSplitTransactionOnCluster {
cluster.waitOnMaster(0); cluster.waitOnMaster(0);
HMaster master = cluster.startMaster().getMaster(); HMaster master = cluster.startMaster().getMaster();
cluster.waitForActiveAndReadyMaster(); cluster.waitForActiveAndReadyMaster();
// reset the connections
IOUtils.closeQuietly(admin);
TESTING_UTIL.invalidateConnection();
admin = TESTING_UTIL.getAdmin();
return master; return master;
} }

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -73,8 +74,8 @@ public class TestReplicationBase {
protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
protected static final Configuration CONF1 = UTIL1.getConfiguration(); protected static Configuration CONF1 = UTIL1.getConfiguration();
protected static final Configuration CONF2 = UTIL2.getConfiguration(); protected static Configuration CONF2 = UTIL2.getConfiguration();
protected static final int NUM_SLAVES1 = 2; protected static final int NUM_SLAVES1 = 2;
protected static final int NUM_SLAVES2 = 4; protected static final int NUM_SLAVES2 = 4;
@ -203,13 +204,27 @@ public class TestReplicationBase {
conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
} }
protected static void restartHBaseCluster(HBaseTestingUtility util, int numSlaves) static void restartSourceCluster(int numSlaves)
throws Exception { throws Exception {
util.shutdownMiniHBaseCluster(); IOUtils.closeQuietly(hbaseAdmin, htable1);
util.restartHBaseCluster(numSlaves); UTIL1.shutdownMiniHBaseCluster();
UTIL1.restartHBaseCluster(numSlaves);
// Invalidate the cached connection state.
CONF1 = UTIL1.getConfiguration();
hbaseAdmin = UTIL1.getAdmin();
Connection connection1 = UTIL1.getConnection();
htable1 = connection1.getTable(tableName);
} }
protected static void startClusters() throws Exception { static void restartTargetHBaseCluster(int numSlaves) throws Exception {
IOUtils.closeQuietly(htable2);
UTIL2.restartHBaseCluster(numSlaves);
// Invalidate the cached connection state
CONF2 = UTIL2.getConfiguration();
htable2 = UTIL2.getConnection().getTable(tableName);
}
private static void startClusters() throws Exception {
UTIL1.startMiniZKCluster(); UTIL1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = UTIL1.getZkCluster(); MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
admin = new ReplicationAdmin(CONF1); admin = new ReplicationAdmin(CONF1);

View File

@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
@ -65,9 +64,8 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
Thread.sleep(SLEEP_TIME * NB_RETRIES); Thread.sleep(SLEEP_TIME * NB_RETRIES);
// disable and start the peer // disable and start the peer
admin.disablePeer("2"); hbaseAdmin.disableReplicationPeer("2");
StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build(); restartTargetHBaseCluster(2);
UTIL2.startMiniHBaseCluster(option);
Get get = new Get(rowkey); Get get = new Get(rowkey);
for (int i = 0; i < NB_RETRIES; i++) { for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get); Result res = htable2.get(get);

View File

@ -121,20 +121,13 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
@Test @Test
public void testEditsDroppedWithDroppedTableNS() throws Exception { public void testEditsDroppedWithDroppedTableNS() throws Exception {
// also try with a namespace // also try with a namespace
Connection connection1 = ConnectionFactory.createConnection(CONF1); UTIL1.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
try (Admin admin1 = connection1.getAdmin()) { UTIL2.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
admin1.createNamespace(NamespaceDescriptor.create("NS").build()); try {
}
Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin2 = connection2.getAdmin()) {
admin2.createNamespace(NamespaceDescriptor.create("NS").build());
}
testEditsBehindDroppedTable(true, "NS:test_dropped"); testEditsBehindDroppedTable(true, "NS:test_dropped");
try (Admin admin1 = connection1.getAdmin()) { } finally {
admin1.deleteNamespace("NS"); UTIL1.getAdmin().deleteNamespace("NS");
} UTIL2.getAdmin().deleteNamespace("NS");
try (Admin admin2 = connection2.getAdmin()) {
admin2.deleteNamespace("NS");
} }
} }
@ -148,8 +141,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
// make sure we have a single region server only, so that all // make sure we have a single region server only, so that all
// edits for all tables go there // edits for all tables go there
UTIL1.shutdownMiniHBaseCluster(); restartSourceCluster(1);
UTIL1.startMiniHBaseCluster();
TableName tablename = TableName.valueOf(tName); TableName tablename = TableName.valueOf(tName);
byte[] familyName = Bytes.toBytes("fam"); byte[] familyName = Bytes.toBytes("fam");
@ -161,8 +153,8 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
.newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build(); .build();
Connection connection1 = ConnectionFactory.createConnection(CONF1); Connection connection1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
Connection connection2 = ConnectionFactory.createConnection(CONF2); Connection connection2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
try (Admin admin1 = connection1.getAdmin()) { try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table); admin1.createTable(table);
} }
@ -223,8 +215,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
// make sure we have a single region server only, so that all // make sure we have a single region server only, so that all
// edits for all tables go there // edits for all tables go there
UTIL1.shutdownMiniHBaseCluster(); restartSourceCluster(1);
UTIL1.startMiniHBaseCluster();
TableName tablename = TableName.valueOf("testdroppedtimed"); TableName tablename = TableName.valueOf("testdroppedtimed");
byte[] familyName = Bytes.toBytes("fam"); byte[] familyName = Bytes.toBytes("fam");

View File

@ -45,7 +45,7 @@ public class TestReplicationStatusAfterLagging extends TestReplicationBase {
@Test @Test
public void testReplicationStatusAfterLagging() throws Exception { public void testReplicationStatusAfterLagging() throws Exception {
UTIL2.shutdownMiniHBaseCluster(); UTIL2.shutdownMiniHBaseCluster();
restartHBaseCluster(UTIL1, 1); restartSourceCluster(1);
// add some values to cluster 1 // add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i)); Put p = new Put(Bytes.toBytes("row" + i));

View File

@ -52,7 +52,7 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli
htable1.put(p); htable1.put(p);
} }
Thread.sleep(10000); Thread.sleep(10000);
restartHBaseCluster(UTIL1, 1); restartSourceCluster(1);
Admin hbaseAdmin = UTIL1.getAdmin(); Admin hbaseAdmin = UTIL1.getAdmin();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName(); ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
Thread.sleep(10000); Thread.sleep(10000);

View File

@ -46,7 +46,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe
@Test @Test
public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception { public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
UTIL2.shutdownMiniHBaseCluster(); UTIL2.shutdownMiniHBaseCluster();
restartHBaseCluster(UTIL1, 1); restartSourceCluster(1);
Admin hbaseAdmin = UTIL1.getAdmin(); Admin hbaseAdmin = UTIL1.getAdmin();
// add some values to source cluster // add some values to source cluster
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {

View File

@ -43,7 +43,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestRe
@Test @Test
public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception { public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
UTIL2.shutdownMiniHBaseCluster(); UTIL2.shutdownMiniHBaseCluster();
restartHBaseCluster(UTIL1, 1); restartSourceCluster(1);
Admin hbaseAdmin = UTIL1.getAdmin(); Admin hbaseAdmin = UTIL1.getAdmin();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName(); ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
Thread.sleep(10000); Thread.sleep(10000);

View File

@ -54,7 +54,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery
htable1.put(p); htable1.put(p);
} }
Thread.sleep(10000); Thread.sleep(10000);
restartHBaseCluster(UTIL1, 1); restartSourceCluster(1);
Admin hbaseAdmin = UTIL1.getAdmin(); Admin hbaseAdmin = UTIL1.getAdmin();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName(); ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
Thread.sleep(10000); Thread.sleep(10000);

View File

@ -143,7 +143,7 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
private void mimicSyncUpAfterDelete() throws Exception { private void mimicSyncUpAfterDelete() throws Exception {
LOG.debug("mimicSyncUpAfterDelete"); LOG.debug("mimicSyncUpAfterDelete");
UTIL2.shutdownMiniHBaseCluster(); shutDownTargetHBaseCluster();
List<Delete> list = new ArrayList<>(); List<Delete> list = new ArrayList<>();
// delete half of the rows // delete half of the rows
@ -169,8 +169,8 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101, assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
rowCount_ht2Source); rowCount_ht2Source);
UTIL1.shutdownMiniHBaseCluster(); shutDownSourceHBaseCluster();
UTIL2.restartHBaseCluster(1); restartTargetHBaseCluster(1);
Thread.sleep(SLEEP_TIME); Thread.sleep(SLEEP_TIME);
@ -188,7 +188,7 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
if (i == NB_RETRIES - 1) { if (i == NB_RETRIES - 1) {
if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) { if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) {
// syncUP still failed. Let's look at the source in case anything wrong there // syncUP still failed. Let's look at the source in case anything wrong there
UTIL1.restartHBaseCluster(1); restartSourceHBaseCluster(1);
rowCount_ht1Source = UTIL1.countRows(ht1Source); rowCount_ht1Source = UTIL1.countRows(ht1Source);
LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source); LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
rowCount_ht2Source = UTIL1.countRows(ht2Source); rowCount_ht2Source = UTIL1.countRows(ht2Source);
@ -212,8 +212,8 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
private void mimicSyncUpAfterPut() throws Exception { private void mimicSyncUpAfterPut() throws Exception {
LOG.debug("mimicSyncUpAfterPut"); LOG.debug("mimicSyncUpAfterPut");
UTIL1.restartHBaseCluster(1); restartSourceHBaseCluster(1);
UTIL2.shutdownMiniHBaseCluster(); shutDownTargetHBaseCluster();
Put p; Put p;
// another 100 + 1 row to t1_syncup // another 100 + 1 row to t1_syncup
@ -243,8 +243,8 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
int rowCount_ht2Source = UTIL1.countRows(ht2Source); int rowCount_ht2Source = UTIL1.countRows(ht2Source);
assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source); assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
UTIL1.shutdownMiniHBaseCluster(); shutDownSourceHBaseCluster();
UTIL2.restartHBaseCluster(1); restartTargetHBaseCluster(1);
Thread.sleep(SLEEP_TIME); Thread.sleep(SLEEP_TIME);
@ -264,7 +264,7 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
if (i == NB_RETRIES - 1) { if (i == NB_RETRIES - 1) {
if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) { if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) {
// syncUP still failed. Let's look at the source in case anything wrong there // syncUP still failed. Let's look at the source in case anything wrong there
UTIL1.restartHBaseCluster(1); restartSourceHBaseCluster(1);
rowCount_ht1Source = UTIL1.countRows(ht1Source); rowCount_ht1Source = UTIL1.countRows(ht1Source);
LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source); LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
rowCount_ht2Source = UTIL1.countRows(ht2Source); rowCount_ht2Source = UTIL1.countRows(ht2Source);

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -108,7 +109,7 @@ public abstract class TestReplicationSyncUpToolBase {
UTIL1.shutdownMiniCluster(); UTIL1.shutdownMiniCluster();
} }
protected final void setupReplication() throws Exception { final void setupReplication() throws Exception {
Admin admin1 = UTIL1.getAdmin(); Admin admin1 = UTIL1.getAdmin();
admin1.createTable(t1SyncupSource); admin1.createTable(t1SyncupSource);
admin1.createTable(t2SyncupSource); admin1.createTable(t2SyncupSource);
@ -135,7 +136,33 @@ public abstract class TestReplicationSyncUpToolBase {
admin1.addReplicationPeer("1", rpc); admin1.addReplicationPeer("1", rpc);
} }
protected final void syncUp(HBaseTestingUtility util) throws Exception { final void syncUp(HBaseTestingUtility util) throws Exception {
ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]); ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]);
} }
// Utilities that manager shutdown / restart of source / sink clusters. They take care of
// invalidating stale connections after shutdown / restarts.
final void shutDownSourceHBaseCluster() throws Exception {
IOUtils.closeQuietly(ht1Source, ht2Source);
UTIL1.shutdownMiniHBaseCluster();
}
final void shutDownTargetHBaseCluster() throws Exception {
IOUtils.closeQuietly(ht1TargetAtPeer1, ht2TargetAtPeer1);
UTIL2.shutdownMiniHBaseCluster();
}
final void restartSourceHBaseCluster(int numServers) throws Exception {
IOUtils.closeQuietly(ht1Source, ht2Source);
UTIL1.restartHBaseCluster(numServers);
ht1Source = UTIL1.getConnection().getTable(TN1);
ht2Source = UTIL1.getConnection().getTable(TN2);
}
final void restartTargetHBaseCluster(int numServers) throws Exception {
IOUtils.closeQuietly(ht1TargetAtPeer1, ht2TargetAtPeer1);
UTIL2.restartHBaseCluster(numServers);
ht1TargetAtPeer1 = UTIL2.getConnection().getTable(TN1);
ht2TargetAtPeer1 = UTIL2.getConnection().getTable(TN2);
}
} }

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -115,7 +116,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
} }
HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
walCoprocs); walCoprocs);
HTU.startMiniCluster(NB_SERVERS); StartMiniClusterOption option = StartMiniClusterOption.builder().numAlwaysStandByMasters(1).
numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
HTU.startMiniCluster(option);
// Create table then get the single region for our new table. // Create table then get the single region for our new table.
HTableDescriptor htd = HTU.createTableDescriptor(tableName.getNameAsString()); HTableDescriptor htd = HTU.createTableDescriptor(tableName.getNameAsString());

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.security.token; package org.apache.hadoop.hbase.security.token;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -32,7 +31,6 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
import org.apache.hadoop.hbase.ipc.NettyRpcClient; import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -73,8 +71,10 @@ public class TestDelegationTokenWithEncryption extends SecureTestCluster {
} }
@Parameters(name = "{index}: rpcClientImpl={0}") @Parameters(name = "{index}: rpcClientImpl={0}")
public static Collection<Object[]> parameters() { public static Collection<Object> parameters() {
return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() }, // Client connection supports only non-blocking RPCs (due to master registry restriction), hence
// we only test NettyRpcClient.
return Arrays.asList(
new Object[] { NettyRpcClient.class.getName() }); new Object[] { NettyRpcClient.class.getName() });
} }

View File

@ -21,7 +21,6 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -32,7 +31,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.NettyRpcClient; import org.apache.hadoop.hbase.ipc.NettyRpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClientFactory;
@ -76,8 +74,10 @@ public class TestGenerateDelegationToken extends SecureTestCluster {
} }
@Parameters(name = "{index}: rpcClientImpl={0}") @Parameters(name = "{index}: rpcClientImpl={0}")
public static Collection<Object[]> parameters() { public static Collection<Object> parameters() {
return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() }, // Client connection supports only non-blocking RPCs (due to master registry restriction), hence
// we only test NettyRpcClient.
return Arrays.asList(
new Object[] { NettyRpcClient.class.getName() }); new Object[] { NettyRpcClient.class.getName() });
} }

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
@ -383,6 +384,10 @@ public class TestTokenAuthentication {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtility(); TEST_UTIL = new HBaseTestingUtility();
// Override the connection registry to avoid spinning up a mini cluster for the connection below
// to go through.
TEST_UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniZKCluster();
// register token type for protocol // register token type for protocol
SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(), SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),