HBASE-12471 Task 4. replace internal ConnectionManager#{delete,get}Connection use with #close, #createConnection (0.98, 0.99)

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
This commit is contained in:
stack 2014-11-17 22:32:56 -08:00
parent 463198dcdc
commit 4f5cde8a69
27 changed files with 301 additions and 278 deletions

View File

@ -591,7 +591,7 @@ public interface Admin extends Abortable, Closeable {
/** /**
* Compact all regions on the region server * Compact all regions on the region server
* @param regionserver the region server name * @param sn the region server name
* @param major if it's major compaction * @param major if it's major compaction
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
@ -1268,7 +1268,7 @@ public interface Admin extends Abortable, Closeable {
* @return A RegionServerCoprocessorRpcChannel instance * @return A RegionServerCoprocessorRpcChannel instance
*/ */
CoprocessorRpcChannel coprocessorService(ServerName sn); CoprocessorRpcChannel coprocessorService(ServerName sn);
/** /**
* Update the configuration and trigger an online config change * Update the configuration and trigger an online config change

View File

@ -161,7 +161,7 @@ class AsyncProcess {
// TODO: many of the fields should be made private // TODO: many of the fields should be made private
protected final long id; protected final long id;
protected final ClusterConnection hConnection; protected final ClusterConnection connection;
protected final RpcRetryingCallerFactory rpcCallerFactory; protected final RpcRetryingCallerFactory rpcCallerFactory;
protected final RpcControllerFactory rpcFactory; protected final RpcControllerFactory rpcFactory;
protected final BatchErrors globalErrors; protected final BatchErrors globalErrors;
@ -244,7 +244,7 @@ class AsyncProcess {
throw new IllegalArgumentException("HConnection cannot be null."); throw new IllegalArgumentException("HConnection cannot be null.");
} }
this.hConnection = hc; this.connection = hc;
this.pool = pool; this.pool = pool;
this.globalErrors = useGlobalErrors ? new BatchErrors() : null; this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
@ -338,7 +338,7 @@ class AsyncProcess {
new HashMap<ServerName, MultiAction<Row>>(); new HashMap<ServerName, MultiAction<Row>>();
List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size()); List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
NonceGenerator ng = this.hConnection.getNonceGenerator(); NonceGenerator ng = this.connection.getNonceGenerator();
long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
// Location errors that happen before we decide what requests to take. // Location errors that happen before we decide what requests to take.
@ -361,7 +361,7 @@ class AsyncProcess {
try { try {
if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null"); if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
// Make sure we get 0-s replica. // Make sure we get 0-s replica.
RegionLocations locs = hConnection.locateRegion( RegionLocations locs = connection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) { if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
throw new IOException("#" + id + ", no location found, aborting submit for" throw new IOException("#" + id + ", no location found, aborting submit for"
@ -533,7 +533,7 @@ class AsyncProcess {
// The position will be used by the processBatch to match the object array returned. // The position will be used by the processBatch to match the object array returned.
int posInList = -1; int posInList = -1;
NonceGenerator ng = this.hConnection.getNonceGenerator(); NonceGenerator ng = this.connection.getNonceGenerator();
for (Row r : rows) { for (Row r : rows) {
posInList++; posInList++;
if (r instanceof Put) { if (r instanceof Put) {
@ -908,7 +908,7 @@ class AsyncProcess {
", row cannot be null"); ", row cannot be null");
RegionLocations loc = null; RegionLocations loc = null;
try { try {
loc = hConnection.locateRegion( loc = connection.locateRegion(
tableName, action.getAction().getRow(), useCache, true, action.getReplicaId()); tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
} catch (IOException ex) { } catch (IOException ex) {
manageLocationError(action, ex); manageLocationError(action, ex);
@ -1023,7 +1023,7 @@ class AsyncProcess {
if (tableName == null) { if (tableName == null) {
// tableName is null when we made a cross-table RPC call. // tableName is null when we made a cross-table RPC call.
hConnection.clearCaches(server); connection.clearCaches(server);
} }
int failed = 0, stopped = 0; int failed = 0, stopped = 0;
List<Action<Row>> toReplay = new ArrayList<Action<Row>>(); List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
@ -1034,7 +1034,7 @@ class AsyncProcess {
// any of the regions in the MultiAction. // any of the regions in the MultiAction.
// TODO: depending on type of exception we might not want to update cache at all? // TODO: depending on type of exception we might not want to update cache at all?
if (tableName != null) { if (tableName != null) {
hConnection.updateCachedLocations(tableName, regionName, row, null, server); connection.updateCachedLocations(tableName, regionName, row, null, server);
} }
for (Action<Row> action : e.getValue()) { for (Action<Row> action : e.getValue()) {
Retry retry = manageError( Retry retry = manageError(
@ -1148,7 +1148,7 @@ class AsyncProcess {
// Register corresponding failures once per server/once per region. // Register corresponding failures once per server/once per region.
if (!regionFailureRegistered) { if (!regionFailureRegistered) {
regionFailureRegistered = true; regionFailureRegistered = true;
hConnection.updateCachedLocations( connection.updateCachedLocations(
tableName, regionName, row.getRow(), result, server); tableName, regionName, row.getRow(), result, server);
} }
if (failureCount == 0) { if (failureCount == 0) {
@ -1197,7 +1197,7 @@ class AsyncProcess {
errorsByServer.reportServerError(server); errorsByServer.reportServerError(server);
canRetry = errorsByServer.canRetryMore(numAttempt); canRetry = errorsByServer.canRetryMore(numAttempt);
} }
hConnection.updateCachedLocations( connection.updateCachedLocations(
tableName, region, actions.get(0).getAction().getRow(), throwable, server); tableName, region, actions.get(0).getAction().getRow(), throwable, server);
failureCount += actions.size(); failureCount += actions.size();
@ -1511,7 +1511,7 @@ class AsyncProcess {
@VisibleForTesting @VisibleForTesting
protected MultiServerCallable<Row> createCallable(final ServerName server, protected MultiServerCallable<Row> createCallable(final ServerName server,
TableName tableName, final MultiAction<Row> multi) { TableName tableName, final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(hConnection, tableName, server, this.rpcFactory, multi); return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
} }
/** /**

View File

@ -245,7 +245,7 @@ class ConnectionManager {
*/ */
@VisibleForTesting @VisibleForTesting
static NonceGenerator injectNonceGeneratorForTesting( static NonceGenerator injectNonceGeneratorForTesting(
HConnection conn, NonceGenerator cnm) { ClusterConnection conn, NonceGenerator cnm) {
HConnectionImplementation connImpl = (HConnectionImplementation)conn; HConnectionImplementation connImpl = (HConnectionImplementation)conn;
NonceGenerator ng = connImpl.getNonceGenerator(); NonceGenerator ng = connImpl.getNonceGenerator();
LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName()); LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());

View File

@ -79,7 +79,7 @@ public class ConnectionUtils {
* @return old nonce generator. * @return old nonce generator.
*/ */
public static NonceGenerator injectNonceGeneratorForTesting( public static NonceGenerator injectNonceGeneratorForTesting(
HConnection conn, NonceGenerator cnm) { ClusterConnection conn, NonceGenerator cnm) {
return ConnectionManager.injectNonceGeneratorForTesting(conn, cnm); return ConnectionManager.injectNonceGeneratorForTesting(conn, cnm);
} }
@ -111,7 +111,7 @@ public class ConnectionUtils {
* @param client the client interface of the local server * @param client the client interface of the local server
* @return an adapted/decorated HConnection * @return an adapted/decorated HConnection
*/ */
public static HConnection createShortCircuitHConnection(final HConnection conn, public static HConnection createShortCircuitHConnection(final Connection conn,
final ServerName serverName, final AdminService.BlockingInterface admin, final ServerName serverName, final AdminService.BlockingInterface admin,
final ClientService.BlockingInterface client) { final ClientService.BlockingInterface client) {
return new ConnectionAdapter(conn) { return new ConnectionAdapter(conn) {

View File

@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterReque
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
@ -172,8 +171,6 @@ public class HBaseAdmin implements Admin {
private static final String ZK_IDENTIFIER_PREFIX = "hbase-admin-on-"; private static final String ZK_IDENTIFIER_PREFIX = "hbase-admin-on-";
// We use the implementation class rather then the interface because we
// need the package protected functions to get the connection to master
private ClusterConnection connection; private ClusterConnection connection;
private volatile Configuration conf; private volatile Configuration conf;
@ -1447,8 +1444,7 @@ public class HBaseAdmin implements Admin {
* Get all the online regions on a region server. * Get all the online regions on a region server.
*/ */
@Override @Override
public List<HRegionInfo> getOnlineRegions( public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
final ServerName sn) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn); AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
return ProtobufUtil.getOnlineRegions(admin); return ProtobufUtil.getOnlineRegions(admin);
} }
@ -2329,12 +2325,6 @@ public class HBaseAdmin implements Admin {
}); });
} }
private HRegionLocation getFirstMetaServerForTable(final TableName tableName)
throws IOException {
return connection.locateRegion(TableName.META_TABLE_NAME,
HRegionInfo.createRegionName(tableName, null, HConstants.NINES, false));
}
/** /**
* @return Configuration used by the instance. * @return Configuration used by the instance.
*/ */
@ -2491,52 +2481,40 @@ public class HBaseAdmin implements Admin {
/** /**
* Check to see if HBase is running. Throw an exception if not. * Check to see if HBase is running. Throw an exception if not.
* We consider that HBase is running if ZooKeeper and Master are running.
*
* @param conf system configuration * @param conf system configuration
* @throws MasterNotRunningException if the master is not running * @throws MasterNotRunningException if the master is not running
* @throws ZooKeeperConnectionException if unable to connect to zookeeper * @throws ZooKeeperConnectionException if unable to connect to zookeeper
*/ */
// Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not.
public static void checkHBaseAvailable(Configuration conf) public static void checkHBaseAvailable(Configuration conf)
throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
Configuration copyOfConf = HBaseConfiguration.create(conf); Configuration copyOfConf = HBaseConfiguration.create(conf);
// We set it to make it fail as soon as possible if HBase is not available // We set it to make it fail as soon as possible if HBase is not available
copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
copyOfConf.setInt("zookeeper.recovery.retry", 0); copyOfConf.setInt("zookeeper.recovery.retry", 0);
try (ClusterConnection connection =
ConnectionManager.HConnectionImplementation connection (ClusterConnection)ConnectionFactory.createConnection(copyOfConf)) {
= (ConnectionManager.HConnectionImplementation) // Check ZK first.
HConnectionManager.getConnection(copyOfConf); // If the connection exists, we may have a connection to ZK that does not work anymore
ZooKeeperKeepAliveConnection zkw = null;
try { try {
// Check ZK first. // This is NASTY. FIX!!!! Dependent on internal implementation! TODO
// If the connection exists, we may have a connection to ZK that does zkw = ((ConnectionManager.HConnectionImplementation)connection).
// not work anymore getKeepAliveZooKeeperWatcher();
ZooKeeperKeepAliveConnection zkw = null; zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
try { } catch (IOException e) {
zkw = connection.getKeepAliveZooKeeperWatcher(); throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
zkw.getRecoverableZooKeeper().getZooKeeper().exists( } catch (InterruptedException e) {
zkw.baseZNode, false); throw (InterruptedIOException)
} catch (IOException e) {
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
} catch (InterruptedException e) {
throw (InterruptedIOException)
new InterruptedIOException("Can't connect to ZooKeeper").initCause(e); new InterruptedIOException("Can't connect to ZooKeeper").initCause(e);
} catch (KeeperException e) { } catch (KeeperException e) {
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
} finally { } finally {
if (zkw != null) { if (zkw != null) {
zkw.close(); zkw.close();
}
} }
}
// Check Master
connection.isMasterRunning(); connection.isMasterRunning();
} finally {
connection.close();
} }
} }
@ -3706,8 +3684,9 @@ public class HBaseAdmin implements Admin {
@Override @Override
public int getMasterInfoPort() throws IOException { public int getMasterInfoPort() throws IOException {
// TODO: Fix! Reaching into internal implementation!!!!
ConnectionManager.HConnectionImplementation connection = ConnectionManager.HConnectionImplementation connection =
(ConnectionManager.HConnectionImplementation) HConnectionManager.getConnection(conf); (ConnectionManager.HConnectionImplementation)this.connection;
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher(); ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
try { try {
return MasterAddressTracker.getMasterInfoPort(zkw); return MasterAddressTracker.getMasterInfoPort(zkw);

View File

@ -62,6 +62,7 @@ public class MetaCache {
* Search the cache for a location that fits our table and row key. * Search the cache for a location that fits our table and row key.
* Return null if no suitable region is located. * Return null if no suitable region is located.
* *
*
* @param tableName * @param tableName
* @param row * @param row
* @return Null or region location found in cache. * @return Null or region location found in cache.

View File

@ -30,7 +30,7 @@ import java.util.List;
/** /**
* Used to view region location information for a single HBase table. * Used to view region location information for a single HBase table.
* Obtain an instance from an {@link HConnection}. * Obtain an instance from an {@link Connection}.
* *
* @see ConnectionFactory * @see ConnectionFactory
* @see Connection * @see Connection

View File

@ -87,16 +87,19 @@ public class ZKConfig {
Properties zkProperties = new Properties(); Properties zkProperties = new Properties();
// Directly map all of the hbase.zookeeper.property.KEY properties. // Directly map all of the hbase.zookeeper.property.KEY properties.
for (Entry<String, String> entry : new Configuration(conf)) { // copy for mt safety // Synchronize on conf so no loading of configs while we iterate
String key = entry.getKey(); synchronized (conf) {
if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { for (Entry<String, String> entry : conf) {
String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN); String key = entry.getKey();
String value = entry.getValue(); if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
// If the value has variables substitutions, need to do a get. String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
if (value.contains(VARIABLE_START)) { String value = entry.getValue();
value = conf.get(key); // If the value has variables substitutions, need to do a get.
if (value.contains(VARIABLE_START)) {
value = conf.get(key);
}
zkProperties.put(zkKey, value);
} }
zkProperties.put(zkKey, value);
} }
} }

View File

@ -25,9 +25,11 @@ import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType; import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
@ -44,8 +46,8 @@ import com.google.common.collect.Sets;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class DistributedHBaseCluster extends HBaseCluster { public class DistributedHBaseCluster extends HBaseCluster {
private Admin admin;
private HBaseAdmin admin; private final Connection connection;
private ClusterManager clusterManager; private ClusterManager clusterManager;
@ -53,7 +55,8 @@ public class DistributedHBaseCluster extends HBaseCluster {
throws IOException { throws IOException {
super(conf); super(conf);
this.clusterManager = clusterManager; this.clusterManager = clusterManager;
this.admin = new HBaseAdmin(conf); this.connection = ConnectionFactory.createConnection(conf);
this.admin = this.connection.getAdmin();
this.initialClusterStatus = getClusterStatus(); this.initialClusterStatus = getClusterStatus();
} }
@ -84,18 +87,21 @@ public class DistributedHBaseCluster extends HBaseCluster {
if (this.admin != null) { if (this.admin != null) {
admin.close(); admin.close();
} }
if (this.connection != null && !this.connection.isClosed()) {
this.connection.close();
}
} }
@Override @Override
public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName) public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
throws IOException { throws IOException {
return admin.getConnection().getAdmin(serverName); return ((ClusterConnection)this.connection).getAdmin(serverName);
} }
@Override @Override
public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName) public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
throws IOException { throws IOException {
return admin.getConnection().getClient(serverName); return ((ClusterConnection)this.connection).getClient(serverName);
} }
@Override @Override
@ -138,8 +144,7 @@ public class DistributedHBaseCluster extends HBaseCluster {
@Override @Override
public MasterService.BlockingInterface getMasterAdminService() public MasterService.BlockingInterface getMasterAdminService()
throws IOException { throws IOException {
HConnection conn = HConnectionManager.getConnection(conf); return ((ClusterConnection)this.connection).getMaster();
return conn.getMaster();
} }
@Override @Override
@ -183,18 +188,19 @@ public class DistributedHBaseCluster extends HBaseCluster {
} }
@Override @Override
public ServerName getServerHoldingRegion(byte[] regionName) throws IOException { public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
HConnection connection = admin.getConnection(); HRegionLocation regionLoc = null;
HRegionLocation regionLoc = connection.locateRegion(regionName); try (RegionLocator locator = connection.getRegionLocator(tn)) {
regionLoc = locator.getRegionLocation(regionName);
}
if (regionLoc == null) { if (regionLoc == null) {
LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) +
+ " for table " + HRegionInfo.getTableName(regionName) + ", start key [" + ", start key [" + Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]");
Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]");
return null; return null;
} }
AdminProtos.AdminService.BlockingInterface client = AdminProtos.AdminService.BlockingInterface client =
connection.getAdmin(regionLoc.getServerName()); ((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName());
ServerInfo info = ProtobufUtil.getServerInfo(client); ServerInfo info = ProtobufUtil.getServerInfo(client);
return ProtobufUtil.toServerName(info.getServerName()); return ProtobufUtil.toServerName(info.getServerName());
} }
@ -374,7 +380,7 @@ public class DistributedHBaseCluster extends HBaseCluster {
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("While closing the old connection", ioe); LOG.warn("While closing the old connection", ioe);
} }
this.admin = new HBaseAdmin(conf); this.admin = this.connection.getAdmin();
LOG.info("Added new HBaseAdmin"); LOG.info("Added new HBaseAdmin");
return true; return true;
} }

View File

@ -37,4 +37,4 @@ import org.apache.hadoop.mapreduce.Mapper;
public abstract class TableMapper<KEYOUT, VALUEOUT> public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> { extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
} }

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
* while the output value <u>must</u> be either a {@link Put} or a * while the output value <u>must</u> be either a {@link Put} or a
* {@link Delete} instance. * {@link Delete} instance.
* *
* <p><KEY> is the type of the key. Ignored in this class.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable

View File

@ -48,9 +48,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan; import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -67,9 +67,9 @@ import org.apache.log4j.Logger;
/** /**
* A tool that is used for manipulating and viewing favored nodes information * A tool that is used for manipulating and viewing favored nodes information
* for regions. Run with -h to get a list of the options * for regions. Run with -h to get a list of the options
*
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
// TODO: Remove? Unused. Partially implemented only.
public class RegionPlacementMaintainer { public class RegionPlacementMaintainer {
private static final Log LOG = LogFactory.getLog(RegionPlacementMaintainer.class private static final Log LOG = LogFactory.getLog(RegionPlacementMaintainer.class
.getName()); .getName());
@ -93,9 +93,9 @@ public class RegionPlacementMaintainer {
private Configuration conf; private Configuration conf;
private final boolean enforceLocality; private final boolean enforceLocality;
private final boolean enforceMinAssignmentMove; private final boolean enforceMinAssignmentMove;
private HBaseAdmin admin;
private RackManager rackManager; private RackManager rackManager;
private Set<TableName> targetTableSet; private Set<TableName> targetTableSet;
private final Connection connection;
public RegionPlacementMaintainer(Configuration conf) { public RegionPlacementMaintainer(Configuration conf) {
this(conf, true, true); this(conf, true, true);
@ -108,7 +108,13 @@ public class RegionPlacementMaintainer {
this.enforceMinAssignmentMove = enforceMinAssignmentMove; this.enforceMinAssignmentMove = enforceMinAssignmentMove;
this.targetTableSet = new HashSet<TableName>(); this.targetTableSet = new HashSet<TableName>();
this.rackManager = new RackManager(conf); this.rackManager = new RackManager(conf);
try {
this.connection = ConnectionFactory.createConnection(this.conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
private static void printHelp(Options opt) { private static void printHelp(Options opt) {
new HelpFormatter().printHelp( new HelpFormatter().printHelp(
"RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " + "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " +
@ -124,17 +130,6 @@ public class RegionPlacementMaintainer {
} }
} }
/**
* @return the cached HBaseAdmin
* @throws IOException
*/
private HBaseAdmin getHBaseAdmin() throws IOException {
if (this.admin == null) {
this.admin = new HBaseAdmin(this.conf);
}
return this.admin;
}
/** /**
* @return the new RegionAssignmentSnapshot * @return the new RegionAssignmentSnapshot
* @throws IOException * @throws IOException
@ -142,7 +137,7 @@ public class RegionPlacementMaintainer {
public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot()
throws IOException { throws IOException {
SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot = SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
new SnapshotOfRegionAssignmentFromMeta(HConnectionManager.getConnection(conf)); new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
currentAssignmentShapshot.initialize(); currentAssignmentShapshot.initialize();
return currentAssignmentShapshot; return currentAssignmentShapshot;
} }
@ -210,8 +205,10 @@ public class RegionPlacementMaintainer {
// Get the all the region servers // Get the all the region servers
List<ServerName> servers = new ArrayList<ServerName>(); List<ServerName> servers = new ArrayList<ServerName>();
servers.addAll(getHBaseAdmin().getClusterStatus().getServers()); try (Admin admin = this.connection.getAdmin()) {
servers.addAll(admin.getClusterStatus().getServers());
}
LOG.info("Start to generate assignment plan for " + numRegions + LOG.info("Start to generate assignment plan for " + numRegions +
" regions from table " + tableName + " with " + " regions from table " + tableName + " with " +
servers.size() + " region servers"); servers.size() + " region servers");
@ -619,9 +616,9 @@ public class RegionPlacementMaintainer {
// sort the map based on region info // sort the map based on region info
Map<HRegionInfo, List<ServerName>> assignmentMap = Map<HRegionInfo, List<ServerName>> assignmentMap =
new TreeMap<HRegionInfo, List<ServerName>>(plan.getAssignmentMap()); new TreeMap<HRegionInfo, List<ServerName>>(plan.getAssignmentMap());
for (Map.Entry<HRegionInfo, List<ServerName>> entry : assignmentMap.entrySet()) { for (Map.Entry<HRegionInfo, List<ServerName>> entry : assignmentMap.entrySet()) {
String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue()); String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
String regionName = entry.getKey().getRegionNameAsString(); String regionName = entry.getKey().getRegionNameAsString();
LOG.info("Region: " + regionName ); LOG.info("Region: " + regionName );
@ -660,7 +657,6 @@ public class RegionPlacementMaintainer {
// Get the region to region server map // Get the region to region server map
Map<ServerName, List<HRegionInfo>> currentAssignment = Map<ServerName, List<HRegionInfo>> currentAssignment =
this.getRegionAssignmentSnapshot().getRegionServerToRegionMap(); this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
HConnection connection = this.getHBaseAdmin().getConnection();
// track of the failed and succeeded updates // track of the failed and succeeded updates
int succeededNum = 0; int succeededNum = 0;
@ -691,10 +687,11 @@ public class RegionPlacementMaintainer {
} }
if (singleServerPlan != null) { if (singleServerPlan != null) {
// Update the current region server with its updated favored nodes // Update the current region server with its updated favored nodes
BlockingInterface currentRegionServer = connection.getAdmin(entry.getKey()); BlockingInterface currentRegionServer =
((ClusterConnection)this.connection).getAdmin(entry.getKey());
UpdateFavoredNodesRequest request = UpdateFavoredNodesRequest request =
RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos); RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
UpdateFavoredNodesResponse updateFavoredNodesResponse = UpdateFavoredNodesResponse updateFavoredNodesResponse =
currentRegionServer.updateFavoredNodes(null, request); currentRegionServer.updateFavoredNodes(null, request);
LOG.info("Region server " + LOG.info("Region server " +

View File

@ -36,7 +36,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
@ -46,8 +45,9 @@ import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler; import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
@ -137,7 +137,7 @@ public class ServerManager {
private final Server master; private final Server master;
private final MasterServices services; private final MasterServices services;
private final HConnection connection; private final ClusterConnection connection;
private final DeadServer deadservers = new DeadServer(); private final DeadServer deadservers = new DeadServer();
@ -201,7 +201,7 @@ public class ServerManager {
Configuration c = master.getConfiguration(); Configuration c = master.getConfiguration();
maxSkew = c.getLong("hbase.master.maxclockskew", 30000); maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
warningSkew = c.getLong("hbase.master.warningclockskew", 10000); warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
this.connection = connect ? HConnectionManager.getConnection(c) : null; this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null;
} }
/** /**

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
@ -606,7 +607,7 @@ public class HRegionServer extends HasThread implements
*/ */
protected HConnection createShortCircuitConnection() throws IOException { protected HConnection createShortCircuitConnection() throws IOException {
return ConnectionUtils.createShortCircuitHConnection( return ConnectionUtils.createShortCircuitHConnection(
HConnectionManager.getConnection(conf), serverName, rpcServices, rpcServices); ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices);
} }
/** /**

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
@ -132,9 +131,6 @@ public class SplitLogWorker implements Runnable {
try { try {
LOG.info("SplitLogWorker " + server.getServerName() + " starting"); LOG.info("SplitLogWorker " + server.getServerName() + " starting");
coordination.registerListener(); coordination.registerListener();
// pre-initialize a new connection for splitlogworker configuration
HConnectionManager.getConnection(conf);
// wait for Coordination Engine is ready // wait for Coordination Engine is ready
boolean res = false; boolean res = false;
while (!res && !coordination.isStop()) { while (!res && !coordination.isStop()) {

View File

@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -70,11 +69,12 @@ import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagRewriteCell; import org.apache.hadoop.hbase.TagRewriteCell;
import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
@ -1844,7 +1844,8 @@ public class HLogSplitter {
synchronized (this.tableNameToHConnectionMap) { synchronized (this.tableNameToHConnectionMap) {
hconn = this.tableNameToHConnectionMap.get(tableName); hconn = this.tableNameToHConnectionMap.get(tableName);
if (hconn == null) { if (hconn == null) {
hconn = HConnectionManager.getConnection(conf); // Gets closed over in closeRegionServerWriters
hconn = (HConnection)ConnectionFactory.createConnection(conf);
this.tableNameToHConnectionMap.put(tableName, hconn); this.tableNameToHConnectionMap.put(tableName, hconn);
} }
} }

View File

@ -250,15 +250,18 @@ public abstract class HBaseCluster implements Closeable, Configurable {
* Get the ServerName of region server serving the first hbase:meta region * Get the ServerName of region server serving the first hbase:meta region
*/ */
public ServerName getServerHoldingMeta() throws IOException { public ServerName getServerHoldingMeta() throws IOException {
return getServerHoldingRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); return getServerHoldingRegion(TableName.META_TABLE_NAME,
HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
} }
/** /**
* Get the ServerName of region server serving the specified region * Get the ServerName of region server serving the specified region
* @param regionName Name of the region in bytes * @param regionName Name of the region in bytes
* @param tn Table name that has the region.
* @return ServerName that hosts the region or null * @return ServerName that hosts the region or null
*/ */
public abstract ServerName getServerHoldingRegion(byte[] regionName) throws IOException; public abstract ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
throws IOException;
/** /**
* @return whether we are interacting with a distributed cluster as opposed to an * @return whether we are interacting with a distributed cluster as opposed to an

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@ -517,7 +516,6 @@ public class MiniHBaseCluster extends HBaseCluster {
if (this.hbaseCluster != null) { if (this.hbaseCluster != null) {
this.hbaseCluster.shutdown(); this.hbaseCluster.shutdown();
} }
HConnectionManager.deleteAllConnections(false);
} }
@Override @Override
@ -657,7 +655,8 @@ public class MiniHBaseCluster extends HBaseCluster {
} }
@Override @Override
public ServerName getServerHoldingRegion(byte[] regionName) throws IOException { public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
throws IOException {
// Assume there is only one master thread which is the active master. // Assume there is only one master thread which is the active master.
// If there are multiple master threads, the backup master threads // If there are multiple master threads, the backup master threads
// should hold some regions. Please refer to #countServedRegions // should hold some regions. Please refer to #countServedRegions

View File

@ -906,7 +906,6 @@ public class TestAdmin1 {
admin.createTable(desc, splitKeys); admin.createTable(desc, splitKeys);
HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName); HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
Map<HRegionInfo, ServerName> regions = ht.getRegionLocations(); Map<HRegionInfo, ServerName> regions = ht.getRegionLocations();
ht.close();
assertEquals("Tried to create " + expectedRegions + " regions " assertEquals("Tried to create " + expectedRegions + " regions "
+ "but only found " + regions.size(), expectedRegions, regions.size()); + "but only found " + regions.size(), expectedRegions, regions.size());
// Disable table. // Disable table.

View File

@ -51,7 +51,7 @@ public class TestHBaseAdminNoCluster {
* @throws ServiceException * @throws ServiceException
*/ */
@Test @Test
public void testMasterMonitorCollableRetries() public void testMasterMonitorCallableRetries()
throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ServiceException { throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ServiceException {
Configuration configuration = HBaseConfiguration.create(); Configuration configuration = HBaseConfiguration.create();
// Set the pause and retry count way down. // Set the pause and retry count way down.
@ -60,20 +60,18 @@ public class TestHBaseAdminNoCluster {
configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count); configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count);
// Get mocked connection. Getting the connection will register it so when HBaseAdmin is // Get mocked connection. Getting the connection will register it so when HBaseAdmin is
// constructed with same configuration, it will find this mocked connection. // constructed with same configuration, it will find this mocked connection.
HConnection connection = HConnectionTestingUtility.getMockedConnection(configuration); ClusterConnection connection = HConnectionTestingUtility.getMockedConnection(configuration);
// Mock so we get back the master interface. Make it so when createTable is called, we throw // Mock so we get back the master interface. Make it so when createTable is called, we throw
// the PleaseHoldException. // the PleaseHoldException.
MasterKeepAliveConnection masterAdmin = MasterKeepAliveConnection masterAdmin = Mockito.mock(MasterKeepAliveConnection.class);
Mockito.mock(MasterKeepAliveConnection.class);
Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(), Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(),
(CreateTableRequest)Mockito.any())). (CreateTableRequest)Mockito.any())).
thenThrow(new ServiceException("Test fail").initCause(new PleaseHoldException("test"))); thenThrow(new ServiceException("Test fail").initCause(new PleaseHoldException("test")));
Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin); Mockito.when(connection.getKeepAliveMasterService()).thenReturn(masterAdmin);
// Mock up our admin Interfaces Admin admin = new HBaseAdmin(connection);
Admin admin = new HBaseAdmin(configuration);
try { try {
HTableDescriptor htd = HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf("testMasterMonitorCollableRetries")); new HTableDescriptor(TableName.valueOf("testMasterMonitorCollableRetries"));
// Pass any old htable descriptor; not important // Pass any old htable descriptor; not important
try { try {
admin.createTable(htd, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); admin.createTable(htd, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
@ -86,7 +84,7 @@ public class TestHBaseAdminNoCluster {
(CreateTableRequest)Mockito.any()); (CreateTableRequest)Mockito.any());
} finally { } finally {
admin.close(); admin.close();
if (connection != null)HConnectionManager.deleteConnection(configuration); if (connection != null) connection.close();
} }
} }
} }

View File

@ -131,6 +131,8 @@ public class TestHCM {
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
if (isJavaOk) { if (isJavaOk) {
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
} }
@ -1009,113 +1011,114 @@ public class TestHCM {
@Test @Test
public void testMulti() throws Exception { public void testMulti() throws Exception {
HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM); HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM);
TEST_UTIL.createMultiRegions(table, FAM_NAM); try {
ConnectionManager.HConnectionImplementation conn = TEST_UTIL.createMultiRegions(table, FAM_NAM);
(ConnectionManager.HConnectionImplementation) ConnectionManager.HConnectionImplementation conn =
HConnectionManager.getConnection(TEST_UTIL.getConfiguration()); ( ConnectionManager.HConnectionImplementation)table.getConnection();
// We're now going to move the region and check that it works for the client // We're now going to move the region and check that it works for the client
// First a new put to add the location in the cache // First a new put to add the location in the cache
conn.clearRegionCache(TABLE_NAME3); conn.clearRegionCache(TABLE_NAME3);
Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3)); Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false); TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
// We can wait for all regions to be online, that makes log reading easier when debugging // We can wait for all regions to be online, that makes log reading easier when debugging
while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
Thread.sleep(1); Thread.sleep(1);
}
Put put = new Put(ROW_X);
put.add(FAM_NAM, ROW_X, ROW_X);
table.put(put);
// Now moving the region to the second server
HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
byte[] regionName = toMove.getRegionInfo().getRegionName();
byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
// Choose the other server.
int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
int destServerId = (curServerId == 0 ? 1 : 0);
HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
ServerName destServerName = destServer.getServerName();
//find another row in the cur server that is less than ROW_X
List<HRegion> regions = curServer.getOnlineRegions(TABLE_NAME3);
byte[] otherRow = null;
for (HRegion region : regions) {
if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
&& Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
otherRow = region.getRegionInfo().getStartKey();
break;
} }
}
assertNotNull(otherRow);
// If empty row, set it to first row.-f
if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
Put put2 = new Put(otherRow);
put2.add(FAM_NAM, otherRow, otherRow);
table.put(put2); //cache put2's location
// Check that we are in the expected state Put put = new Put(ROW_X);
Assert.assertTrue(curServer != destServer); put.add(FAM_NAM, ROW_X, ROW_X);
Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName()); table.put(put);
Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
Assert.assertNotNull(curServer.getOnlineRegion(regionName));
Assert.assertNull(destServer.getOnlineRegion(regionName));
Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().getRegionStates().isRegionsInTransition());
// Moving. It's possible that we don't have all the regions online at this point, so // Now moving the region to the second server
// the test must depends only on the region we're looking at. HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); byte[] regionName = toMove.getRegionInfo().getRegionName();
TEST_UTIL.getHBaseAdmin().move( byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
toMove.getRegionInfo().getEncodedNameAsBytes(),
destServerName.getServerName().getBytes()
);
while (destServer.getOnlineRegion(regionName) == null || // Choose the other server.
destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || int destServerId = (curServerId == 0 ? 1 : 0);
master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
// wait for the move to be finished
Thread.sleep(1);
}
LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
// Check our new state. ServerName destServerName = destServer.getServerName();
Assert.assertNull(curServer.getOnlineRegion(regionName));
Assert.assertNotNull(destServer.getOnlineRegion(regionName)); //find another row in the cur server that is less than ROW_X
Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); List<HRegion> regions = curServer.getOnlineRegions(TABLE_NAME3);
Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); byte[] otherRow = null;
for (HRegion region : regions) {
if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
&& Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
otherRow = region.getRegionInfo().getStartKey();
break;
}
}
assertNotNull(otherRow);
// If empty row, set it to first row.-f
if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
Put put2 = new Put(otherRow);
put2.add(FAM_NAM, otherRow, otherRow);
table.put(put2); //cache put2's location
// Check that we are in the expected state
Assert.assertTrue(curServer != destServer);
Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
Assert.assertNotNull(curServer.getOnlineRegion(regionName));
Assert.assertNull(destServer.getOnlineRegion(regionName));
Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().getRegionStates().isRegionsInTransition());
// Moving. It's possible that we don't have all the regions online at this point, so
// the test must depends only on the region we're looking at.
LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
TEST_UTIL.getHBaseAdmin().move(
toMove.getRegionInfo().getEncodedNameAsBytes(),
destServerName.getServerName().getBytes()
);
while (destServer.getOnlineRegion(regionName) == null ||
destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
// wait for the move to be finished
Thread.sleep(1);
}
LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
// Check our new state.
Assert.assertNull(curServer.getOnlineRegion(regionName));
Assert.assertNotNull(destServer.getOnlineRegion(regionName));
Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
// Cache was NOT updated and points to the wrong server // Cache was NOT updated and points to the wrong server
Assert.assertFalse( Assert.assertFalse(
conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation() conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
.getPort() == destServerName.getPort()); .getPort() == destServerName.getPort());
// Hijack the number of retry to fail after 2 tries // Hijack the number of retry to fail after 2 tries
final int prevNumRetriesVal = setNumTries(conn, 2); final int prevNumRetriesVal = setNumTries(conn, 2);
Put put3 = new Put(ROW_X); Put put3 = new Put(ROW_X);
put3.add(FAM_NAM, ROW_X, ROW_X); put3.add(FAM_NAM, ROW_X, ROW_X);
Put put4 = new Put(otherRow); Put put4 = new Put(otherRow);
put4.add(FAM_NAM, otherRow, otherRow); put4.add(FAM_NAM, otherRow, otherRow);
// do multi // do multi
table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row, table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
// second we get RegionMovedException. // second we get RegionMovedException.
setNumTries(conn, prevNumRetriesVal); setNumTries(conn, prevNumRetriesVal);
table.close(); } finally {
conn.close(); table.close();
}
} }
@Ignore ("Test presumes RETRY_BACKOFF will never change; it has") @Test @Ignore ("Test presumes RETRY_BACKOFF will never change; it has") @Test

View File

@ -33,7 +33,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -43,12 +42,10 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.log4j.Level;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -70,6 +67,7 @@ public class TestMultiParallel {
private static final byte [][] KEYS = makeKeys(); private static final byte [][] KEYS = makeKeys();
private static final int slaves = 5; // also used for testing HTable pool size private static final int slaves = 5; // also used for testing HTable pool size
private static Connection CONNECTION;
@BeforeClass public static void beforeClass() throws Exception { @BeforeClass public static void beforeClass() throws Exception {
// Uncomment the following lines if more verbosity is needed for // Uncomment the following lines if more verbosity is needed for
@ -82,9 +80,11 @@ public class TestMultiParallel {
UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY)); UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
UTIL.waitTableEnabled(TEST_TABLE); UTIL.waitTableEnabled(TEST_TABLE);
t.close(); t.close();
CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
} }
@AfterClass public static void afterClass() throws Exception { @AfterClass public static void afterClass() throws Exception {
CONNECTION.close();
UTIL.shutdownMiniCluster(); UTIL.shutdownMiniCluster();
} }
@ -97,9 +97,6 @@ public class TestMultiParallel {
// Wait until completing balance // Wait until completing balance
UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition()); UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
} }
HConnection conn = HConnectionManager.getConnection(UTIL.getConfiguration());
conn.clearRegionCache();
conn.close();
LOG.info("before done"); LOG.info("before done");
} }
@ -328,7 +325,7 @@ public class TestMultiParallel {
@Test (timeout=300000) @Test (timeout=300000)
public void testBatchWithPut() throws Exception { public void testBatchWithPut() throws Exception {
LOG.info("test=testBatchWithPut"); LOG.info("test=testBatchWithPut");
Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); Table table = CONNECTION.getTable(TEST_TABLE);
// put multiple rows using a batch // put multiple rows using a batch
List<Row> puts = constructPutRequests(); List<Row> puts = constructPutRequests();
@ -347,9 +344,8 @@ public class TestMultiParallel {
results = table.batch(puts); results = table.batch(puts);
} catch (RetriesExhaustedWithDetailsException ree) { } catch (RetriesExhaustedWithDetailsException ree) {
LOG.info(ree.getExhaustiveDescription()); LOG.info(ree.getExhaustiveDescription());
throw ree;
} finally {
table.close(); table.close();
throw ree;
} }
validateSizeAndEmpty(results, KEYS.length); validateSizeAndEmpty(results, KEYS.length);
} }
@ -490,7 +486,8 @@ public class TestMultiParallel {
@Test(timeout=300000) @Test(timeout=300000)
public void testNonceCollision() throws Exception { public void testNonceCollision() throws Exception {
LOG.info("test=testNonceCollision"); LOG.info("test=testNonceCollision");
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
Table table = connection.getTable(TEST_TABLE);
Put put = new Put(ONE_ROW); Put put = new Put(ONE_ROW);
put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L)); put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
@ -509,8 +506,9 @@ public class TestMultiParallel {
return nonce; return nonce;
} }
}; };
NonceGenerator oldCnm = NonceGenerator oldCnm =
ConnectionUtils.injectNonceGeneratorForTesting(table.getConnection(), cnm); ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm);
// First test sequential requests. // First test sequential requests.
try { try {
@ -540,7 +538,7 @@ public class TestMultiParallel {
public void run() { public void run() {
Table table = null; Table table = null;
try { try {
table = new HTable(UTIL.getConfiguration(), TEST_TABLE); table = connection.getTable(TEST_TABLE);
} catch (IOException e) { } catch (IOException e) {
fail("Not expected"); fail("Not expected");
} }
@ -573,7 +571,7 @@ public class TestMultiParallel {
validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L)); validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
table.close(); table.close();
} finally { } finally {
ConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), oldCnm); ConnectionManager.injectNonceGeneratorForTesting((ClusterConnection)connection, oldCnm);
} }
} }

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
@ -231,10 +233,10 @@ public class TestLoadIncrementalHFilesSplitRecovery {
* @throws IOException * @throws IOException
*/ */
void assertExpectedTable(TableName table, int count, int value) throws IOException { void assertExpectedTable(TableName table, int count, int value) throws IOException {
HTableDescriptor [] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
assertEquals(htds.length, 1);
Table t = null; Table t = null;
try { try {
assertEquals(
util.getHBaseAdmin().listTables(table.getNameAsString()).length, 1);
t = new HTable(util.getConfiguration(), table); t = new HTable(util.getConfiguration(), table);
Scan s = new Scan(); Scan s = new Scan();
ResultScanner sr = t.getScanner(s); ResultScanner sr = t.getScanner(s);
@ -443,30 +445,33 @@ public class TestLoadIncrementalHFilesSplitRecovery {
public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception { public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
TableName tableName = TableName.valueOf("testGroupOrSplitWhenRegionHoleExistsInMeta"); TableName tableName = TableName.valueOf("testGroupOrSplitWhenRegionHoleExistsInMeta");
byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") }; byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
HTable table = new HTable(util.getConfiguration(), tableName); // Share connection. We were failing to find the table with our new reverse scan because it
// looks for first region, not any region -- that is how it works now. The below removes first
// region in test. Was reliant on the Connection caching having first region.
Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
Table table = connection.getTable(tableName);
setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS); setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
Path dir = buildBulkFiles(tableName, 2); Path dir = buildBulkFiles(tableName, 2);
final AtomicInteger countedLqis = new AtomicInteger(); final AtomicInteger countedLqis = new AtomicInteger();
LoadIncrementalHFiles loader = new LoadIncrementalHFiles( LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
util.getConfiguration()) {
protected List<LoadQueueItem> groupOrSplit( protected List<LoadQueueItem> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final HTable htable, final LoadQueueItem item, final HTable htable,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException { final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
if (lqis != null) { if (lqis != null) {
countedLqis.addAndGet(lqis.size()); countedLqis.addAndGet(lqis.size());
}
return lqis;
} }
return lqis; };
}
};
// do bulkload when there is no region hole in hbase:meta. // do bulkload when there is no region hole in hbase:meta.
try { try {
loader.doBulkLoad(dir, table); loader.doBulkLoad(dir, (HTable)table);
} catch (Exception e) { } catch (Exception e) {
LOG.error("exeception=", e); LOG.error("exeception=", e);
} }
@ -481,13 +486,13 @@ public class TestLoadIncrementalHFilesSplitRecovery {
util.getZooKeeperWatcher(), hConnection, tableName); util.getZooKeeperWatcher(), hConnection, tableName);
for (HRegionInfo regionInfo : regionInfos) { for (HRegionInfo regionInfo : regionInfos) {
if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
MetaTableAccessor.deleteRegion(hConnection, regionInfo); MetaTableAccessor.deleteRegion(connection, regionInfo);
break; break;
} }
} }
try { try {
loader.doBulkLoad(dir, table); loader.doBulkLoad(dir, (HTable)table);
} catch (Exception e) { } catch (Exception e) {
LOG.error("exeception=", e); LOG.error("exeception=", e);
assertTrue("IOException expected", e instanceof IOException); assertTrue("IOException expected", e instanceof IOException);
@ -495,7 +500,43 @@ public class TestLoadIncrementalHFilesSplitRecovery {
table.close(); table.close();
this.assertExpectedTable(tableName, ROWCOUNT, 2); // Make sure at least the one region that still exists can be found.
regionInfos = MetaTableAccessor.getTableRegions(util.getZooKeeperWatcher(),
connection, tableName);
assertTrue(regionInfos.size() >= 1);
this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
connection.close();
}
/**
* Checks that all columns have the expected value and that there is the
* expected number of rows.
* @throws IOException
*/
void assertExpectedTable(final Connection connection, TableName table, int count, int value)
throws IOException {
HTableDescriptor [] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
assertEquals(htds.length, 1);
Table t = null;
try {
t = connection.getTable(table);
Scan s = new Scan();
ResultScanner sr = t.getScanner(s);
int i = 0;
for (Result r : sr) {
i++;
for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
for (byte[] val : nm.values()) {
assertTrue(Bytes.equals(val, value(value)));
}
}
}
assertEquals(count, i);
} catch (IOException e) {
fail("Failed due to exception");
} finally {
if (t != null) t.close();
}
} }
} }

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
@ -103,8 +104,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -332,7 +332,7 @@ public class TestDistributedLogSplitting {
HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE); HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
NonceGeneratorWithDups ng = new NonceGeneratorWithDups(); NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
NonceGenerator oldNg = NonceGenerator oldNg =
ConnectionUtils.injectNonceGeneratorForTesting(ht.getConnection(), ng); ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)ht.getConnection(), ng);
try { try {
List<Increment> reqs = new ArrayList<Increment>(); List<Increment> reqs = new ArrayList<Increment>();
@ -366,7 +366,7 @@ public class TestDistributedLogSplitting {
} }
} }
} finally { } finally {
ConnectionUtils.injectNonceGeneratorForTesting(ht.getConnection(), oldNg); ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection) ht.getConnection(), oldNg);
ht.close(); ht.close();
zkw.close(); zkw.close();
} }

View File

@ -177,7 +177,6 @@ public class TestScannerWithBulkload {
table.put(put1); table.put(put1);
table.flushCommits(); table.flushCommits();
admin.flush(tableName); admin.flush(tableName);
admin.close();
put0 = new Put(Bytes.toBytes("row1")); put0 = new Put(Bytes.toBytes("row1"));
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version1"))); .toBytes("version1")));
@ -198,7 +197,7 @@ public class TestScannerWithBulkload {
@Test @Test
public void testBulkLoadWithParallelScan() throws Exception { public void testBulkLoadWithParallelScan() throws Exception {
TableName tableName = TableName.valueOf("testBulkLoadWithParallelScan"); TableName tableName = TableName.valueOf("testBulkLoadWithParallelScan");
final long l = System.currentTimeMillis(); final long l = System.currentTimeMillis();
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
createTable(admin, tableName); createTable(admin, tableName);
Scan scan = createScan(); Scan scan = createScan();

View File

@ -107,8 +107,8 @@ import org.junit.experimental.categories.Category;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
/** /**
* Like {@link TestSplitTransaction} in that we're testing {@link SplitTransaction} * Like TestSplitTransaction in that we're testing {@link SplitTransaction}
* only the below tests are against a running cluster where {@link TestSplitTransaction} * only the below tests are against a running cluster where TestSplitTransaction
* is tests against a bare {@link HRegion}. * is tests against a bare {@link HRegion}.
*/ */
@Category(LargeTests.class) @Category(LargeTests.class)
@ -1053,7 +1053,7 @@ public class TestSplitTransactionOnCluster {
fail("Each table should have at least one region."); fail("Each table should have at least one region.");
} }
ServerName serverName = ServerName serverName =
cluster.getServerHoldingRegion(firstTableRegions.get(0).getRegionName()); cluster.getServerHoldingRegion(firstTable, firstTableRegions.get(0).getRegionName());
admin.move(secondTableRegions.get(0).getRegionInfo().getEncodedNameAsBytes(), admin.move(secondTableRegions.get(0).getRegionInfo().getEncodedNameAsBytes(),
Bytes.toBytes(serverName.getServerName())); Bytes.toBytes(serverName.getServerName()));
Table table1 = null; Table table1 = null;

View File

@ -85,7 +85,7 @@ public class TestAccessController2 extends SecureTestUtil {
public Object run() throws Exception { public Object run() throws Exception {
HTableDescriptor desc = new HTableDescriptor(TEST_TABLE.getTableName()); HTableDescriptor desc = new HTableDescriptor(TEST_TABLE.getTableName());
desc.addFamily(new HColumnDescriptor(TEST_FAMILY)); desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
Admin admin = new HBaseAdmin(conf); Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
try { try {
admin.createTable(desc); admin.createTable(desc);
} finally { } finally {