HBASE-12471 Task 4. replace internal ConnectionManager#{delete,get}Connection use with #close, #createConnection (0.98, 0.99) under src/main/java

Move from HConnection to ClusterConnection or Connection
Use unmanaged connections where we previously used managed connections
(using the JDK 7 try-with-resources idiom:
https://docs.oracle.com/javase/7/docs/technotes/guides/language/try-with-resources.html).
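For illustration only (not part of this patch), the unmanaged pattern looks roughly
like the following; the table, family, qualifier, and value names are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class UnmanagedConnectionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // The Connection is ours to close; nothing is reference-counted or cached
    // for us the way the managed HConnectionManager.getConnection(conf) was.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("example"))) {
      Put put = new Put(Bytes.toBytes("row1"));
      put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
      table.put(put);
    } // table and connection are closed here, even if an exception is thrown
  }
}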

In ZKConfig, synchronize on the Configuration rather than make a copy.
By making a copy we were dropping hbase configs in certain test contexts
(the zk ensemble could not be found because the default port was used).
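A rough sketch of what that means (the actual change is in the ZKConfig hunk further
down; variable substitution is omitted here for brevity):

import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

public class ZkPropsSketch {
  static Properties extractZkProperties(Configuration conf) {
    Properties zkProperties = new Properties();
    // Hold the Configuration's monitor while iterating so no config loading
    // can interleave, instead of iterating a defensive copy that might be
    // missing hbase configs.
    synchronized (conf) {
      for (Map.Entry<String, String> entry : conf) {
        String key = entry.getKey();
        if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
          zkProperties.put(key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN),
              entry.getValue());
        }
      }
    }
    return zkProperties;
  }
}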

In tests, some move to the new-style connection setup, but the changes are
mostly fixes for premature connection close, or added cleanup where it
was lacking.
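A hedged sketch of that test-side pattern (TestMultiParallel below does essentially
this with its static CONNECTION field; the class and table names here are made up):

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class ExampleSharedConnectionTest {
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static Connection CONNECTION;

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    // One shared, unmanaged connection for the whole test class.
    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
  }

  @AfterClass
  public static void afterClass() throws Exception {
    // Close it ourselves; there is no managed connection cache to do it for us.
    CONNECTION.close();
    UTIL.shutdownMiniCluster();
  }

  @Test
  public void testSomething() throws Exception {
    // Close the Table per test, not the shared Connection.
    try (Table table = CONNECTION.getTable(TableName.valueOf("example"))) {
      // exercise the table here
    }
  }
}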
stack 2014-11-24 09:46:28 -08:00
parent 5911c030a5
commit 336c22d581
32 changed files with 387 additions and 334 deletions

View File

@ -595,7 +595,7 @@ public interface Admin extends Abortable, Closeable {
/**
* Compact all regions on the region server
* @param regionserver the region server name
* @param sn the region server name
* @param major if it's major compaction
* @throws IOException
* @throws InterruptedException
@ -1288,7 +1288,7 @@ public interface Admin extends Abortable, Closeable {
* @return A RegionServerCoprocessorRpcChannel instance
*/
CoprocessorRpcChannel coprocessorService(ServerName sn);
/**
* Update the configuration and trigger an online config change

View File

@ -163,7 +163,7 @@ class AsyncProcess {
// TODO: many of the fields should be made private
protected final long id;
protected final ClusterConnection hConnection;
protected final ClusterConnection connection;
protected final RpcRetryingCallerFactory rpcCallerFactory;
protected final RpcControllerFactory rpcFactory;
protected final BatchErrors globalErrors;
@ -246,7 +246,7 @@ class AsyncProcess {
throw new IllegalArgumentException("HConnection cannot be null.");
}
this.hConnection = hc;
this.connection = hc;
this.pool = pool;
this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
@ -340,7 +340,7 @@ class AsyncProcess {
new HashMap<ServerName, MultiAction<Row>>();
List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
NonceGenerator ng = this.hConnection.getNonceGenerator();
NonceGenerator ng = this.connection.getNonceGenerator();
long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
// Location errors that happen before we decide what requests to take.
@ -363,7 +363,7 @@ class AsyncProcess {
try {
if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
// Make sure we get 0-s replica.
RegionLocations locs = hConnection.locateRegion(
RegionLocations locs = connection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
throw new IOException("#" + id + ", no location found, aborting submit for"
@ -535,7 +535,7 @@ class AsyncProcess {
// The position will be used by the processBatch to match the object array returned.
int posInList = -1;
NonceGenerator ng = this.hConnection.getNonceGenerator();
NonceGenerator ng = this.connection.getNonceGenerator();
for (Row r : rows) {
posInList++;
if (r instanceof Put) {
@ -910,7 +910,7 @@ class AsyncProcess {
", row cannot be null");
RegionLocations loc = null;
try {
loc = hConnection.locateRegion(
loc = connection.locateRegion(
tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
} catch (IOException ex) {
manageLocationError(action, ex);
@ -1025,7 +1025,7 @@ class AsyncProcess {
if (tableName == null) {
// tableName is null when we made a cross-table RPC call.
hConnection.clearCaches(server);
connection.clearCaches(server);
}
int failed = 0, stopped = 0;
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
@ -1036,7 +1036,7 @@ class AsyncProcess {
// any of the regions in the MultiAction.
// TODO: depending on type of exception we might not want to update cache at all?
if (tableName != null) {
hConnection.updateCachedLocations(tableName, regionName, row, null, server);
connection.updateCachedLocations(tableName, regionName, row, null, server);
}
for (Action<Row> action : e.getValue()) {
Retry retry = manageError(
@ -1150,7 +1150,7 @@ class AsyncProcess {
// Register corresponding failures once per server/once per region.
if (!regionFailureRegistered) {
regionFailureRegistered = true;
hConnection.updateCachedLocations(
connection.updateCachedLocations(
tableName, regionName, row.getRow(), result, server);
}
if (failureCount == 0) {
@ -1199,7 +1199,7 @@ class AsyncProcess {
errorsByServer.reportServerError(server);
canRetry = errorsByServer.canRetryMore(numAttempt);
}
hConnection.updateCachedLocations(
connection.updateCachedLocations(
tableName, region, actions.get(0).getAction().getRow(), throwable, server);
failureCount += actions.size();
@ -1514,7 +1514,7 @@ class AsyncProcess {
@VisibleForTesting
protected MultiServerCallable<Row> createCallable(final ServerName server,
TableName tableName, final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(hConnection, tableName, server, this.rpcFactory, multi);
return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
}
/**

View File

@ -249,7 +249,7 @@ class ConnectionManager {
*/
@VisibleForTesting
static NonceGenerator injectNonceGeneratorForTesting(
HConnection conn, NonceGenerator cnm) {
ClusterConnection conn, NonceGenerator cnm) {
HConnectionImplementation connImpl = (HConnectionImplementation)conn;
NonceGenerator ng = connImpl.getNonceGenerator();
LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());

View File

@ -79,7 +79,7 @@ public class ConnectionUtils {
* @return old nonce generator.
*/
public static NonceGenerator injectNonceGeneratorForTesting(
HConnection conn, NonceGenerator cnm) {
ClusterConnection conn, NonceGenerator cnm) {
return ConnectionManager.injectNonceGeneratorForTesting(conn, cnm);
}
@ -111,7 +111,7 @@ public class ConnectionUtils {
* @param client the client interface of the local server
* @return an adapted/decorated HConnection
*/
public static HConnection createShortCircuitHConnection(final HConnection conn,
public static HConnection createShortCircuitHConnection(final Connection conn,
final ServerName serverName, final AdminService.BlockingInterface admin,
final ClientService.BlockingInterface client) {
return new ConnectionAdapter(conn) {

View File

@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterReque
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
@ -175,8 +174,6 @@ public class HBaseAdmin implements Admin {
private static final String ZK_IDENTIFIER_PREFIX = "hbase-admin-on-";
// We use the implementation class rather then the interface because we
// need the package protected functions to get the connection to master
private ClusterConnection connection;
private volatile Configuration conf;
@ -1443,8 +1440,7 @@ public class HBaseAdmin implements Admin {
* Get all the online regions on a region server.
*/
@Override
public List<HRegionInfo> getOnlineRegions(
final ServerName sn) throws IOException {
public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
return ProtobufUtil.getOnlineRegions(admin);
}
@ -2333,12 +2329,6 @@ public class HBaseAdmin implements Admin {
});
}
private HRegionLocation getFirstMetaServerForTable(final TableName tableName)
throws IOException {
return connection.locateRegion(TableName.META_TABLE_NAME,
HRegionInfo.createRegionName(tableName, null, HConstants.NINES, false));
}
/**
* @return Configuration used by the instance.
*/
@ -2495,52 +2485,40 @@ public class HBaseAdmin implements Admin {
/**
* Check to see if HBase is running. Throw an exception if not.
* We consider that HBase is running if ZooKeeper and Master are running.
*
* @param conf system configuration
* @throws MasterNotRunningException if the master is not running
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
*/
// Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not.
public static void checkHBaseAvailable(Configuration conf)
throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
Configuration copyOfConf = HBaseConfiguration.create(conf);
// We set it to make it fail as soon as possible if HBase is not available
copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
copyOfConf.setInt("zookeeper.recovery.retry", 0);
ConnectionManager.HConnectionImplementation connection
= (ConnectionManager.HConnectionImplementation)
HConnectionManager.getConnection(copyOfConf);
try {
// Check ZK first.
// If the connection exists, we may have a connection to ZK that does
// not work anymore
ZooKeeperKeepAliveConnection zkw = null;
try {
zkw = connection.getKeepAliveZooKeeperWatcher();
zkw.getRecoverableZooKeeper().getZooKeeper().exists(
zkw.baseZNode, false);
} catch (IOException e) {
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
} catch (InterruptedException e) {
throw (InterruptedIOException)
try (ClusterConnection connection =
(ClusterConnection)ConnectionFactory.createConnection(copyOfConf)) {
// Check ZK first.
// If the connection exists, we may have a connection to ZK that does not work anymore
ZooKeeperKeepAliveConnection zkw = null;
try {
// This is NASTY. FIX!!!! Dependent on internal implementation! TODO
zkw = ((ConnectionManager.HConnectionImplementation)connection).
getKeepAliveZooKeeperWatcher();
zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
} catch (IOException e) {
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
} catch (InterruptedException e) {
throw (InterruptedIOException)
new InterruptedIOException("Can't connect to ZooKeeper").initCause(e);
} catch (KeeperException e) {
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
} finally {
if (zkw != null) {
zkw.close();
} catch (KeeperException e) {
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
} finally {
if (zkw != null) {
zkw.close();
}
}
}
// Check Master
connection.isMasterRunning();
} finally {
connection.close();
}
}
@ -3779,8 +3757,9 @@ public class HBaseAdmin implements Admin {
@Override
public int getMasterInfoPort() throws IOException {
// TODO: Fix! Reaching into internal implementation!!!!
ConnectionManager.HConnectionImplementation connection =
(ConnectionManager.HConnectionImplementation) HConnectionManager.getConnection(conf);
(ConnectionManager.HConnectionImplementation)this.connection;
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
try {
return MasterAddressTracker.getMasterInfoPort(zkw);

View File

@ -63,6 +63,7 @@ public class MetaCache {
* Search the cache for a location that fits our table and row key.
* Return null if no suitable region is located.
*
*
* @param tableName
* @param row
* @return Null or region location found in cache.

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.util.Pair;
/**
* Used to view region location information for a single HBase table.
* Obtain an instance from an {@link HConnection}.
* Obtain an instance from an {@link Connection}.
*
* @see ConnectionFactory
* @see Connection

View File

@ -84,16 +84,19 @@ public class ZKConfig {
Properties zkProperties = new Properties();
// Directly map all of the hbase.zookeeper.property.KEY properties.
for (Entry<String, String> entry : new Configuration(conf)) { // copy for mt safety
String key = entry.getKey();
if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
String value = entry.getValue();
// If the value has variables substitutions, need to do a get.
if (value.contains(VARIABLE_START)) {
value = conf.get(key);
// Synchronize on conf so no loading of configs while we iterate
synchronized (conf) {
for (Entry<String, String> entry : conf) {
String key = entry.getKey();
if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
String value = entry.getValue();
// If the value has variables substitutions, need to do a get.
if (value.contains(VARIABLE_START)) {
value = conf.get(key);
}
zkProperties.put(zkKey, value);
}
zkProperties.put(zkKey, value);
}
}

View File

@ -25,9 +25,11 @@ import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
@ -44,8 +46,8 @@ import com.google.common.collect.Sets;
*/
@InterfaceAudience.Private
public class DistributedHBaseCluster extends HBaseCluster {
private HBaseAdmin admin;
private Admin admin;
private final Connection connection;
private ClusterManager clusterManager;
@ -53,7 +55,8 @@ public class DistributedHBaseCluster extends HBaseCluster {
throws IOException {
super(conf);
this.clusterManager = clusterManager;
this.admin = new HBaseAdmin(conf);
this.connection = ConnectionFactory.createConnection(conf);
this.admin = this.connection.getAdmin();
this.initialClusterStatus = getClusterStatus();
}
@ -84,18 +87,21 @@ public class DistributedHBaseCluster extends HBaseCluster {
if (this.admin != null) {
admin.close();
}
if (this.connection != null && !this.connection.isClosed()) {
this.connection.close();
}
}
@Override
public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
throws IOException {
return admin.getConnection().getAdmin(serverName);
return ((ClusterConnection)this.connection).getAdmin(serverName);
}
@Override
public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
throws IOException {
return admin.getConnection().getClient(serverName);
return ((ClusterConnection)this.connection).getClient(serverName);
}
@Override
@ -138,8 +144,7 @@ public class DistributedHBaseCluster extends HBaseCluster {
@Override
public MasterService.BlockingInterface getMasterAdminService()
throws IOException {
HConnection conn = HConnectionManager.getConnection(conf);
return conn.getMaster();
return ((ClusterConnection)this.connection).getMaster();
}
@Override
@ -183,18 +188,19 @@ public class DistributedHBaseCluster extends HBaseCluster {
}
@Override
public ServerName getServerHoldingRegion(byte[] regionName) throws IOException {
HConnection connection = admin.getConnection();
HRegionLocation regionLoc = connection.locateRegion(regionName);
public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
HRegionLocation regionLoc = null;
try (RegionLocator locator = connection.getRegionLocator(tn)) {
regionLoc = locator.getRegionLocation(regionName);
}
if (regionLoc == null) {
LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName)
+ " for table " + HRegionInfo.getTableName(regionName) + ", start key [" +
Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]");
LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) +
", start key [" + Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]");
return null;
}
AdminProtos.AdminService.BlockingInterface client =
connection.getAdmin(regionLoc.getServerName());
((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName());
ServerInfo info = ProtobufUtil.getServerInfo(client);
return ProtobufUtil.toServerName(info.getServerName());
}
@ -374,7 +380,7 @@ public class DistributedHBaseCluster extends HBaseCluster {
} catch (IOException ioe) {
LOG.warn("While closing the old connection", ioe);
}
this.admin = new HBaseAdmin(conf);
this.admin = this.connection.getAdmin();
LOG.info("Added new HBaseAdmin");
return true;
}

View File

@ -37,4 +37,4 @@ import org.apache.hadoop.mapreduce.Mapper;
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}
}

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
* while the output value <u>must</u> be either a {@link Put} or a
* {@link Delete} instance.
*
* <p><KEY> is the type of the key. Ignored in this class.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable

View File

@ -48,9 +48,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -67,9 +67,9 @@ import org.apache.log4j.Logger;
/**
* A tool that is used for manipulating and viewing favored nodes information
* for regions. Run with -h to get a list of the options
*
*/
@InterfaceAudience.Private
// TODO: Remove? Unused. Partially implemented only.
public class RegionPlacementMaintainer {
private static final Log LOG = LogFactory.getLog(RegionPlacementMaintainer.class
.getName());
@ -93,9 +93,9 @@ public class RegionPlacementMaintainer {
private Configuration conf;
private final boolean enforceLocality;
private final boolean enforceMinAssignmentMove;
private HBaseAdmin admin;
private RackManager rackManager;
private Set<TableName> targetTableSet;
private final Connection connection;
public RegionPlacementMaintainer(Configuration conf) {
this(conf, true, true);
@ -108,7 +108,13 @@ public class RegionPlacementMaintainer {
this.enforceMinAssignmentMove = enforceMinAssignmentMove;
this.targetTableSet = new HashSet<TableName>();
this.rackManager = new RackManager(conf);
try {
this.connection = ConnectionFactory.createConnection(this.conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static void printHelp(Options opt) {
new HelpFormatter().printHelp(
"RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " +
@ -124,17 +130,6 @@ public class RegionPlacementMaintainer {
}
}
/**
* @return the cached HBaseAdmin
* @throws IOException
*/
private HBaseAdmin getHBaseAdmin() throws IOException {
if (this.admin == null) {
this.admin = new HBaseAdmin(this.conf);
}
return this.admin;
}
/**
* @return the new RegionAssignmentSnapshot
* @throws IOException
@ -142,7 +137,7 @@ public class RegionPlacementMaintainer {
public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot()
throws IOException {
SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
new SnapshotOfRegionAssignmentFromMeta(HConnectionManager.getConnection(conf));
new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
currentAssignmentShapshot.initialize();
return currentAssignmentShapshot;
}
@ -210,8 +205,10 @@ public class RegionPlacementMaintainer {
// Get the all the region servers
List<ServerName> servers = new ArrayList<ServerName>();
servers.addAll(getHBaseAdmin().getClusterStatus().getServers());
try (Admin admin = this.connection.getAdmin()) {
servers.addAll(admin.getClusterStatus().getServers());
}
LOG.info("Start to generate assignment plan for " + numRegions +
" regions from table " + tableName + " with " +
servers.size() + " region servers");
@ -619,9 +616,9 @@ public class RegionPlacementMaintainer {
// sort the map based on region info
Map<HRegionInfo, List<ServerName>> assignmentMap =
new TreeMap<HRegionInfo, List<ServerName>>(plan.getAssignmentMap());
for (Map.Entry<HRegionInfo, List<ServerName>> entry : assignmentMap.entrySet()) {
String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
String regionName = entry.getKey().getRegionNameAsString();
LOG.info("Region: " + regionName );
@ -660,7 +657,6 @@ public class RegionPlacementMaintainer {
// Get the region to region server map
Map<ServerName, List<HRegionInfo>> currentAssignment =
this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
HConnection connection = this.getHBaseAdmin().getConnection();
// track of the failed and succeeded updates
int succeededNum = 0;
@ -691,10 +687,11 @@ public class RegionPlacementMaintainer {
}
if (singleServerPlan != null) {
// Update the current region server with its updated favored nodes
BlockingInterface currentRegionServer = connection.getAdmin(entry.getKey());
BlockingInterface currentRegionServer =
((ClusterConnection)this.connection).getAdmin(entry.getKey());
UpdateFavoredNodesRequest request =
RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
UpdateFavoredNodesResponse updateFavoredNodesResponse =
currentRegionServer.updateFavoredNodes(null, request);
LOG.info("Region server " +

View File

@ -36,7 +36,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HRegionInfo;
@ -46,8 +45,9 @@ import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
@ -137,7 +137,7 @@ public class ServerManager {
private final Server master;
private final MasterServices services;
private final HConnection connection;
private final ClusterConnection connection;
private final DeadServer deadservers = new DeadServer();
@ -201,7 +201,7 @@ public class ServerManager {
Configuration c = master.getConfiguration();
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
this.connection = connect ? HConnectionManager.getConnection(c) : null;
this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null;
}
/**

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
@ -610,7 +611,7 @@ public class HRegionServer extends HasThread implements
*/
protected HConnection createShortCircuitConnection() throws IOException {
return ConnectionUtils.createShortCircuitHConnection(
HConnectionManager.getConnection(conf), serverName, rpcServices, rpcServices);
ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices);
}
/**

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
@ -135,9 +134,6 @@ public class SplitLogWorker implements Runnable {
try {
LOG.info("SplitLogWorker " + server.getServerName() + " starting");
coordination.registerListener();
// pre-initialize a new connection for splitlogworker configuration
HConnectionManager.getConnection(conf);
// wait for Coordination Engine is ready
boolean res = false;
while (!res && !coordination.isStop()) {

View File

@ -250,15 +250,18 @@ public abstract class HBaseCluster implements Closeable, Configurable {
* Get the ServerName of region server serving the first hbase:meta region
*/
public ServerName getServerHoldingMeta() throws IOException {
return getServerHoldingRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
return getServerHoldingRegion(TableName.META_TABLE_NAME,
HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
}
/**
* Get the ServerName of region server serving the specified region
* @param regionName Name of the region in bytes
* @param tn Table name that has the region.
* @return ServerName that hosts the region or null
*/
public abstract ServerName getServerHoldingRegion(byte[] regionName) throws IOException;
public abstract ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
throws IOException;
/**
* @return whether we are interacting with a distributed cluster as opposed to an

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@ -517,7 +516,6 @@ public class MiniHBaseCluster extends HBaseCluster {
if (this.hbaseCluster != null) {
this.hbaseCluster.shutdown();
}
HConnectionManager.deleteAllConnections(false);
}
@Override
@ -657,7 +655,8 @@ public class MiniHBaseCluster extends HBaseCluster {
}
@Override
public ServerName getServerHoldingRegion(byte[] regionName) throws IOException {
public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
throws IOException {
// Assume there is only one master thread which is the active master.
// If there are multiple master threads, the backup master threads
// should hold some regions. Please refer to #countServedRegions

View File

@ -882,7 +882,6 @@ public class TestAdmin1 {
admin.createTable(desc, splitKeys);
HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
Map<HRegionInfo, ServerName> regions = ht.getRegionLocations();
ht.close();
assertEquals("Tried to create " + expectedRegions + " regions "
+ "but only found " + regions.size(), expectedRegions, regions.size());
// Disable table.

View File

@ -52,7 +52,7 @@ public class TestHBaseAdminNoCluster {
* @throws ServiceException
*/
@Test
public void testMasterMonitorCollableRetries()
public void testMasterMonitorCallableRetries()
throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ServiceException {
Configuration configuration = HBaseConfiguration.create();
// Set the pause and retry count way down.
@ -61,20 +61,18 @@ public class TestHBaseAdminNoCluster {
configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count);
// Get mocked connection. Getting the connection will register it so when HBaseAdmin is
// constructed with same configuration, it will find this mocked connection.
HConnection connection = HConnectionTestingUtility.getMockedConnection(configuration);
ClusterConnection connection = HConnectionTestingUtility.getMockedConnection(configuration);
// Mock so we get back the master interface. Make it so when createTable is called, we throw
// the PleaseHoldException.
MasterKeepAliveConnection masterAdmin =
Mockito.mock(MasterKeepAliveConnection.class);
MasterKeepAliveConnection masterAdmin = Mockito.mock(MasterKeepAliveConnection.class);
Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(),
(CreateTableRequest)Mockito.any())).
thenThrow(new ServiceException("Test fail").initCause(new PleaseHoldException("test")));
Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin);
// Mock up our admin Interfaces
Admin admin = new HBaseAdmin(configuration);
Admin admin = new HBaseAdmin(connection);
try {
HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf("testMasterMonitorCollableRetries"));
new HTableDescriptor(TableName.valueOf("testMasterMonitorCollableRetries"));
// Pass any old htable descriptor; not important
try {
admin.createTable(htd, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
@ -87,7 +85,7 @@ public class TestHBaseAdminNoCluster {
(CreateTableRequest)Mockito.any());
} finally {
admin.close();
if (connection != null)HConnectionManager.deleteConnection(configuration);
if (connection != null) connection.close();
}
}
}

View File

@ -132,6 +132,8 @@ public class TestHCM {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
if (isJavaOk) {
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
}
@ -1010,117 +1012,114 @@ public class TestHCM {
@Test (timeout=30000)
public void testMulti() throws Exception {
HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM);
TEST_UTIL.createMultiRegions(table, FAM_NAM);
ConnectionManager.HConnectionImplementation conn =
(ConnectionManager.HConnectionImplementation)
HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
try {
TEST_UTIL.createMultiRegions(table, FAM_NAM);
ConnectionManager.HConnectionImplementation conn =
( ConnectionManager.HConnectionImplementation)table.getConnection();
// We're now going to move the region and check that it works for the client
// First a new put to add the location in the cache
conn.clearRegionCache(TABLE_NAME3);
Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
// We're now going to move the region and check that it works for the client
// First a new put to add the location in the cache
conn.clearRegionCache(TABLE_NAME3);
Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
// We can wait for all regions to be online, that makes log reading easier when debugging
TEST_UTIL.waitUntilNoRegionsInTransition(20000);
// We can wait for all regions to be online, that makes log reading easier when debugging
while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
Thread.sleep(1);
}
Put put = new Put(ROW_X);
put.add(FAM_NAM, ROW_X, ROW_X);
table.put(put);
Put put = new Put(ROW_X);
put.add(FAM_NAM, ROW_X, ROW_X);
table.put(put);
// Now moving the region to the second server
HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
if (toMove == null) {
String msg = "Failed to find location for " + Bytes.toString(ROW_X) + " in " + TABLE_NAME3;
// Log so easier to see in output where error occurred.
LOG.error(msg);
throw new NullPointerException(msg);
}
byte[] regionName = toMove.getRegionInfo().getRegionName();
byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
// Now moving the region to the second server
HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
byte[] regionName = toMove.getRegionInfo().getRegionName();
byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
// Choose the other server.
int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
int destServerId = (curServerId == 0 ? 1 : 0);
// Choose the other server.
int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
int destServerId = (curServerId == 0 ? 1 : 0);
HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
ServerName destServerName = destServer.getServerName();
ServerName destServerName = destServer.getServerName();
//find another row in the cur server that is less than ROW_X
List<HRegion> regions = curServer.getOnlineRegions(TABLE_NAME3);
byte[] otherRow = null;
for (HRegion region : regions) {
if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
&& Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
otherRow = region.getRegionInfo().getStartKey();
break;
}
}
assertNotNull(otherRow);
// If empty row, set it to first row.-f
if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
Put put2 = new Put(otherRow);
put2.add(FAM_NAM, otherRow, otherRow);
table.put(put2); //cache put2's location
//find another row in the cur server that is less than ROW_X
List<HRegion> regions = curServer.getOnlineRegions(TABLE_NAME3);
byte[] otherRow = null;
for (HRegion region : regions) {
if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
&& Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
otherRow = region.getRegionInfo().getStartKey();
break;
}
}
assertNotNull(otherRow);
// If empty row, set it to first row.-f
if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
Put put2 = new Put(otherRow);
put2.add(FAM_NAM, otherRow, otherRow);
table.put(put2); //cache put2's location
// Check that we are in the expected state
Assert.assertTrue(curServer != destServer);
Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
Assert.assertNotNull(curServer.getOnlineRegion(regionName));
Assert.assertNull(destServer.getOnlineRegion(regionName));
Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().getRegionStates().isRegionsInTransition());
// Check that we are in the expected state
Assert.assertTrue(curServer != destServer);
Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
Assert.assertNotNull(curServer.getOnlineRegion(regionName));
Assert.assertNull(destServer.getOnlineRegion(regionName));
Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().getRegionStates().isRegionsInTransition());
// Moving. It's possible that we don't have all the regions online at this point, so
// the test must depends only on the region we're looking at.
LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
TEST_UTIL.getHBaseAdmin().move(
toMove.getRegionInfo().getEncodedNameAsBytes(),
destServerName.getServerName().getBytes()
);
// Moving. It's possible that we don't have all the regions online at this point, so
// the test must depends only on the region we're looking at.
LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
TEST_UTIL.getHBaseAdmin().move(
toMove.getRegionInfo().getEncodedNameAsBytes(),
destServerName.getServerName().getBytes()
);
while (destServer.getOnlineRegion(regionName) == null ||
destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
// wait for the move to be finished
Thread.sleep(1);
}
while (destServer.getOnlineRegion(regionName) == null ||
destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
// wait for the move to be finished
Thread.sleep(1);
}
LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
// Check our new state.
Assert.assertNull(curServer.getOnlineRegion(regionName));
Assert.assertNotNull(destServer.getOnlineRegion(regionName));
Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
// Check our new state.
Assert.assertNull(curServer.getOnlineRegion(regionName));
Assert.assertNotNull(destServer.getOnlineRegion(regionName));
Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
// Cache was NOT updated and points to the wrong server
Assert.assertFalse(
conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
.getPort() == destServerName.getPort());
// Cache was NOT updated and points to the wrong server
Assert.assertFalse(
conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
.getPort() == destServerName.getPort());
// Hijack the number of retry to fail after 2 tries
final int prevNumRetriesVal = setNumTries(conn, 2);
// Hijack the number of retry to fail after 2 tries
final int prevNumRetriesVal = setNumTries(conn, 2);
Put put3 = new Put(ROW_X);
put3.add(FAM_NAM, ROW_X, ROW_X);
Put put4 = new Put(otherRow);
put4.add(FAM_NAM, otherRow, otherRow);
Put put3 = new Put(ROW_X);
put3.add(FAM_NAM, ROW_X, ROW_X);
Put put4 = new Put(otherRow);
put4.add(FAM_NAM, otherRow, otherRow);
// do multi
table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
// second we get RegionMovedException.
// do multi
table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
// second we get RegionMovedException.
setNumTries(conn, prevNumRetriesVal);
table.close();
conn.close();
setNumTries(conn, prevNumRetriesVal);
} finally {
table.close();
}
}
@Ignore ("Test presumes RETRY_BACKOFF will never change; it has") @Test

View File

@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -33,7 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -42,14 +40,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@ -71,6 +66,7 @@ public class TestMultiParallel {
private static final byte [][] KEYS = makeKeys();
private static final int slaves = 5; // also used for testing HTable pool size
private static Connection CONNECTION;
@BeforeClass public static void beforeClass() throws Exception {
// Uncomment the following lines if more verbosity is needed for
@ -83,9 +79,11 @@ public class TestMultiParallel {
UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
UTIL.waitTableEnabled(TEST_TABLE);
t.close();
CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
}
@AfterClass public static void afterClass() throws Exception {
CONNECTION.close();
UTIL.shutdownMiniCluster();
}
@ -98,9 +96,6 @@ public class TestMultiParallel {
// Wait until completing balance
UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
}
HConnection conn = HConnectionManager.getConnection(UTIL.getConfiguration());
conn.clearRegionCache();
conn.close();
LOG.info("before done");
}
@ -150,20 +145,26 @@ public class TestMultiParallel {
* @throws SecurityException
*/
@Test(timeout=300000)
public void testActiveThreadsCount() throws Exception{
Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
List<Row> puts = constructPutRequests(); // creates a Put for every region
table.batch(puts);
HashSet<ServerName> regionservers = new HashSet<ServerName>();
for (byte[] k : KEYS) {
HRegionLocation location = ((HTable)table).getRegionLocation(k);
regionservers.add(location.getServerName());
public void testActiveThreadsCount() throws Exception {
try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) {
ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration());
try {
try (Table t = connection.getTable(TEST_TABLE, executor)) {
List<Row> puts = constructPutRequests(); // creates a Put for every region
t.batch(puts);
HashSet<ServerName> regionservers = new HashSet<ServerName>();
try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) {
for (Row r : puts) {
HRegionLocation location = locator.getRegionLocation(r.getRow());
regionservers.add(location.getServerName());
}
}
assertEquals(regionservers.size(), executor.getLargestPoolSize());
}
} finally {
executor.shutdownNow();
}
}
Field poolField = table.getClass().getDeclaredField("pool");
poolField.setAccessible(true);
ThreadPoolExecutor tExecutor = (ThreadPoolExecutor) poolField.get(table);
assertEquals(regionservers.size(), tExecutor.getLargestPoolSize());
table.close();
}
@Test(timeout=300000)
@ -198,7 +199,7 @@ public class TestMultiParallel {
Cell[] multiKvs = multiRes[i].rawCells();
for (int j = 0; j < singleKvs.length; j++) {
Assert.assertEquals(singleKvs[j], multiKvs[j]);
Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]),
Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]),
CellUtil.cloneValue(multiKvs[j])));
}
}
@ -330,7 +331,7 @@ public class TestMultiParallel {
@Test (timeout=300000)
public void testBatchWithPut() throws Exception {
LOG.info("test=testBatchWithPut");
Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
Table table = CONNECTION.getTable(TEST_TABLE);
// put multiple rows using a batch
List<Row> puts = constructPutRequests();
@ -348,9 +349,8 @@ public class TestMultiParallel {
results = table.batch(puts);
} catch (RetriesExhaustedWithDetailsException ree) {
LOG.info(ree.getExhaustiveDescription());
throw ree;
} finally {
table.close();
throw ree;
}
validateSizeAndEmpty(results, KEYS.length);
}
@ -491,7 +491,8 @@ public class TestMultiParallel {
@Test(timeout=300000)
public void testNonceCollision() throws Exception {
LOG.info("test=testNonceCollision");
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
Table table = connection.getTable(TEST_TABLE);
Put put = new Put(ONE_ROW);
put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
@ -510,8 +511,9 @@ public class TestMultiParallel {
return nonce;
}
};
NonceGenerator oldCnm =
ConnectionUtils.injectNonceGeneratorForTesting(table.getConnection(), cnm);
ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm);
// First test sequential requests.
try {
@ -541,7 +543,7 @@ public class TestMultiParallel {
public void run() {
Table table = null;
try {
table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
table = connection.getTable(TEST_TABLE);
} catch (IOException e) {
fail("Not expected");
}
@ -574,7 +576,7 @@ public class TestMultiParallel {
validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
table.close();
} finally {
ConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), oldCnm);
ConnectionManager.injectNonceGeneratorForTesting((ClusterConnection)connection, oldCnm);
}
}

View File

@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
@ -232,10 +234,10 @@ public class TestLoadIncrementalHFilesSplitRecovery {
* @throws IOException
*/
void assertExpectedTable(TableName table, int count, int value) throws IOException {
HTableDescriptor [] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
assertEquals(htds.length, 1);
Table t = null;
try {
assertEquals(
util.getHBaseAdmin().listTables(table.getNameAsString()).length, 1);
t = new HTable(util.getConfiguration(), table);
Scan s = new Scan();
ResultScanner sr = t.getScanner(s);
@ -444,30 +446,33 @@ public class TestLoadIncrementalHFilesSplitRecovery {
public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
TableName tableName = TableName.valueOf("testGroupOrSplitWhenRegionHoleExistsInMeta");
byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
HTable table = new HTable(util.getConfiguration(), tableName);
// Share connection. We were failing to find the table with our new reverse scan because it
// looks for first region, not any region -- that is how it works now. The below removes first
// region in test. Was reliant on the Connection caching having first region.
Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
Table table = connection.getTable(tableName);
setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
Path dir = buildBulkFiles(tableName, 2);
final AtomicInteger countedLqis = new AtomicInteger();
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
util.getConfiguration()) {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
protected List<LoadQueueItem> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final HTable htable,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
if (lqis != null) {
countedLqis.addAndGet(lqis.size());
protected List<LoadQueueItem> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final HTable htable,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
if (lqis != null) {
countedLqis.addAndGet(lqis.size());
}
return lqis;
}
return lqis;
}
};
};
// do bulkload when there is no region hole in hbase:meta.
try {
loader.doBulkLoad(dir, table);
loader.doBulkLoad(dir, (HTable)table);
} catch (Exception e) {
LOG.error("exeception=", e);
}
@ -477,18 +482,16 @@ public class TestLoadIncrementalHFilesSplitRecovery {
dir = buildBulkFiles(tableName, 3);
// Mess it up by leaving a hole in the hbase:meta
HConnection hConnection = HConnectionManager.getConnection(util.getConfiguration());
List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(
hConnection, tableName);
List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
for (HRegionInfo regionInfo : regionInfos) {
if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
MetaTableAccessor.deleteRegion(hConnection, regionInfo);
MetaTableAccessor.deleteRegion(connection, regionInfo);
break;
}
}
try {
loader.doBulkLoad(dir, table);
loader.doBulkLoad(dir, (HTable)table);
} catch (Exception e) {
LOG.error("exeception=", e);
assertTrue("IOException expected", e instanceof IOException);
@ -496,7 +499,42 @@ public class TestLoadIncrementalHFilesSplitRecovery {
table.close();
this.assertExpectedTable(tableName, ROWCOUNT, 2);
}
}
// Make sure at least the one region that still exists can be found.
regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
assertTrue(regionInfos.size() >= 1);
this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
connection.close();
}
/**
* Checks that all columns have the expected value and that there is the
* expected number of rows.
* @throws IOException
*/
void assertExpectedTable(final Connection connection, TableName table, int count, int value)
throws IOException {
HTableDescriptor [] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
assertEquals(htds.length, 1);
Table t = null;
try {
t = connection.getTable(table);
Scan s = new Scan();
ResultScanner sr = t.getScanner(s);
int i = 0;
for (Result r : sr) {
i++;
for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
for (byte[] val : nm.values()) {
assertTrue(Bytes.equals(val, value(value)));
}
}
}
assertEquals(count, i);
} catch (IOException e) {
fail("Failed due to exception");
} finally {
if (t != null) t.close();
}
}
}

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@ -107,8 +108,6 @@ import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -337,7 +336,7 @@ public class TestDistributedLogSplitting {
HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
NonceGenerator oldNg =
ConnectionUtils.injectNonceGeneratorForTesting(ht.getConnection(), ng);
ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)ht.getConnection(), ng);
try {
List<Increment> reqs = new ArrayList<Increment>();
@ -371,7 +370,7 @@ public class TestDistributedLogSplitting {
}
}
} finally {
ConnectionUtils.injectNonceGeneratorForTesting(ht.getConnection(), oldNg);
ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection) ht.getConnection(), oldNg);
ht.close();
zkw.close();
}

View File

@ -179,7 +179,6 @@ public class TestScannerWithBulkload {
table.put(put1);
table.flushCommits();
admin.flush(tableName);
admin.close();
put0 = new Put(Bytes.toBytes("row1"));
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version1")));
@ -200,7 +199,7 @@ public class TestScannerWithBulkload {
@Test
public void testBulkLoadWithParallelScan() throws Exception {
TableName tableName = TableName.valueOf("testBulkLoadWithParallelScan");
final long l = System.currentTimeMillis();
final long l = System.currentTimeMillis();
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
createTable(admin, tableName);
Scan scan = createScan();

View File

@ -107,8 +107,8 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Like {@link TestSplitTransaction} in that we're testing {@link SplitTransaction}
* only the below tests are against a running cluster where {@link TestSplitTransaction}
* Like TestSplitTransaction in that we're testing {@link SplitTransaction}
* only the below tests are against a running cluster where TestSplitTransaction
* is tests against a bare {@link HRegion}.
*/
@Category({RegionServerTests.class, LargeTests.class})
@ -904,7 +904,7 @@ public class TestSplitTransactionOnCluster {
fail("Each table should have at least one region.");
}
ServerName serverName =
cluster.getServerHoldingRegion(firstTableRegions.get(0).getRegionName());
cluster.getServerHoldingRegion(firstTable, firstTableRegions.get(0).getRegionName());
admin.move(secondTableRegions.get(0).getRegionInfo().getEncodedNameAsBytes(),
Bytes.toBytes(serverName.getServerName()));
Table table1 = null;

View File

@ -224,7 +224,8 @@ public class TestMultiSlaveReplication {
region.getWAL().registerWALActionsListener(listener);
// request a roll
admin.rollWALWriter(cluster.getServerHoldingRegion(region.getRegionName()));
admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
region.getRegionName()));
// wait
try {

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -29,15 +28,13 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -48,10 +45,10 @@ import org.junit.BeforeClass;
* All other tests should have their own classes and extend this one
*/
public class TestReplicationBase {
/*
{
((Log4JLogger) ReplicationSource.LOG).getLogger().setLevel(Level.ALL);
}
}*/
private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
@ -64,7 +61,7 @@ public class TestReplicationBase {
protected static ReplicationAdmin admin;
protected static HTable htable1;
protected static Table htable1;
protected static Table htable2;
protected static HBaseTestingUtility utility1;
@ -138,15 +135,19 @@ public class TestReplicationBase {
table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
Admin admin1 = new HBaseAdmin(conf1);
Admin admin2 = new HBaseAdmin(conf2);
admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
Connection connection1 = ConnectionFactory.createConnection(conf1);
Connection connection2 = ConnectionFactory.createConnection(conf2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
utility1.waitUntilAllRegionsAssigned(tableName);
admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
utility2.waitUntilAllRegionsAssigned(tableName);
htable1 = new HTable(conf1, tableName);
htable1 = connection1.getTable(tableName);
htable1.setWriteBufferSize(1024);
htable2 = new HTable(conf2, tableName);
htable2 = connection2.getTable(tableName);
}
/**
@ -154,10 +155,10 @@ public class TestReplicationBase {
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
htable2.close();
htable1.close();
admin.close();
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
}
}
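The setup change above is the heart of this commit: tests create their own unmanaged Connection and hand out short-lived Admin and Table instances from it, closing each with try-with-resources. A minimal, self-contained sketch of that idiom (table and family names are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class UnmanagedConnectionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tn = TableName.valueOf("example");
    // The caller owns the Connection; Admin and Table obtained from it are lightweight
    // and should be closed as soon as they are done with.
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      try (Admin admin = connection.getAdmin()) {
        HTableDescriptor desc = new HTableDescriptor(tn);
        desc.addFamily(new HColumnDescriptor("f"));
        admin.createTable(desc);
      }
      try (Table table = connection.getTable(tn)) {
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        table.put(put);
      }
    } // closing the Connection releases the shared zookeeper and rpc resources
  }
}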

View File

@ -54,7 +54,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
*/
@Before
public void setUp() throws Exception {
htable1.setAutoFlush(false, true);
((HTable)htable1).setAutoFlush(false, true);
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for (JVMClusterUtil.RegionServerThread r :
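The cast above is needed because the Table interface handed back by Connection#getTable does not expose setAutoFlush; tests that still drive the client-side write buffer cast back to the concrete HTable, as these tests do on the assumption that the returned Table is in fact an HTable. A small sketch of that pattern, with illustrative names:

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedPutsSketch {
  static void bufferedPuts(Connection connection, TableName tn) throws IOException {
    try (Table table = connection.getTable(tn)) {
      // Buffer the puts client side; they go out when the buffer fills, on flushCommits,
      // or when the table is closed.
      ((HTable) table).setAutoFlush(false, true);
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(i));
        table.put(put);
      }
      ((HTable) table).flushCommits();
    }
  }
}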

View File

@ -26,16 +26,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.AfterClass;
import org.junit.Assert;
@ -49,6 +54,7 @@ import org.junit.experimental.categories.Category;
*/
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationEndpoint extends TestReplicationBase {
static final Log LOG = LogFactory.getLog(TestReplicationEndpoint.class);
static int numRegionServers;
@ -72,13 +78,14 @@ public class TestReplicationEndpoint extends TestReplicationBase {
ReplicationEndpointForTest.contructedCount.set(0);
ReplicationEndpointForTest.startedCount.set(0);
ReplicationEndpointForTest.replicateCount.set(0);
ReplicationEndpointReturningFalse.replicated.set(false);
ReplicationEndpointForTest.lastEntries = null;
for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName());
}
}
@Test
@Test (timeout=120000)
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
@ -117,17 +124,32 @@ public class TestReplicationEndpoint extends TestReplicationBase {
admin.removePeer("testCustomReplicationEndpoint");
}
@Test
@Test (timeout=120000)
public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate",
Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
int peerCount = admin.getPeersCount();
final String id = "testReplicationEndpointReturnsFalseOnReplicate";
admin.addPeer(id,
new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
// now replicate some data.
// This test is flaky and there is so much stuff flying around in here it is hard to
// debug. The peer needs to be up for the edit to make it across. This wait on
// peer count seems to be a hack that keeps us from progressing until the peer is up.
while (admin.getPeersCount() <= peerCount) {
LOG.info("Waiting on peercount to go up from " + peerCount);
Threads.sleep(100);
}
// now replicate some data
doPut(row);
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
// Looks like replication endpoint returns false unless we put more than 10 edits. We
// only send over one edit.
int count = ReplicationEndpointForTest.replicateCount.get();
LOG.info("count=" + count);
return ReplicationEndpointReturningFalse.replicated.get();
}
});
@ -138,15 +160,17 @@ public class TestReplicationEndpoint extends TestReplicationBase {
admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
}
@Test
@Test (timeout=120000)
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
// now replicate some data.
doPut(Bytes.toBytes("row1"));
doPut(row);
doPut(Bytes.toBytes("row2"));
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
doPut(connection, Bytes.toBytes("row1"));
doPut(connection, row);
doPut(connection, Bytes.toBytes("row2"));
}
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@Override
@ -161,11 +185,17 @@ public class TestReplicationEndpoint extends TestReplicationBase {
private void doPut(byte[] row) throws IOException {
Put put = new Put(row);
put.add(famName, row, row);
htable1 = new HTable(conf1, tableName);
htable1.put(put);
htable1.close();
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
doPut(connection, row);
}
}
private void doPut(final Connection connection, final byte [] row) throws IOException {
try (Table t = connection.getTable(tableName)) {
Put put = new Put(row);
put.add(famName, row, row);
t.put(put);
}
}
private static void doAssert(byte[] row) throws Exception {
@ -217,6 +247,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
static int COUNT = 10;
static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
static AtomicBoolean replicated = new AtomicBoolean(false);
@Override
@ -229,8 +260,9 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
super.replicate(replicateContext);
LOG.info("Replicated " + row + ", count=" + replicateCount.get());
replicated.set(replicateCount.get() > 10); // first 10 times, we return false
replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
return replicated.get();
}
}
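The waits in the hunks above use the Waiter utility to poll until replication has actually happened instead of sleeping a fixed time. A minimal sketch of that polling pattern, with an AtomicBoolean standing in for ReplicationEndpointReturningFalse.replicated (the configuration and timeout are illustrative):

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;

public class WaitForReplicationSketch {
  static final AtomicBoolean replicated = new AtomicBoolean(false);

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Re-evaluates the predicate on an interval until it returns true or the 60s budget
    // runs out; on timeout the default behaviour is to fail rather than return quietly.
    Waiter.waitFor(conf, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return replicated.get();
      }
    });
  }
}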

View File

@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@ -54,7 +55,7 @@ public class TestReplicationKillRS extends TestReplicationBase {
Thread killer = killARegionServer(util, 5000, rsToKill1);
LOG.info("Start loading table");
int initialCount = utility1.loadTable(htable1, famName);
int initialCount = utility1.loadTable((HTable)htable1, famName);
LOG.info("Done loading table");
killer.join(5000);
LOG.info("Done waiting for threads");

View File

@ -69,7 +69,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
*/
@Before
public void setUp() throws Exception {
htable1.setAutoFlush(true, true);
((HTable)htable1).setAutoFlush(true, true);
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for ( JVMClusterUtil.RegionServerThread r :
@ -247,7 +247,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
LOG.info("testSmallBatch");
Put put;
// normal Batch tests
htable1.setAutoFlush(false, true);
((HTable)htable1).setAutoFlush(false, true);
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
put = new Put(Bytes.toBytes(i));
put.add(famName, row, row);
@ -387,7 +387,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testLoading() throws Exception {
LOG.info("Writing out rows to table1 in testLoading");
htable1.setWriteBufferSize(1024);
htable1.setAutoFlush(false, true);
((HTable)htable1).setAutoFlush(false, true);
for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
Put put = new Put(Bytes.toBytes(i));
put.add(famName, row, row);

View File

@ -86,7 +86,7 @@ public class TestAccessController2 extends SecureTestUtil {
public Object run() throws Exception {
HTableDescriptor desc = new HTableDescriptor(TEST_TABLE.getTableName());
desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
Admin admin = new HBaseAdmin(conf);
Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
try {
admin.createTable(desc);
} finally {