HBASE-15610 Remove deprecated HConnection for 2.0 thus removing all PB references for 2.0

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Jurriaan Mous 2016-05-25 22:53:28 +02:00 committed by stack
parent 74442fde0f
commit cdd532da8a
62 changed files with 429 additions and 1387 deletions

View File

@ -277,7 +277,7 @@ class AsyncProcess {
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
RpcControllerFactory rpcFactory) {
if (hc == null) {
throw new IllegalArgumentException("HConnection cannot be null.");
throw new IllegalArgumentException("ClusterConnection cannot be null.");
}
this.connection = hc;

View File

@ -40,13 +40,18 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
@InterfaceAudience.Private
// NOTE: Although this class is public, this class is meant to be used directly from internal
// classes and unit tests only.
public interface ClusterConnection extends HConnection {
public interface ClusterConnection extends Connection {
/**
* Key for configuration in Configuration whose value is the class we implement making a
* new Connection instance.
*/
String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl";
/**
* @return - true if the master server is running
* @deprecated this has been deprecated without a replacement
*/
@Override
@Deprecated
boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException;
@ -63,46 +68,64 @@ public interface ClusterConnection extends HConnection {
* @throws IOException
* if a remote or network exception occurs
*/
@Override
boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws
IOException;
/**
* A table that isTableEnabled == false and isTableDisabled == false
* is possible. This happens when a table has a lot of regions
* that must be processed.
* @param tableName table name
* @return true if the table is enabled, false otherwise
* @throws IOException if a remote or network exception occurs
*/
boolean isTableEnabled(TableName tableName) throws IOException;
/**
* @param tableName table name
* @return true if the table is disabled, false otherwise
* @throws IOException if a remote or network exception occurs
*/
boolean isTableDisabled(TableName tableName) throws IOException;
/**
* Retrieve TableState, represent current table state.
* @param tableName table state for
* @return state of the table
*/
TableState getTableState(TableName tableName) throws IOException;
/**
* Find the location of the region of <i>tableName</i> that <i>row</i>
* lives in.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the region in
* question
* question
* @throws IOException if a remote or network exception occurs
*/
@Override
public HRegionLocation locateRegion(final TableName tableName,
HRegionLocation locateRegion(final TableName tableName,
final byte [] row) throws IOException;
/**
* Allows flushing the region cache.
*/
@Override
void clearRegionCache();
void cacheLocation(final TableName tableName, final RegionLocations location);
/**
* Allows flushing the region cache of all locations that pertain to
* <code>tableName</code>
* @param tableName Name of the table whose regions we are to remove from
* cache.
* cache.
*/
@Override
void clearRegionCache(final TableName tableName);
/**
* Deletes cached locations for the specific region.
* @param location The location object for the region, to be purged from cache.
*/
@Override
void deleteCachedRegionLocation(final HRegionLocation location);
/**
@ -111,10 +134,9 @@ public interface ClusterConnection extends HConnection {
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the region in
* question
* question
* @throws IOException if a remote or network exception occurs
*/
@Override
HRegionLocation relocateRegion(final TableName tableName,
final byte [] row) throws IOException;
@ -125,7 +147,7 @@ public interface ClusterConnection extends HConnection {
* @param row row key you're trying to find the region of
* @param replicaId the replicaId of the region
* @return RegionLocations that describe where to find the region in
* question
* question
* @throws IOException if a remote or network exception occurs
*/
RegionLocations relocateRegion(final TableName tableName,
@ -140,19 +162,16 @@ public interface ClusterConnection extends HConnection {
* @param exception the exception if any. Can be null.
* @param source the previous location
*/
@Override
void updateCachedLocations(TableName tableName, byte[] regionName, byte[] rowkey,
Object exception, ServerName source);
/**
* Gets the location of the region of <i>regionName</i>.
* @param regionName name of the region to locate
* @return HRegionLocation that describes where to find the region in
* question
* question
* @throws IOException if a remote or network exception occurs
*/
@Override
HRegionLocation locateRegion(final byte[] regionName)
throws IOException;
@ -160,9 +179,8 @@ public interface ClusterConnection extends HConnection {
* Gets the locations of all regions in the specified table, <i>tableName</i>.
* @param tableName table to get regions of
* @return list of region locations for all regions of table
* @throws IOException
* @throws IOException if IO failure occurs
*/
@Override
List<HRegionLocation> locateRegions(final TableName tableName) throws IOException;
/**
@ -172,9 +190,8 @@ public interface ClusterConnection extends HConnection {
* @param offlined True if we are to include offlined regions, false and we'll leave out offlined
* regions from returned list.
* @return list of region locations for all regions of table
* @throws IOException
* @throws IOException if IO failure occurs
*/
@Override
List<HRegionLocation> locateRegions(final TableName tableName,
final boolean useCache,
final boolean offlined) throws IOException;
@ -186,12 +203,12 @@ public interface ClusterConnection extends HConnection {
* @param useCache Should we use the cache to retrieve the region information.
* @param retry do we retry
* @return region locations for this row.
* @throws IOException
* @throws IOException if IO failure occurs
*/
RegionLocations locateRegion(TableName tableName,
byte[] row, boolean useCache, boolean retry) throws IOException;
/**
/**
*
* @param tableName table to get regions of
* @param row the row
@ -199,15 +216,14 @@ public interface ClusterConnection extends HConnection {
* @param retry do we retry
* @param replicaId the replicaId for the region
* @return region locations for this row.
* @throws IOException
* @throws IOException if IO failure occurs
*/
RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry,
RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry,
int replicaId) throws IOException;
/**
* Returns a {@link MasterKeepAliveConnection} to the active master
*/
@Override
MasterService.BlockingInterface getMaster() throws IOException;
@ -217,7 +233,6 @@ public interface ClusterConnection extends HConnection {
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
*/
@Override
AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException;
/**
@ -229,7 +244,6 @@ public interface ClusterConnection extends HConnection {
* @throws IOException if a remote or network exception occurs
*
*/
@Override
ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException;
/**
@ -240,7 +254,6 @@ public interface ClusterConnection extends HConnection {
* @return Location of row.
* @throws IOException if a remote or network exception occurs
*/
@Override
HRegionLocation getRegionLocation(TableName tableName, byte [] row,
boolean reload)
throws IOException;
@ -249,34 +262,30 @@ public interface ClusterConnection extends HConnection {
* Clear any caches that pertain to server name <code>sn</code>.
* @param sn A server name
*/
@Override
void clearCaches(final ServerName sn);
/**
* This function allows HBaseAdmin and potentially others to get a shared MasterService
* connection.
* @return The shared instance. Never returns null.
* @throws MasterNotRunningException
* @throws MasterNotRunningException if master is not running
* @deprecated Since 0.96.0
*/
@Override
@Deprecated
MasterKeepAliveConnection getKeepAliveMasterService()
throws MasterNotRunningException;
/**
* @param serverName
* @param serverName of server to check
* @return true if the server is known as dead, false otherwise.
* @deprecated internal method, do not use thru HConnection */
@Override
* @deprecated internal method, do not use thru ClusterConnection */
@Deprecated
boolean isDeadServer(ServerName serverName);
/**
* @return Nonce generator for this HConnection; may be null if disabled in configuration.
* @return Nonce generator for this ClusterConnection; may be null if disabled in configuration.
*/
@Override
public NonceGenerator getNonceGenerator();
NonceGenerator getNonceGenerator();
/**
* @return Default AsyncProcess associated with this connection.
@ -287,7 +296,7 @@ public interface ClusterConnection extends HConnection {
* Returns a new RpcRetryingCallerFactory from the given {@link Configuration}.
* This RpcRetryingCallerFactory lets the users create {@link RpcRetryingCaller}s which can be
* intercepted with the configured {@link RetryingCallerInterceptor}
* @param conf
* @param conf configuration
* @return RpcRetryingCallerFactory
*/
RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf);
@ -320,11 +329,17 @@ public interface ClusterConnection extends HConnection {
/**
* @return the MetricsConnection instance associated with this connection.
*/
public MetricsConnection getConnectionMetrics();
MetricsConnection getConnectionMetrics();
/**
* @return true when this connection uses a {@link org.apache.hadoop.hbase.codec.Codec} and so
* supports cell blocks.
*/
boolean hasCellBlockSupport();
/**
* @return the number of region servers that are currently running
* @throws IOException if a remote or network exception occurs
*/
int getCurrentNrHRS() throws IOException;
}

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
* thread will obtain its own Table instance. Caching or pooling of {@link Table} and {@link Admin}
* is not recommended.
*
* <p>This class replaces {@link HConnection}, which is now deprecated.
* @see ConnectionFactory
* @since 0.99.0
*/
@ -59,7 +58,7 @@ public interface Connection extends Abortable, Closeable {
* - Only allow new style of interfaces:
* -- All table names are passed as TableName. No more byte[] and string arguments
* -- Most of the classes with names H is deprecated in favor of non-H versions
* (Table, Connection vs HConnection, etc)
* (Table, Connection, etc)
* -- Only real client-facing public methods are allowed
* - Connection should contain only getTable(), getAdmin() kind of general methods.
*/
@ -123,7 +122,7 @@ public interface Connection extends Abortable, Closeable {
*
* @return a {@link BufferedMutator} for the supplied tableName.
*/
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException;
BufferedMutator getBufferedMutator(TableName tableName) throws IOException;
/**
* Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The
@ -134,7 +133,7 @@ public interface Connection extends Abortable, Closeable {
* @param params details on how to instantiate the {@code BufferedMutator}.
* @return a {@link BufferedMutator} for the supplied tableName.
*/
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException;
BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException;
/**
* Retrieve a RegionLocator implementation to inspect region information on a table. The returned
@ -151,7 +150,7 @@ public interface Connection extends Abortable, Closeable {
* @param tableName Name of the table who's region is to be examined
* @return A RegionLocator instance
*/
public RegionLocator getRegionLocator(TableName tableName) throws IOException;
RegionLocator getRegionLocator(TableName tableName) throws IOException;
/**
* Retrieve an Admin implementation to administer an HBase cluster.
@ -167,7 +166,7 @@ public interface Connection extends Abortable, Closeable {
Admin getAdmin() throws IOException;
@Override
public void close() throws IOException;
void close() throws IOException;
/**
* Returns whether the connection is closed or not.

View File

@ -214,7 +214,7 @@ public class ConnectionFactory {
user = provider.getCurrent();
}
String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
ConnectionImplementation.class.getName());
Class<?> clazz;
try {

View File

@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
@ -64,13 +63,11 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@ -98,7 +95,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
/**
* Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
* Encapsulates connection to zookeeper and regionservers.
@ -124,8 +120,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
*/
private static volatile NonceGenerator nonceGenerator = null;
/** The nonce generator lock. Only taken when creating HConnection, which gets a private copy. */
private static Object nonceGeneratorCreateLock = new Object();
/** The nonce generator lock. Only taken when creating Connection, which gets a private copy. */
private static final Object nonceGeneratorCreateLock = new Object();
private final AsyncProcess asyncProcess;
// single tracker per connection
@ -137,7 +133,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// package protected for the tests
ClusterStatusListener clusterStatusListener;
private final Object metaRegionLock = new Object();
// We have a single lock for master & zk to prevent deadlocks. Having
@ -162,23 +157,23 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
private final ConnectionConfiguration connectionConfig;
// Client rpc instance.
private RpcClient rpcClient;
private final RpcClient rpcClient;
private final MetaCache metaCache;
private final MetricsConnection metrics;
protected User user;
private RpcRetryingCallerFactory rpcCallerFactory;
private final RpcRetryingCallerFactory rpcCallerFactory;
private RpcControllerFactory rpcControllerFactory;
private final RpcControllerFactory rpcControllerFactory;
private final RetryingCallerInterceptor interceptor;
/**
* Cluster registry of basic info such as clusterid and meta region location.
*/
Registry registry;
Registry registry;
private final ClientBackoffPolicy backoffPolicy;
@ -279,32 +274,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return ng;
}
@Override
public HTableInterface getTable(String tableName) throws IOException {
return getTable(TableName.valueOf(tableName));
}
@Override
public HTableInterface getTable(byte[] tableName) throws IOException {
return getTable(TableName.valueOf(tableName));
}
@Override
public HTableInterface getTable(TableName tableName) throws IOException {
return getTable(tableName, getBatchPool());
}
@Override
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
return getTable(TableName.valueOf(tableName), pool);
}
@Override
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
return getTable(TableName.valueOf(tableName), pool);
}
@Override
public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
return new HTable(tableName, this, connectionConfig,
@ -463,7 +437,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
protected String clusterId = null;
protected void retrieveClusterId() {
if (clusterId != null) return;
if (clusterId != null) {
return;
}
this.clusterId = this.registry.getClusterId();
if (clusterId == null) {
clusterId = HConstants.CLUSTER_ID_DEFAULT;
@ -519,47 +495,23 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return reload? relocateRegion(tableName, row): locateRegion(tableName, row);
}
@Override
public HRegionLocation getRegionLocation(final byte[] tableName,
final byte [] row, boolean reload)
throws IOException {
return getRegionLocation(TableName.valueOf(tableName), row, reload);
}
@Override
public boolean isTableEnabled(TableName tableName) throws IOException {
return getTableState(tableName).inStates(TableState.State.ENABLED);
}
@Override
public boolean isTableEnabled(byte[] tableName) throws IOException {
return isTableEnabled(TableName.valueOf(tableName));
}
@Override
public boolean isTableDisabled(TableName tableName) throws IOException {
return getTableState(tableName).inStates(TableState.State.DISABLED);
}
@Override
public boolean isTableDisabled(byte[] tableName) throws IOException {
return isTableDisabled(TableName.valueOf(tableName));
}
@Override
public boolean isTableAvailable(final TableName tableName) throws IOException {
return isTableAvailable(tableName, null);
}
@Override
public boolean isTableAvailable(final byte[] tableName) throws IOException {
return isTableAvailable(TableName.valueOf(tableName));
}
@Override
public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
throws IOException {
if (this.closed) throw new IOException(toString() + " closed");
if (this.closed) {
throw new IOException(toString() + " closed");
}
try {
if (!isTableEnabled(tableName)) {
LOG.debug("Table " + tableName + " not enabled");
@ -615,12 +567,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
}
@Override
public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
throws IOException {
return isTableAvailable(TableName.valueOf(tableName), splitKeys);
}
@Override
public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
@ -643,12 +589,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return locateRegions(tableName, false, true);
}
@Override
public List<HRegionLocation> locateRegions(final byte[] tableName)
throws IOException {
return locateRegions(TableName.valueOf(tableName));
}
@Override
public List<HRegionLocation> locateRegions(final TableName tableName,
final boolean useCache, final boolean offlined) throws IOException {
@ -668,12 +608,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return locations;
}
@Override
public List<HRegionLocation> locateRegions(final byte[] tableName,
final boolean useCache, final boolean offlined) throws IOException {
return locateRegions(TableName.valueOf(tableName), useCache, offlined);
}
@Override
public HRegionLocation locateRegion(
final TableName tableName, final byte[] row) throws IOException{
@ -681,13 +615,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return locations == null ? null : locations.getRegionLocation();
}
@Override
public HRegionLocation locateRegion(final byte[] tableName,
final byte [] row)
throws IOException{
return locateRegion(TableName.valueOf(tableName), row);
}
@Override
public HRegionLocation relocateRegion(final TableName tableName,
final byte [] row) throws IOException{
@ -710,12 +637,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return locateRegion(tableName, row, false, true, replicaId);
}
@Override
public HRegionLocation relocateRegion(final byte[] tableName,
final byte [] row) throws IOException {
return relocateRegion(TableName.valueOf(tableName), row);
}
@Override
public RegionLocations locateRegion(final TableName tableName,
final byte [] row, boolean useCache, boolean retry)
@ -727,7 +648,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
public RegionLocations locateRegion(final TableName tableName,
final byte [] row, boolean useCache, boolean retry, int replicaId)
throws IOException {
if (this.closed) throw new IOException(toString() + " closed");
if (this.closed) {
throw new IOException(toString() + " closed");
}
if (tableName== null || tableName.getName().length == 0) {
throw new IllegalArgumentException(
"table name cannot be null or zero length");
@ -966,11 +889,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
metaCache.clearCache(tableName);
}
@Override
public void clearRegionCache(final byte[] tableName) {
clearRegionCache(TableName.valueOf(tableName));
}
/**
* Put a newly discovered HRegionLocation into the cache.
* @param tableName The table name.
@ -993,11 +911,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* State of the MasterService connection/setup.
*/
static class MasterServiceState {
HConnection connection;
Connection connection;
MasterProtos.MasterService.BlockingInterface stub;
int userCount;
MasterServiceState(final HConnection connection) {
MasterServiceState(final Connection connection) {
super();
this.connection = connection;
}
@ -1189,7 +1107,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
/**
* Create a stub against the master. Retry if necessary.
* @return A stub to do <code>intf</code> against the master
* @throws org.apache.hadoop.hbase.MasterNotRunningException
* @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
*/
Object makeStub() throws IOException {
// The lock must be at the beginning to prevent multiple master creations
@ -1245,26 +1163,18 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
@Override
public AdminProtos.AdminService.BlockingInterface getAdmin(final ServerName serverName)
throws IOException {
return getAdmin(serverName, false);
}
@Override
// Nothing is done w/ the 'master' parameter. It is ignored.
public AdminProtos.AdminService.BlockingInterface getAdmin(final ServerName serverName,
final boolean master)
throws IOException {
if (isDeadServer(serverName)) {
throw new RegionServerStoppedException(serverName + " is dead.");
}
String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(),
serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange);
serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange);
this.connectionLock.putIfAbsent(key, key);
AdminProtos.AdminService.BlockingInterface stub = null;
AdminProtos.AdminService.BlockingInterface stub;
synchronized (this.connectionLock.get(key)) {
stub = (AdminProtos.AdminService.BlockingInterface)this.stubs.get(key);
if (stub == null) {
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
stub = AdminProtos.AdminService.newBlockingStub(channel);
this.stubs.put(key, stub);
}
@ -1798,7 +1708,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
void releaseMaster(MasterServiceState mss) {
if (mss.getStub() == null) return;
if (mss.getStub() == null) {
return;
}
synchronized (masterAndZKLock) {
--mss.userCount;
}
@ -1833,20 +1745,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
metaCache.clearCache(location);
}
@Override
public void updateCachedLocations(final TableName tableName, byte[] rowkey,
final Object exception, final HRegionLocation source) {
assert source != null;
updateCachedLocations(tableName, source.getRegionInfo().getRegionName()
, rowkey, exception, source.getServerName());
}
/**
* Update the location with the new value (if the exception is a RegionMovedException)
* or delete it from the cache. Does nothing if we can be sure from the exception that
* the location is still accurate, or if the cache has already been updated.
* @param exception an object (to simplify user code) on which we will try to find a nested
* or wrapped or both RegionMovedException
* or wrapped or both RegionMovedException
* @param source server that is the source of the location update.
*/
@Override
@ -1916,84 +1820,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
metaCache.clearCache(regionInfo);
}
@Override
public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
final Object exception, final HRegionLocation source) {
updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
}
/**
* @deprecated since 0.96 Use {@link org.apache.hadoop.hbase.client.HTableInterface#batch} instead
*/
@Override
@Deprecated
public void processBatch(List<? extends Row> list,
final TableName tableName,
ExecutorService pool,
Object[] results) throws IOException, InterruptedException {
// This belongs in HTable!!! Not in here. St.Ack
// results must be the same size as list
if (results.length != list.size()) {
throw new IllegalArgumentException(
"argument results must be the same size as argument list");
}
processBatchCallback(list, tableName, pool, results, null);
}
/**
* @deprecated Unsupported API
*/
@Override
@Deprecated
public void processBatch(List<? extends Row> list,
final byte[] tableName,
ExecutorService pool,
Object[] results) throws IOException, InterruptedException {
processBatch(list, TableName.valueOf(tableName), pool, results);
}
/**
* Send the queries in parallel on the different region servers. Retries on failures.
* If the method returns it means that there is no error, and the 'results' array will
* contain no exception. On error, an exception is thrown, and the 'results' array will
* contain results and exceptions.
* @deprecated since 0.96
* Use {@link org.apache.hadoop.hbase.client.HTable#processBatchCallback} instead
*/
@Override
@Deprecated
public <R> void processBatchCallback(
List<? extends Row> list,
TableName tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback)
throws IOException, InterruptedException {
AsyncProcess.AsyncRequestFuture ars = this.asyncProcess.submitAll(
pool, tableName, list, callback, results);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
}
/**
* @deprecated Unsupported API
*/
@Override
@Deprecated
public <R> void processBatchCallback(
List<? extends Row> list,
byte[] tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback)
throws IOException, InterruptedException {
processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
}
// For tests to override.
protected AsyncProcess createAsyncProcess(Configuration conf) {
// No default pool available.
@ -2024,41 +1850,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return metaCache.getNumberOfCachedRegionLocations(tableName);
}
/**
* @deprecated always return false since 0.99
*/
@Override
@Deprecated
public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
}
/**
* @deprecated always return false since 0.99
*/
@Override
@Deprecated
public void setRegionCachePrefetch(final byte[] tableName,
final boolean enable) {
}
/**
* @deprecated always return false since 0.99
*/
@Override
@Deprecated
public boolean getRegionCachePrefetch(TableName tableName) {
return false;
}
/**
* @deprecated always return false since 0.99
*/
@Override
@Deprecated
public boolean getRegionCachePrefetch(byte[] tableName) {
return false;
}
@Override
public void abort(final String msg, Throwable t) {
if (t instanceof KeeperException.SessionExpiredException
@ -2133,146 +1924,20 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
close();
}
/**
* @deprecated Use {@link org.apache.hadoop.hbase.client.Admin#listTables()} instead
*/
@Deprecated
@Override
public HTableDescriptor[] listTables() throws IOException {
MasterKeepAliveConnection master = getKeepAliveMasterService();
try {
MasterProtos.GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
} finally {
master.close();
}
}
/**
* @deprecated Use {@link org.apache.hadoop.hbase.client.Admin#listTableNames()} instead
*/
@Deprecated
@Override
public String[] getTableNames() throws IOException {
TableName[] tableNames = listTableNames();
String[] result = new String[tableNames.length];
for (int i = 0; i < tableNames.length; i++) {
result[i] = tableNames[i].getNameAsString();
}
return result;
}
/**
* @deprecated Use {@link org.apache.hadoop.hbase.client.Admin#listTableNames()} instead
*/
@Deprecated
@Override
public TableName[] listTableNames() throws IOException {
MasterKeepAliveConnection master = getKeepAliveMasterService();
try {
return ProtobufUtil.getTableNameArray(master.getTableNames(null,
MasterProtos.GetTableNamesRequest.newBuilder().build())
.getTableNamesList());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
} finally {
master.close();
}
}
/**
* @deprecated Use {@link
* org.apache.hadoop.hbase.client.Admin#getTableDescriptorsByTableName(java.util.List)} instead
*/
@Deprecated
@Override
public HTableDescriptor[] getHTableDescriptorsByTableName(
List<TableName> tableNames) throws IOException {
if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
MasterKeepAliveConnection master = getKeepAliveMasterService();
try {
MasterProtos.GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(tableNames);
return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
} finally {
master.close();
}
}
/**
* @deprecated Use
* {@link org.apache.hadoop.hbase.client.Admin#getTableDescriptorsByTableName(java.util.List)}
* instead
*/
@Deprecated
@Override
public HTableDescriptor[] getHTableDescriptors(List<String> names) throws IOException {
List<TableName> tableNames = new ArrayList<TableName>(names.size());
for(String name : names) {
tableNames.add(TableName.valueOf(name));
}
return getHTableDescriptorsByTableName(tableNames);
}
@Override
public NonceGenerator getNonceGenerator() {
return nonceGenerator;
}
/**
* Connects to the master to get the table descriptor.
* @param tableName table name
* @throws java.io.IOException if the connection to master fails or if the table
* is not found.
* @deprecated Use {@link
* org.apache.hadoop.hbase.client.Admin#getTableDescriptor(org.apache.hadoop.hbase.TableName)}
* instead
*/
@Deprecated
@Override
public HTableDescriptor getHTableDescriptor(final TableName tableName)
throws IOException {
if (tableName == null) return null;
MasterKeepAliveConnection master = getKeepAliveMasterService();
MasterProtos.GetTableDescriptorsResponse htds;
try {
MasterProtos.GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(tableName);
htds = master.getTableDescriptors(null, req);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
} finally {
master.close();
}
if (!htds.getTableSchemaList().isEmpty()) {
return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
}
throw new TableNotFoundException(tableName.getNameAsString());
}
/**
* @deprecated Use {@link
* org.apache.hadoop.hbase.client.Admin#getTableDescriptor(org.apache.hadoop.hbase.TableName)}
* instead
*/
@Deprecated
@Override
public HTableDescriptor getHTableDescriptor(final byte[] tableName)
throws IOException {
return getHTableDescriptor(TableName.valueOf(tableName));
}
@Override
public TableState getTableState(TableName tableName) throws IOException {
if (this.closed) throw new IOException(toString() + " closed");
if (this.closed) {
throw new IOException(toString() + " closed");
}
TableState tableState = MetaTableAccessor.getTableState(this, tableName);
if (tableState == null) throw new TableNotFoundException(tableName);
if (tableState == null) {
throw new TableNotFoundException(tableName);
}
return tableState;
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@ -32,8 +34,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import com.google.common.annotations.VisibleForTesting;
/**
* Utility used by client connections.
*/
@ -45,8 +45,8 @@ public final class ConnectionUtils {
/**
* Calculate pause time.
* Built on {@link HConstants#RETRY_BACKOFF}.
* @param pause
* @param tries
* @param pause time to pause
* @param tries amount of tries
* @return How long to wait after <code>tries</code> retries
*/
public static long getPauseTime(final long pause, final int tries) {
@ -90,7 +90,7 @@ public final class ConnectionUtils {
}
/**
* Changes the configuration to set the number of retries needed when using HConnection
* Changes the configuration to set the number of retries needed when using Connection
* internally, e.g. for updating catalog tables, etc.
* Call this method before we create any Connections.
* @param c The Configuration instance to set the retries into.
@ -106,7 +106,7 @@ public final class ConnectionUtils {
int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10);
int retries = hcRetries * serversideMultiplier;
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
log.info(sn + " server-side HConnection retries=" + retries);
log.info(sn + " server-side Connection retries=" + retries);
}
/**
@ -119,7 +119,7 @@ public final class ConnectionUtils {
* @param admin the admin interface of the local server
* @param client the client interface of the local server
* @return an short-circuit connection.
* @throws IOException
* @throws IOException if IO failure occurred
*/
public static ClusterConnection createShortCircuitConnection(final Configuration conf,
ExecutorService pool, User user, final ServerName serverName,
@ -130,9 +130,8 @@ public final class ConnectionUtils {
}
return new ConnectionImplementation(conf, pool, user) {
@Override
public AdminService.BlockingInterface getAdmin(ServerName sn, boolean getMaster)
throws IOException {
return serverName.equals(sn) ? admin : super.getAdmin(sn, getMaster);
public AdminService.BlockingInterface getAdmin(ServerName sn) throws IOException {
return serverName.equals(sn) ? admin : super.getAdmin(sn);
}
@Override
@ -148,7 +147,7 @@ public final class ConnectionUtils {
*/
@VisibleForTesting
public static void setupMasterlessConnection(Configuration conf) {
conf.set(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
MasterlessConnection.class.getName());
}

View File

@ -18,6 +18,10 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
@ -176,10 +180,6 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
/**
* HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
* this is an HBase-internal class as defined in
@ -313,9 +313,9 @@ public class HBaseAdmin implements Admin {
}
}
/** @return HConnection used by this object. */
/** @return Connection used by this object. */
@Override
public HConnection getConnection() {
public Connection getConnection() {
return connection;
}
@ -404,11 +404,11 @@ public class HBaseAdmin implements Admin {
@Override
public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory,
operationTimeout, rpcTimeout);
}
static HTableDescriptor getTableDescriptor(final TableName tableName, HConnection connection,
static HTableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
int operationTimeout, int rpcTimeout) throws IOException {
if (tableName == null) return null;
@ -588,7 +588,7 @@ public class HBaseAdmin implements Admin {
protected Void postOperationResult(final Void result, final long deadlineTs)
throws IOException, TimeoutException {
// Delete cached information to prevent clients from using old locations
getAdmin().getConnection().clearRegionCache(getTableName());
((ClusterConnection) getAdmin().getConnection()).clearRegionCache(getTableName());
return super.postOperationResult(result, deadlineTs);
}
}
@ -843,7 +843,7 @@ public class HBaseAdmin implements Admin {
@Override
public boolean isTableAvailable(TableName tableName) throws IOException {
return connection.isTableAvailable(tableName);
return connection.isTableAvailable(tableName, null);
}
@Override
@ -1701,7 +1701,7 @@ public class HBaseAdmin implements Admin {
* @param regionName Name of a region.
* @return a pair of HRegionInfo and ServerName if <code>regionName</code> is
* a verified region name (we call {@link
* MetaTableAccessor#getRegionLocation(HConnection, byte[])}
* MetaTableAccessor#getRegionLocation(Connection, byte[])}
* else null.
* Throw IllegalArgumentException if <code>regionName</code> is null.
* @throws IOException

View File

@ -1,626 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
/**
* A cluster connection. Knows how to find the master, locate regions out on the cluster,
* keeps a cache of locations and then knows how to re-calibrate after they move. You need one
* of these to talk to your HBase cluster. {@link ConnectionFactory} manages instances of this
* class. See it for how to get one of these.
*
* <p>This is NOT a connection to a particular server but to ALL servers in the cluster. Individual
* connections are managed at a lower level.
*
* <p>HConnections are used by {@link HTable} mostly but also by
* {@link HBaseAdmin}, and {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator}.
*
* @see ConnectionFactory
* @deprecated in favor of {@link Connection} and {@link ConnectionFactory}
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@Deprecated
public interface HConnection extends Connection {
/**
* Key for configuration in Configuration whose value is the class we implement making a
* new HConnection instance.
*/
public static final String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl";
/**
* @return Configuration instance being used by this HConnection instance.
*/
@Override
Configuration getConfiguration();
/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(String tableName) throws IOException;
/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(byte[] tableName) throws IOException;
/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @return an HTable to use for interactions with this table
*/
@Override
public HTableInterface getTable(TableName tableName) throws IOException;
/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException;
/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException;
/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName table to get interface for
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return an HTable to use for interactions with this table
*/
@Override
public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException;
/**
* Retrieve a RegionLocator implementation to inspect region information on a table. The returned
* RegionLocator is not thread-safe, so a new instance should be created for each using thread.
*
* This is a lightweight operation. Pooling or caching of the returned RegionLocator is neither
* required nor desired.
* @param tableName Name of the table who's region is to be examined
* @return A RegionLocator instance
*/
@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException;
/**
* Retrieve an Admin implementation to administer an HBase cluster.
* The returned Admin is not guaranteed to be thread-safe. A new instance should be created for
* each using thread. This is a lightweight operation. Pooling or caching of the returned
* Admin is not recommended.
*
* @return an Admin instance for cluster administration
*/
@Override
Admin getAdmin() throws IOException;
/** @return - true if the master server is running
* @deprecated internal method, do not use thru HConnection */
@Deprecated
boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException;
/**
* A table that isTableEnabled == false and isTableDisabled == false
* is possible. This happens when a table has a lot of regions
* that must be processed.
* @param tableName table name
* @return true if the table is enabled, false otherwise
* @throws IOException if a remote or network exception occurs
*/
boolean isTableEnabled(TableName tableName) throws IOException;
/**
* @deprecated instead use {@link #isTableEnabled(TableName)}
*/
@Deprecated
boolean isTableEnabled(byte[] tableName) throws IOException;
/**
* @param tableName table name
* @return true if the table is disabled, false otherwise
* @throws IOException if a remote or network exception occurs
*/
boolean isTableDisabled(TableName tableName) throws IOException;
/**
* @deprecated instead use {@link #isTableDisabled(TableName)}
*/
@Deprecated
boolean isTableDisabled(byte[] tableName) throws IOException;
/**
* Retrieve TableState, represent current table state.
* @param tableName table state for
* @return state of the table
*/
public TableState getTableState(TableName tableName) throws IOException;
/**
* @param tableName table name
* @return true if all regions of the table are available, false otherwise
* @throws IOException if a remote or network exception occurs
*/
boolean isTableAvailable(TableName tableName) throws IOException;
/**
* @deprecated instead use {@link #isTableAvailable(TableName)}
*/
@Deprecated
boolean isTableAvailable(byte[] tableName) throws IOException;
/**
* Use this api to check if the table has been created with the specified number of
* splitkeys which was used while creating the given table.
* Note : If this api is used after a table's region gets splitted, the api may return
* false.
* @param tableName tableName
* @param splitKeys splitKeys used while creating table
* @throws IOException if a remote or network exception occurs
* @deprecated internal method, do not use through HConnection */
@Deprecated
boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException;
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
boolean isTableAvailable(byte[] tableName, byte[][] splitKeys) throws IOException;
/**
* List all the userspace tables. In other words, scan the hbase:meta table.
*
* @return - returns an array of HTableDescriptors
* @throws IOException if a remote or network exception occurs
* @deprecated Use {@link Admin#listTables()} instead.
*/
@Deprecated
HTableDescriptor[] listTables() throws IOException;
// This is a bit ugly - We call this getTableNames in 0.94 and the
// successor function, returning TableName, listTableNames in later versions
// because Java polymorphism doesn't consider return value types
/**
* @deprecated Use {@link Admin#listTableNames()} instead.
*/
@Deprecated
String[] getTableNames() throws IOException;
/**
* @deprecated Use {@link Admin#listTables()} instead.
*/
@Deprecated
TableName[] listTableNames() throws IOException;
/**
* @param tableName table name
* @return table metadata
* @throws IOException if a remote or network exception occurs
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
HTableDescriptor getHTableDescriptor(TableName tableName)
throws IOException;
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
HTableDescriptor getHTableDescriptor(byte[] tableName)
throws IOException;
/**
* Find the location of the region of <i>tableName</i> that <i>row</i>
* lives in.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
public HRegionLocation locateRegion(final TableName tableName,
final byte [] row) throws IOException;
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
public HRegionLocation locateRegion(final byte[] tableName,
final byte [] row) throws IOException;
/**
* Allows flushing the region cache.
* @deprecated internal method, do not use through HConnection */
@Deprecated
void clearRegionCache();
/**
* Allows flushing the region cache of all locations that pertain to
* <code>tableName</code>
* @param tableName Name of the table whose regions we are to remove from
* cache.
* @deprecated internal method, do not use through HConnection */
@Deprecated
void clearRegionCache(final TableName tableName);
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
void clearRegionCache(final byte[] tableName);
/**
* Deletes cached locations for the specific region.
* @param location The location object for the region, to be purged from cache.
* @deprecated internal method, do not use thru HConnection */
@Deprecated
void deleteCachedRegionLocation(final HRegionLocation location);
/**
* Find the location of the region of <i>tableName</i> that <i>row</i>
* lives in, ignoring any value that might be in the cache.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
* @deprecated internal method, do not use through HConnection */
@Deprecated
HRegionLocation relocateRegion(final TableName tableName,
final byte [] row) throws IOException;
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
HRegionLocation relocateRegion(final byte[] tableName,
final byte [] row) throws IOException;
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
void updateCachedLocations(TableName tableName, byte[] rowkey,
Object exception, HRegionLocation source);
/**
* Update the location cache. This is used internally by HBase, in most cases it should not be
* used by the client application.
* @param tableName the table name
* @param regionName the regionName
* @param rowkey the row
* @param exception the exception if any. Can be null.
* @param source the previous location
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
void updateCachedLocations(TableName tableName, byte[] regionName, byte[] rowkey,
Object exception, ServerName source);
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
void updateCachedLocations(byte[] tableName, byte[] rowkey,
Object exception, HRegionLocation source);
/**
* Gets the location of the region of <i>regionName</i>.
* @param regionName name of the region to locate
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
* @deprecated internal method, do not use thru HConnection */
@Deprecated
HRegionLocation locateRegion(final byte[] regionName)
throws IOException;
/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
* @param tableName table to get regions of
* @return list of region locations for all regions of table
* @throws IOException
* @deprecated internal method, do not use thru HConnection */
@Deprecated
List<HRegionLocation> locateRegions(final TableName tableName) throws IOException;
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
List<HRegionLocation> locateRegions(final byte[] tableName) throws IOException;
/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
* @param tableName table to get regions of
* @param useCache Should we use the cache to retrieve the region information.
* @param offlined True if we are to include offlined regions, false and we'll leave out offlined
* regions from returned list.
* @return list of region locations for all regions of table
* @throws IOException
* @deprecated internal method, do not use thru HConnection
*/
@Deprecated
public List<HRegionLocation> locateRegions(final TableName tableName,
final boolean useCache,
final boolean offlined) throws IOException;
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
public List<HRegionLocation> locateRegions(final byte[] tableName,
final boolean useCache,
final boolean offlined) throws IOException;
/**
* Returns a {@link MasterKeepAliveConnection} to the active master
* @deprecated internal method, do not use thru HConnection */
@Deprecated
MasterService.BlockingInterface getMaster() throws IOException;
/**
* Establishes a connection to the region server at the specified address.
* @param serverName
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
* @deprecated internal method, do not use thru HConnection */
@Deprecated
AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException;
/**
* Establishes a connection to the region server at the specified address, and returns
* a region client protocol.
*
* @param serverName
* @return ClientProtocol proxy for RegionServer
* @throws IOException if a remote or network exception occurs
* @deprecated internal method, do not use thru HConnection */
@Deprecated
ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException;
/**
* Establishes a connection to the region server at the specified address.
* @param serverName
* @param getMaster do we check if master is alive
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
* @deprecated You can pass master flag but nothing special is done.
*/
@Deprecated
AdminService.BlockingInterface getAdmin(final ServerName serverName, boolean getMaster)
throws IOException;
/**
* Find region location hosting passed row
* @param tableName table name
* @param row Row to find.
* @param reload If true do not use cache, otherwise bypass.
* @return Location of row.
* @throws IOException if a remote or network exception occurs
* @deprecated internal method, do not use thru HConnection */
@Deprecated
HRegionLocation getRegionLocation(TableName tableName, byte [] row,
boolean reload)
throws IOException;
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
HRegionLocation getRegionLocation(byte[] tableName, byte [] row,
boolean reload)
throws IOException;
/**
* Process a mixed batch of Get, Put and Delete actions. All actions for a
* RegionServer are forwarded in one RPC call.
*
*
* @param actions The collection of actions.
* @param tableName Name of the hbase table
* @param pool thread pool for parallel execution
* @param results An empty array, same size as list. If an exception is thrown,
* you can test here for partial results, and to determine which actions
* processed successfully.
* @throws IOException if there are problems talking to META. Per-item
* exceptions are stored in the results array.
* @deprecated since 0.96 - Use {@link HTableInterface#batch} instead
*/
@Deprecated
void processBatch(List<? extends Row> actions, final TableName tableName,
ExecutorService pool, Object[] results) throws IOException, InterruptedException;
/**
* @deprecated internal method, do not use through HConnection
*/
@Deprecated
void processBatch(List<? extends Row> actions, final byte[] tableName,
ExecutorService pool, Object[] results) throws IOException, InterruptedException;
/**
* Parameterized batch processing, allowing varying return types for different
* {@link Row} implementations.
* @deprecated since 0.96 - Use {@link HTableInterface#batchCallback} instead
*/
@Deprecated
public <R> void processBatchCallback(List<? extends Row> list,
final TableName tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback) throws IOException, InterruptedException;
/**
* @deprecated Unsupported API
*/
@Deprecated
public <R> void processBatchCallback(List<? extends Row> list,
final byte[] tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback) throws IOException, InterruptedException;
/**
* @deprecated does nothing since since 0.99
**/
@Deprecated
public void setRegionCachePrefetch(final TableName tableName,
final boolean enable);
/**
* @deprecated does nothing since 0.99
**/
@Deprecated
public void setRegionCachePrefetch(final byte[] tableName,
final boolean enable);
/**
* @deprecated always return false since 0.99
**/
@Deprecated
boolean getRegionCachePrefetch(final TableName tableName);
/**
* @deprecated always return false since 0.99
**/
@Deprecated
boolean getRegionCachePrefetch(final byte[] tableName);
/**
* @return the number of region servers that are currently running
* @throws IOException if a remote or network exception occurs
* @deprecated This method will be changed from public to package protected.
*/
@Deprecated
int getCurrentNrHRS() throws IOException;
/**
* @param tableNames List of table names
* @return HTD[] table metadata
* @throws IOException if a remote or network exception occurs
* @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead.
*/
@Deprecated
HTableDescriptor[] getHTableDescriptorsByTableName(List<TableName> tableNames) throws IOException;
/**
* @deprecated since 0.96.0
*/
@Deprecated
HTableDescriptor[] getHTableDescriptors(List<String> tableNames) throws
IOException;
/**
* @return true if this connection is closed
*/
@Override
boolean isClosed();
/**
* Clear any caches that pertain to server name <code>sn</code>.
* @param sn A server name
* @deprecated internal method, do not use thru HConnection */
@Deprecated
void clearCaches(final ServerName sn);
/**
* This function allows HBaseAdmin and potentially others to get a shared MasterService
* connection.
* @return The shared instance. Never returns null.
* @throws MasterNotRunningException
* @deprecated Since 0.96.0
*/
// TODO: Why is this in the public interface when the returned type is shutdown package access?
@Deprecated
MasterKeepAliveConnection getKeepAliveMasterService()
throws MasterNotRunningException;
/**
* @param serverName
* @return true if the server is known as dead, false otherwise.
* @deprecated internal method, do not use thru HConnection */
@Deprecated
boolean isDeadServer(ServerName serverName);
/**
* @return Nonce generator for this HConnection; may be null if disabled in configuration.
* @deprecated internal method, do not use thru HConnection */
@Deprecated
public NonceGenerator getNonceGenerator();
}

View File

@ -18,6 +18,12 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@ -68,12 +74,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
* An implementation of {@link Table}. Used to communicate with a single HBase table.
* Lightweight. Get as needed and just close when done.
@ -149,7 +149,7 @@ public class HTable implements HTableInterface {
* Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to
* get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
* @param tableName Name of the table.
* @param connection HConnection to be used.
* @param connection Connection to be used.
* @param pool ExecutorService to be used.
* @throws IOException if a remote or network exception occurs
*/
@ -253,13 +253,10 @@ public class HTable implements HTableInterface {
/**
* <em>INTERNAL</em> Used by unit tests and tools to do low-level
* manipulations.
* @return An HConnection instance.
* @deprecated This method will be changed from public to package protected.
* @return A Connection instance.
*/
// TODO(tsuna): Remove this. Unit tests shouldn't require public helpers.
@Deprecated
@VisibleForTesting
public HConnection getConnection() {
protected Connection getConnection() {
return this.connection;
}
@ -500,9 +497,20 @@ public class HTable implements HTableInterface {
*/
@Override
public <R> void batchCallback(
final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
throws IOException, InterruptedException {
connection.processBatchCallback(actions, tableName, pool, results, callback);
final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
throws IOException, InterruptedException {
doBatchWithCallback(actions, results, callback, connection, pool, tableName);
}
public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
AsyncRequestFuture ars = connection.getAsyncProcess().submitAll(
pool, tableName, actions, callback, results);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
}
/**
@ -991,10 +999,10 @@ public class HTable implements HTableInterface {
*
* @param list The collection of actions.
* @param results An empty array, same size as list. If an exception is thrown,
* you can test here for partial results, and to determine which actions
* processed successfully.
* you can test here for partial results, and to determine which actions
* processed successfully.
* @throws IOException if there are problems talking to META. Per-item
* exceptions are stored in the results array.
* exceptions are stored in the results array.
*/
public <R> void processBatchCallback(
final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)

View File

@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Used to communicate with a single HBase table.
* Obtain an instance from an {@link HConnection}.
* Obtain an instance from a {@link Connection}.
*
* @since 0.21.0
* @deprecated use {@link org.apache.hadoop.hbase.client.Table} instead

View File

@ -26,11 +26,11 @@ import java.io.IOException;
* @param <V> return type
*/
abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
protected HConnection connection;
protected ClusterConnection connection;
protected MasterKeepAliveConnection master;
public MasterCallable(final HConnection connection) {
this.connection = connection;
public MasterCallable(final Connection connection) {
this.connection = (ClusterConnection) connection;
}
@Override
@ -41,7 +41,9 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
@Override
public void close() throws IOException {
// The above prepare could fail but this would still be called though masterAdmin is null
if (this.master != null) this.master.close();
if (this.master != null) {
this.master.close();
}
}
@Override

View File

@ -121,9 +121,9 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
}
/**
* @return {@link HConnection} instance used by this Callable.
* @return {@link Connection} instance used by this Callable.
*/
HConnection getConnection() {
Connection getConnection() {
return this.connection;
}

View File

@ -28,9 +28,9 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* We inherit the current ZooKeeperWatcher implementation to change the semantic
* of the close: the new close won't immediately close the connection but
* will have a keep alive. See {@link HConnection}.
* will have a keep alive. See {@link ConnectionImplementation}.
* This allows to make it available with a consistent interface. The whole
* ZooKeeperWatcher use in HConnection will be then changed to remove the
* ZooKeeperWatcher use in ConnectionImplementation will be then changed to remove the
* watcher part.
*
* This class is intended to be used internally by HBase classes; but not by

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.zookeeper;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
@ -38,8 +40,6 @@ import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@ -59,14 +59,12 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Utility class to perform operation (get/wait for/verify/set/delete) on znode in ZooKeeper
* which keeps hbase:meta region server location.
*
* Stateless class with a bunch of static methods. Doesn't manage resources passed in
* (e.g. HConnection, ZooKeeperWatcher etc).
* (e.g. Connection, ZooKeeperWatcher etc).
*
* Meta region location is set by <code>RegionServerServices</code>.
* This class doesn't use ZK watchers, rather accesses ZK directly.
@ -260,7 +258,7 @@ public class MetaTableLocator {
* @throws java.io.IOException
* @throws InterruptedException
*/
public boolean verifyMetaRegionLocation(HConnection hConnection,
public boolean verifyMetaRegionLocation(ClusterConnection hConnection,
ZooKeeperWatcher zkw, final long timeout)
throws InterruptedException, IOException {
return verifyMetaRegionLocation(hConnection, zkw, timeout, HRegionInfo.DEFAULT_REPLICA_ID);
@ -268,7 +266,7 @@ public class MetaTableLocator {
/**
* Verify <code>hbase:meta</code> is deployed and accessible.
* @param hConnection
* @param connection
* @param zkw
* @param timeout How long to wait on zk for meta address (passed through to
* @param replicaId
@ -276,12 +274,12 @@ public class MetaTableLocator {
* @throws InterruptedException
* @throws IOException
*/
public boolean verifyMetaRegionLocation(HConnection hConnection,
public boolean verifyMetaRegionLocation(ClusterConnection connection,
ZooKeeperWatcher zkw, final long timeout, int replicaId)
throws InterruptedException, IOException {
AdminProtos.AdminService.BlockingInterface service = null;
try {
service = getMetaServerConnection(hConnection, zkw, timeout, replicaId);
service = getMetaServerConnection(connection, zkw, timeout, replicaId);
} catch (NotAllMetaRegionsOnlineException e) {
// Pass
} catch (ServerNotRunningYetException e) {
@ -291,7 +289,7 @@ public class MetaTableLocator {
} catch (RegionServerStoppedException e) {
// Pass -- server name sends us to a server that is dying or already dead.
}
return (service != null) && verifyRegionLocation(hConnection, service,
return (service != null) && verifyRegionLocation(connection, service,
getMetaRegionLocation(zkw, replicaId), RegionReplicaUtil.getRegionInfoForReplica(
HRegionInfo.FIRST_META_REGIONINFO, replicaId).getRegionName());
}
@ -311,7 +309,7 @@ public class MetaTableLocator {
// rather than have to pass it in. Its made awkward by the fact that the
// HRI is likely a proxy against remote server so the getServerName needs
// to be fixed to go to a local method or to a cache before we can do this.
private boolean verifyRegionLocation(final Connection connection,
private boolean verifyRegionLocation(final ClusterConnection connection,
AdminService.BlockingInterface hostingServer, final ServerName address,
final byte [] regionName)
throws IOException {
@ -320,10 +318,7 @@ public class MetaTableLocator {
return false;
}
Throwable t;
PayloadCarryingRpcController controller = null;
if (connection instanceof ClusterConnection) {
controller = ((ClusterConnection) connection).getRpcControllerFactory().newController();
}
PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController();
try {
// Try and get regioninfo from the hosting server.
return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null;
@ -354,7 +349,7 @@ public class MetaTableLocator {
* Gets a connection to the server hosting meta, as reported by ZooKeeper,
* waiting up to the specified timeout for availability.
* <p>WARNING: Does not retry. Use an {@link org.apache.hadoop.hbase.client.HTable} instead.
* @param hConnection
* @param connection
* @param zkw
* @param timeout How long to wait on meta location
* @param replicaId
@ -363,10 +358,10 @@ public class MetaTableLocator {
* @throws NotAllMetaRegionsOnlineException if timed out waiting
* @throws IOException
*/
private AdminService.BlockingInterface getMetaServerConnection(HConnection hConnection,
private AdminService.BlockingInterface getMetaServerConnection(ClusterConnection connection,
ZooKeeperWatcher zkw, long timeout, int replicaId)
throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
return getCachedConnection(hConnection, waitMetaRegionLocation(zkw, replicaId, timeout));
return getCachedConnection(connection, waitMetaRegionLocation(zkw, replicaId, timeout));
}
/**
@ -377,7 +372,7 @@ public class MetaTableLocator {
* @throws IOException
*/
@SuppressWarnings("deprecation")
private static AdminService.BlockingInterface getCachedConnection(HConnection hConnection,
private static AdminService.BlockingInterface getCachedConnection(ClusterConnection connection,
ServerName sn)
throws IOException {
if (sn == null) {
@ -385,7 +380,7 @@ public class MetaTableLocator {
}
AdminService.BlockingInterface service = null;
try {
service = hConnection.getAdmin(sn);
service = connection.getAdmin(sn);
} catch (RetriesExhaustedException e) {
if (e.getCause() != null && e.getCause() instanceof ConnectException) {
// Catch this; presume it means the cached connection has gone bad.

View File

@ -104,7 +104,7 @@ public class TestClientNoCluster extends Configured implements Tool {
@Before
public void setUp() throws Exception {
this.conf = HBaseConfiguration.create();
// Run my HConnection overrides. Use my little ConnectionImplementation below which
// Run my Connection overrides. Use my little ConnectionImplementation below which
// allows me insert mocks and also use my Registry below rather than the default zk based
// one so tests run faster and don't have zk dependency.
this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());

View File

@ -101,11 +101,11 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
* Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
*/
static class PerfEvalCallable implements Callable<TimingResult> {
private final Queue<String> argv = new LinkedList<String>();
private final Queue<String> argv = new LinkedList<>();
private final Admin admin;
public PerfEvalCallable(Admin admin, String argv) {
// TODO: this API is awkward, should take HConnection, not HBaseAdmin
// TODO: this API is awkward, should take Connection, not Admin
this.admin = admin;
this.argv.addAll(Arrays.asList(argv.split(" ")));
LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);

View File

@ -41,10 +41,10 @@ import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@ -452,7 +452,7 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
@Override
protected void handleFailure(Counters counters) throws IOException {
Configuration conf = job.getConfiguration();
HConnection conn = (HConnection) ConnectionFactory.createConnection(conf);
ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
CounterGroup g = counters.getGroup("undef");
Iterator<Counter> it = g.iterator();

View File

@ -19,7 +19,6 @@
*/
package org.apache.hadoop.hbase.rsgroup;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
@ -35,9 +34,9 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
@ -50,20 +49,11 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -190,7 +180,8 @@ public class TestRSGroups extends TestRSGroupsBase {
});
ServerName targetServer =
ServerName.parseServerName(appInfo.getServers().iterator().next().toString());
AdminProtos.AdminService.BlockingInterface rs = admin.getConnection().getAdmin(targetServer);
AdminProtos.AdminService.BlockingInterface rs =
((ClusterConnection) admin.getConnection()).getAdmin(targetServer);
//verify it was assigned to the right group
Assert.assertEquals(1, ProtobufUtil.getOnlineRegions(rs).size());
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -482,7 +483,7 @@ public abstract class TestRSGroupsBase {
}
final AdminProtos.AdminService.BlockingInterface targetRS =
admin.getConnection().getAdmin(targetServer);
((ClusterConnection) admin.getConnection()).getAdmin(targetServer);
//move target server to group
rsGroupAdmin.moveServers(Sets.newHashSet(targetServer.getHostPort()),
@ -571,7 +572,7 @@ public abstract class TestRSGroupsBase {
ServerName targetServer = ServerName.parseServerName(
appInfo.getServers().iterator().next().toString());
AdminProtos.AdminService.BlockingInterface targetRS =
admin.getConnection().getAdmin(targetServer);
((ClusterConnection) admin.getConnection()).getAdmin(targetServer);
HRegionInfo targetRegion = ProtobufUtil.getOnlineRegions(targetRS).get(0);
Assert.assertEquals(1, ProtobufUtil.getOnlineRegions(targetRS).size());
@ -612,7 +613,7 @@ public abstract class TestRSGroupsBase {
targetServer = ServerName.parseServerName(
newServers.iterator().next().toString());
targetRS =
admin.getConnection().getAdmin(targetServer);
((ClusterConnection) admin.getConnection()).getAdmin(targetServer);
Assert.assertEquals(1, ProtobufUtil.getOnlineRegions(targetRS).size());
Assert.assertEquals(tableName,
ProtobufUtil.getOnlineRegions(targetRS).get(0).getTable());

View File

@ -175,7 +175,7 @@ public class LocalHBaseCluster {
Configuration config, final int index)
throws IOException {
// Create each regionserver with its own Configuration instance so each has
// its HConnection instance rather than share (see HBASE_INSTANCES down in
// its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager).
// Also, create separate CoordinatedStateManager instance per Server.
@ -210,7 +210,7 @@ public class LocalHBaseCluster {
public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
throws IOException {
// Create each master with its own Configuration instance so each has
// its HConnection instance rather than share (see HBASE_INSTANCES down in
// its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager.
// Also, create separate CoordinatedStateManager instance per Server.

View File

@ -21,10 +21,10 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -44,7 +44,7 @@ class HFileArchiveManager {
private final ZooKeeperWatcher zooKeeper;
private volatile boolean stopped = false;
public HFileArchiveManager(HConnection connection, Configuration conf)
public HFileArchiveManager(Connection connection, Configuration conf)
throws ZooKeeperConnectionException, IOException {
this.zooKeeper = new ZooKeeperWatcher(conf, "hfileArchiveManager-on-" + connection.toString(),
connection);
@ -74,7 +74,7 @@ class HFileArchiveManager {
*/
public HFileArchiveManager disableHFileBackup(byte[] table) throws KeeperException {
disable(this.zooKeeper, table);
return this;
return this;
}
/**

View File

@ -43,11 +43,11 @@ public class CoprocessorHConnection extends ConnectionImplementation {
private static final NonceGenerator NO_NONCE_GEN = new NoNonceGenerator();
/**
* Create an {@link HConnection} based on the environment in which we are running the
* coprocessor. The {@link HConnection} must be externally cleaned up (we bypass the usual HTable
* cleanup mechanisms since we own everything).
* @param env environment hosting the {@link HConnection}
* @return instance of {@link HConnection}.
* Create a {@link ClusterConnection} based on the environment in which we are running the
* coprocessor. The {@link ClusterConnection} must be externally cleaned up
* (we bypass the usual HTable cleanup mechanisms since we own everything).
* @param env environment hosting the {@link ClusterConnection}
* @return instance of {@link ClusterConnection}.
* @throws IOException if we cannot create the connection
*/
public static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)

View File

@ -144,7 +144,7 @@ public class RegionStateStore {
if (metaRegion == null) {
Configuration conf = server.getConfiguration();
// Config to determine the no of HConnections to META.
// A single HConnection should be sufficient in most cases. Only if
// A single Connection should be sufficient in most cases. Only if
// you are doing lot of writes (>1M) to META,
// increasing this value might improve the write throughput.
multiHConnection =

View File

@ -712,7 +712,7 @@ public class HRegionServer extends HasThread implements
}
/**
* Create a 'smarter' HConnection, one that is capable of by-passing RPC if the request is to
* Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to
* the local server. Safe to use going to local or remote server.
* Create this instance in a method can be intercepted and mocked in tests.
* @throws IOException
@ -1074,7 +1074,7 @@ public class HRegionServer extends HasThread implements
} catch (IOException e) {
// Although the {@link Closeable} interface throws an {@link
// IOException}, in reality, the implementation would never do that.
LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
}
}
@ -1786,7 +1786,7 @@ public class HRegionServer extends HasThread implements
// Create the log splitting worker and start it
// set a smaller retries to fast fail otherwise splitlogworker could be blocked for
// quite a while inside HConnection layer. The worker won't be available for other
// quite a while inside Connection layer. The worker won't be available for other
// tasks even after current task is preempted after a split task times out.
Configuration sinkConf = HBaseConfiguration.create(conf);
sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,

View File

@ -1033,7 +1033,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
priority = createPriority();
String name = rs.getProcessName() + "/" + initialIsa.toString();
// Set how many times to retry talking to another server over HConnection.
// Set how many times to retry talking to another server over Connection.
ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
try {
rpcServer = new RpcServer(rs, name, getServices(),

View File

@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@ -27,7 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@ -36,7 +37,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
@ -50,8 +53,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import com.google.protobuf.ServiceException;
/**
* This class is responsible for replaying the edits coming from a failed region server.
* <p>
@ -65,22 +66,22 @@ public class WALEditsReplaySink {
private static final int MAX_BATCH_SIZE = 1024;
private final Configuration conf;
private final HConnection conn;
private final ClusterConnection conn;
private final TableName tableName;
private final MetricsWALEditsReplay metrics;
private final AtomicLong totalReplayedEdits = new AtomicLong();
private final boolean skipErrors;
private final int replayTimeout;
private RpcControllerFactory rpcControllerFactory;
private final RpcControllerFactory rpcControllerFactory;
/**
* Create a sink for WAL log entries replay
* @param conf
* @param tableName
* @param conn
* @throws IOException
* @param conf configuration
* @param tableName of table to replay edits of
* @param conn connection to use
* @throws IOException on IO failure
*/
public WALEditsReplaySink(Configuration conf, TableName tableName, HConnection conn)
public WALEditsReplaySink(Configuration conf, TableName tableName, ClusterConnection conn)
throws IOException {
this.conf = conf;
this.metrics = new MetricsWALEditsReplay();
@ -95,8 +96,8 @@ public class WALEditsReplaySink {
/**
* Replay an array of actions of the same region directly into the newly assigned Region Server
* @param entries
* @throws IOException
* @param entries to replay
* @throws IOException on IO failure
*/
public void replayEntries(List<Pair<HRegionLocation, Entry>> entries) throws IOException {
if (entries.size() == 0) {
@ -105,7 +106,7 @@ public class WALEditsReplaySink {
int batchSize = entries.size();
Map<HRegionInfo, List<Entry>> entriesByRegion =
new HashMap<HRegionInfo, List<Entry>>();
new HashMap<>();
HRegionLocation loc = null;
Entry entry = null;
List<Entry> regionEntries = null;
@ -186,7 +187,7 @@ public class WALEditsReplaySink {
private HRegionInfo regionInfo;
private List<Entry> entries;
ReplayServerCallable(final HConnection connection, final TableName tableName,
ReplayServerCallable(final Connection connection, final TableName tableName,
final HRegionLocation regionLoc, final HRegionInfo regionInfo,
final List<Entry> entries) {
super(connection, tableName, null);

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.replication.regionserver;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
@ -33,7 +35,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -43,8 +44,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@ -70,7 +71,7 @@ import org.apache.hadoop.ipc.RemoteException;
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
private HConnection conn;
private ClusterConnection conn;
private Configuration conf;
@ -104,7 +105,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn = (HConnection) ConnectionFactory.createConnection(this.conf);
this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();

View File

@ -562,10 +562,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
if (cause instanceof IOException) {
// The table can be disabled or dropped at this time. For disabled tables, we have no
// cheap mechanism to detect this case because meta does not contain this information.
// HConnection.isTableDisabled() is a zk call which we cannot do for every replay RPC.
// So instead we start the replay RPC with retries and
// check whether the table is dropped or disabled which might cause
// SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE.
// ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
// RPC. So instead we start the replay RPC with retries and check whether the table is
// dropped or disabled which might cause SocketTimeoutException, or
// RetriesExhaustedException or similar if we get IOE.
if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping " + entries.size() + " entries in table " + tableName

View File

@ -17,22 +17,25 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* Maintains a collection of peers to replicate to, and randomly selects a
@ -57,7 +60,7 @@ public class ReplicationSinkManager {
static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.1f;
private final HConnection conn;
private final Connection conn;
private final String peerClusterId;
@ -89,7 +92,7 @@ public class ReplicationSinkManager {
* @param conf HBase configuration, used for determining replication source ratio and bad peer
* threshold
*/
public ReplicationSinkManager(HConnection conn, String peerClusterId,
public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
HBaseReplicationEndpoint endpoint, Configuration conf) {
this.conn = conn;
this.peerClusterId = peerClusterId;
@ -116,7 +119,7 @@ public class ReplicationSinkManager {
throw new IOException("No replication sinks are available");
}
ServerName serverName = sinks.get(random.nextInt(sinks.size()));
return new SinkPeer(serverName, conn.getAdmin(serverName));
return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName));
}
/**

View File

@ -207,7 +207,7 @@ public class ConnectionCache {
return false;
}
if (connection.isAborted() || connection.isClosed()) {
LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
LOG.info("Unexpected: cached Connection is aborted/closed, removed from cache");
connections.remove(userName);
return false;
}

View File

@ -17,6 +17,16 @@
*/
package org.apache.hadoop.hbase.util;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.collect.TreeMultimap;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -55,15 +65,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.collect.TreeMultimap;
import com.google.protobuf.ServiceException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
@ -104,7 +105,6 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@ -413,7 +413,7 @@ public class HBaseFsck extends Configured implements Closeable {
* This method maintains a lock using a file. If the creation fails we return null
*
* @return FSDataOutputStream object corresponding to the newly opened lock file
* @throws IOException
* @throws IOException if IO failure occurs
*/
private FSDataOutputStream checkAndMarkRunningHbck() throws IOException {
RetryCounter retryCounter = lockFileRetryCounterFactory.create();
@ -3981,13 +3981,13 @@ public class HBaseFsck extends Configured implements Closeable {
* Contact a region server and get all information from it
*/
static class WorkItemRegion implements Callable<Void> {
private HBaseFsck hbck;
private ServerName rsinfo;
private ErrorReporter errors;
private HConnection connection;
private final HBaseFsck hbck;
private final ServerName rsinfo;
private final ErrorReporter errors;
private final ClusterConnection connection;
WorkItemRegion(HBaseFsck hbck, ServerName info,
ErrorReporter errors, HConnection connection) {
ErrorReporter errors, ClusterConnection connection) {
this.hbck = hbck;
this.rsinfo = info;
this.errors = errors;

View File

@ -18,6 +18,11 @@
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -33,7 +38,6 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.RegionState;
@ -41,12 +45,6 @@ import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* This class contains helper methods that repair parts of hbase's filesystem
* contents.
@ -64,7 +62,7 @@ public class HBaseFsckRepair {
* @param region Region to undeploy
* @param servers list of Servers to undeploy from
*/
public static void fixMultiAssignment(HConnection connection, HRegionInfo region,
public static void fixMultiAssignment(Connection connection, HRegionInfo region,
List<ServerName> servers)
throws IOException, KeeperException, InterruptedException {
HRegionInfo actualRegion = new HRegionInfo(region);
@ -153,7 +151,7 @@ public class HBaseFsckRepair {
* (default 120s) to close the region. This bypasses the active hmaster.
*/
@SuppressWarnings("deprecation")
public static void closeRegionSilentlyAndWait(HConnection connection,
public static void closeRegionSilentlyAndWait(Connection connection,
ServerName server, HRegionInfo region) throws IOException, InterruptedException {
long timeout = connection.getConfiguration()
.getLong("hbase.hbck.close.timeout", 120000);

View File

@ -37,10 +37,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
@ -103,10 +103,10 @@ class HMerge {
final TableName tableName, final boolean testMasterRunning)
throws IOException {
boolean masterIsRunning = false;
HConnection hConnection = null;
ClusterConnection hConnection = null;
if (testMasterRunning) {
try {
hConnection = (HConnection) ConnectionFactory.createConnection(conf);
hConnection = (ClusterConnection) ConnectionFactory.createConnection(conf);
masterIsRunning = hConnection.isMasterRunning();
} finally {
if (hConnection != null) {

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@ -30,42 +29,44 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
/**
* Provides ability to create multiple HConnection instances and allows to process a batch of
* actions using HConnection.processBatchCallback()
* Provides ability to create multiple Connection instances and allows to process a batch of
* actions using CHTable.doBatchWithCallback()
*/
@InterfaceAudience.Private
public class MultiHConnection {
private static final Log LOG = LogFactory.getLog(MultiHConnection.class);
private HConnection[] hConnections;
private final Object hConnectionsLock = new Object();
private int noOfConnections;
private Connection[] connections;
private final Object connectionsLock = new Object();
private final int noOfConnections;
private ExecutorService batchPool;
/**
* Create multiple HConnection instances and initialize a thread pool executor
* Create multiple Connection instances and initialize a thread pool executor
* @param conf configuration
* @param noOfConnections total no of HConnections to create
* @throws IOException
* @param noOfConnections total no of Connections to create
* @throws IOException if IO failure occurs
*/
public MultiHConnection(Configuration conf, int noOfConnections)
throws IOException {
this.noOfConnections = noOfConnections;
synchronized (this.hConnectionsLock) {
hConnections = new HConnection[noOfConnections];
synchronized (this.connectionsLock) {
connections = new Connection[noOfConnections];
for (int i = 0; i < noOfConnections; i++) {
HConnection conn = (HConnection) ConnectionFactory.createConnection(conf);
hConnections[i] = conn;
Connection conn = ConnectionFactory.createConnection(conf);
connections[i] = conn;
}
}
createBatchPool(conf);
@ -75,9 +76,9 @@ public class MultiHConnection {
* Close the open connections and shutdown the batchpool
*/
public void close() {
synchronized (hConnectionsLock) {
if (hConnections != null) {
for (Connection conn : hConnections) {
synchronized (connectionsLock) {
if (connections != null) {
for (Connection conn : connections) {
if (conn != null) {
try {
conn.close();
@ -88,7 +89,7 @@ public class MultiHConnection {
}
}
}
hConnections = null;
connections = null;
}
}
if (this.batchPool != null && !this.batchPool.isShutdown()) {
@ -109,28 +110,21 @@ public class MultiHConnection {
* @param actions the actions
* @param tableName table name
* @param results the results array
* @param callback
* @throws IOException
* @param callback to run when results are in
* @throws IOException If IO failure occurs
*/
@SuppressWarnings("deprecation")
public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName,
Object[] results, Batch.Callback<R> callback) throws IOException {
// Currently used by RegionStateStore
// A deprecated method is used as multiple threads accessing RegionStateStore do a single put
// and htable is not thread safe. Alternative would be to create an Htable instance for each
// put but that is not very efficient.
// See HBASE-11610 for more details.
try {
hConnections[ThreadLocalRandom.current().nextInt(noOfConnections)].processBatchCallback(
actions, tableName, this.batchPool, results, callback);
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
ClusterConnection conn =
(ClusterConnection) connections[ThreadLocalRandom.current().nextInt(noOfConnections)];
HTable.doBatchWithCallback(actions, results, callback, conn, batchPool, tableName);
}
// Copied from ConnectionImplementation.getBatchPool()
// We should get rid of this when HConnection.processBatchCallback is un-deprecated and provides
// We should get rid of this when Connection.processBatchCallback is un-deprecated and provides
// an API to manage a batch pool
private void createBatchPool(Configuration conf) {
// Use the same config for keep alive as in ConnectionImplementation.getBatchPool();
@ -140,7 +134,7 @@ public class MultiHConnection {
}
long keepAliveTime = conf.getLong("hbase.multihconnection.threads.keepalivetime", 60);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<Runnable>(maxThreads
new LinkedBlockingQueue<>(maxThreads
* conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
ThreadPoolExecutor tpe =

View File

@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
@ -51,14 +51,14 @@ import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public class ReplicationChecker {
private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
private ErrorReporter errorReporter;
private ReplicationQueuesClient queuesClient;
private ReplicationPeers replicationPeers;
private ReplicationQueueDeletor queueDeletor;
private final ErrorReporter errorReporter;
private final ReplicationQueuesClient queuesClient;
private final ReplicationPeers replicationPeers;
private final ReplicationQueueDeletor queueDeletor;
// replicator with its queueIds for removed peers
private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
private final Map<String, List<String>> undeletedQueueIds = new HashMap<>();
public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection,
public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, ClusterConnection connection,
ErrorReporter errorReporter) throws IOException {
try {
this.errorReporter = errorReporter;
@ -79,7 +79,7 @@ public class ReplicationChecker {
}
public void checkUnDeletedQueues() throws IOException {
Set<String> peerIds = new HashSet<String>(this.replicationPeers.getAllPeerIds());
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
try {
List<String> replicators = this.queuesClient.getListOfReplicators();
for (String replicator : replicators) {

View File

@ -76,11 +76,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.TableState;
@ -1694,29 +1694,28 @@ public class WALSplitter {
private static final double BUFFER_THRESHOLD = 0.35;
private static final String KEY_DELIMITER = "#";
private long waitRegionOnlineTimeOut;
private final long waitRegionOnlineTimeOut;
private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
private final Map<String, RegionServerWriter> writers =
new ConcurrentHashMap<String, RegionServerWriter>();
private final Map<String, RegionServerWriter> writers = new ConcurrentHashMap<>();
// online encoded region name -> region location map
private final Map<String, HRegionLocation> onlineRegions =
new ConcurrentHashMap<String, HRegionLocation>();
private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
.synchronizedMap(new TreeMap<TableName, HConnection>());
private final Map<TableName, ClusterConnection> tableNameToHConnectionMap = Collections
.synchronizedMap(new TreeMap<TableName, ClusterConnection>());
/**
* Map key -> value layout
* {@literal <servername>:<table name> -> Queue<Row>}
*/
private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
private List<Throwable> thrown = new ArrayList<Throwable>();
private final Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
new ConcurrentHashMap<>();
private final List<Throwable> thrown = new ArrayList<>();
// The following sink is used in distrubitedLogReplay mode for entries of regions in a disabling
// table. It's a limitation of distributedLogReplay. Because log replay needs a region is
// assigned and online before it can replay wal edits while regions of disabling/disabled table
// won't be assigned by AM. We can retire this code after HBASE-8234.
private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
private final LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
private boolean hasEditsInDisablingOrDisabledTables = false;
public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
@ -1809,8 +1808,8 @@ public class WALSplitter {
HRegionLocation loc = null;
String locKey = null;
List<Cell> cells = edit.getCells();
List<Cell> skippedCells = new ArrayList<Cell>();
HConnection hconn = this.getConnectionByTableName(table);
List<Cell> skippedCells = new ArrayList<>();
ClusterConnection cconn = this.getConnectionByTableName(table);
for (Cell cell : cells) {
byte[] row = CellUtil.cloneRow(cell);
@ -1838,7 +1837,7 @@ public class WALSplitter {
try {
loc =
locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
locateRegionAndRefreshLastFlushedSequenceId(cconn, table, row,
encodeRegionNameStr);
// skip replaying the compaction if the region is gone
if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
@ -1912,13 +1911,13 @@ public class WALSplitter {
* destination region is online for replay.
* @throws IOException
*/
private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(ClusterConnection cconn,
TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
// fetch location from cache
HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
if(loc != null) return loc;
// fetch location from hbase:meta directly without using cache to avoid hit old dead server
loc = hconn.getRegionLocation(table, row, true);
loc = cconn.getRegionLocation(table, row, true);
if (loc == null) {
throw new IOException("Can't locate location for row:" + Bytes.toString(row)
+ " of table:" + table);
@ -1931,7 +1930,7 @@ public class WALSplitter {
if (tmpLoc != null) return tmpLoc;
}
Long lastFlushedSequenceId = -1l;
Long lastFlushedSequenceId = -1L;
AtomicBoolean isRecovering = new AtomicBoolean(true);
loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
if (!isRecovering.get()) {
@ -2010,11 +2009,11 @@ public class WALSplitter {
while (endTime > EnvironmentEdgeManager.currentTime()) {
try {
// Try and get regioninfo from the hosting server.
HConnection hconn = getConnectionByTableName(tableName);
ClusterConnection cconn = getConnectionByTableName(tableName);
if(reloadLocation) {
loc = hconn.getRegionLocation(tableName, row, true);
loc = cconn.getRegionLocation(tableName, row, true);
}
BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
BlockingInterface remoteSvr = cconn.getAdmin(loc.getServerName());
HRegionInfo region = loc.getRegionInfo();
try {
GetRegionInfoRequest request =
@ -2146,12 +2145,12 @@ public class WALSplitter {
// close connections
synchronized (this.tableNameToHConnectionMap) {
for (Map.Entry<TableName,HConnection> entry :
for (Map.Entry<TableName, ClusterConnection> entry :
this.tableNameToHConnectionMap.entrySet()) {
HConnection hconn = entry.getValue();
ClusterConnection cconn = entry.getValue();
try {
hconn.clearRegionCache();
hconn.close();
cconn.clearRegionCache();
cconn.close();
} catch (IOException ioe) {
result.add(ioe);
}
@ -2165,7 +2164,7 @@ public class WALSplitter {
@Override
public Map<byte[], Long> getOutputCounts() {
TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
synchronized (writers) {
for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
@ -2215,7 +2214,7 @@ public class WALSplitter {
throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
}
HConnection hconn = getConnectionByTableName(tableName);
ClusterConnection hconn = getConnectionByTableName(tableName);
synchronized (writers) {
ret = writers.get(loc);
if (ret == null) {
@ -2226,18 +2225,18 @@ public class WALSplitter {
return ret;
}
private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
if (hconn == null) {
private ClusterConnection getConnectionByTableName(final TableName tableName) throws IOException {
ClusterConnection cconn = this.tableNameToHConnectionMap.get(tableName);
if (cconn == null) {
synchronized (this.tableNameToHConnectionMap) {
hconn = this.tableNameToHConnectionMap.get(tableName);
if (hconn == null) {
hconn = (HConnection) ConnectionFactory.createConnection(conf);
this.tableNameToHConnectionMap.put(tableName, hconn);
cconn = this.tableNameToHConnectionMap.get(tableName);
if (cconn == null) {
cconn = (ClusterConnection) ConnectionFactory.createConnection(conf);
this.tableNameToHConnectionMap.put(tableName, cconn);
}
}
}
return hconn;
return cconn;
}
private TableName getTableFromLocationStr(String loc) {
/**
@ -2258,7 +2257,7 @@ public class WALSplitter {
private final static class RegionServerWriter extends SinkWriter {
final WALEditsReplaySink sink;
RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
RegionServerWriter(final Configuration conf, final TableName tableName, final ClusterConnection conn)
throws IOException {
this.sink = new WALEditsReplaySink(conf, tableName, conn);
}

View File

@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
@ -358,8 +357,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
/**
* Returns this classes's instance of {@link Configuration}. Be careful how
* you use the returned Configuration since {@link HConnection} instances
* can be shared. The Map of HConnections is keyed by the Configuration. If
* you use the returned Configuration since {@link Connection} instances
* can be shared. The Map of Connections is keyed by the Configuration. If
* say, a Connection was being used against a cluster that had been shutdown,
* see {@link #shutdownMiniCluster()}, then the Connection will no longer
* be wholesome. Rather than use the return direct, its usually best to

View File

@ -170,7 +170,7 @@ public class TestMetaTableAccessorNoCluster {
return builder.build();
}
}).thenReturn(ScanResponse.newBuilder().setMoreResults(false).build());
// Associate a spied-upon HConnection with UTIL.getConfiguration. Need
// Associate a spied-upon Connection with UTIL.getConfiguration. Need
// to shove this in here first so it gets picked up all over; e.g. by
// HTable.
connection = HConnectionTestingUtility.getSpiedConnection(UTIL.getConfiguration());

View File

@ -37,17 +37,17 @@ import org.mockito.Mockito;
public class HConnectionTestingUtility {
/*
* Not part of {@link HBaseTestingUtility} because this class is not
* in same package as {@link HConnection}. Would have to reveal ugly
* in same package as {@link ClusterConnection}. Would have to reveal ugly
* {@link ConnectionImplementation} innards to HBaseTestingUtility to give it access.
*/
/**
* Get a Mocked {@link HConnection} that goes with the passed <code>conf</code>
* Get a Mocked {@link ClusterConnection} that goes with the passed <code>conf</code>
* configuration instance. Minimally the mock will return
* <code>conf</conf> when {@link ClusterConnection#getConfiguration()} is invoked.
* Be sure to shutdown the connection when done by calling
* {@link Connection#close()} else it will stick around; this is probably not what you want.
* @param conf configuration
* @return HConnection object for <code>conf</code>
* @return ClusterConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
public static ClusterConnection getMockedConnection(final Configuration conf)
@ -139,7 +139,7 @@ public class HConnectionTestingUtility {
* Be sure to shutdown the connection when done by calling
* {@link Connection#close()} else it will stick around; this is probably not what you want.
* @param conf configuration
* @return HConnection object for <code>conf</code>
* @return ClusterConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
* @see @link
* {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)}

View File

@ -631,12 +631,12 @@ public class TestAdmin1 {
expectedRegions) throws IOException {
int numRS = c.getCurrentNrHRS();
List<HRegionLocation> regions = regionLocator.getAllRegionLocations();
Map<ServerName, List<HRegionInfo>> server2Regions = new HashMap<ServerName, List<HRegionInfo>>();
Map<ServerName, List<HRegionInfo>> server2Regions = new HashMap<>();
for (HRegionLocation loc : regions) {
ServerName server = loc.getServerName();
List<HRegionInfo> regs = server2Regions.get(server);
if (regs == null) {
regs = new ArrayList<HRegionInfo>();
regs = new ArrayList<>();
server2Regions.put(server, regs);
}
regs.add(loc.getRegionInfo());
@ -1176,7 +1176,7 @@ public class TestAdmin1 {
byte[][] splitRows = new byte[2][];
splitRows[0] = new byte[]{(byte)'4'};
splitRows[1] = new byte[]{(byte)'7'};
TEST_UTIL.getHBaseAdmin().createTable(desc, splitRows);
TEST_UTIL.getAdmin().createTable(desc, splitRows);
List<HRegion> oldRegions;
do {
oldRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
@ -1203,7 +1203,7 @@ public class TestAdmin1 {
// the element at index 1 would be a replica (since the metareader gives us ordered
// regions). Try splitting that region via the split API . Should fail
try {
TEST_UTIL.getHBaseAdmin().splitRegion(regions.get(1).getFirst().getRegionName());
TEST_UTIL.getAdmin().splitRegion(regions.get(1).getFirst().getRegionName());
} catch (IllegalArgumentException ex) {
gotException = true;
}
@ -1222,7 +1222,7 @@ public class TestAdmin1 {
gotException = false;
// Try merging a replica with another. Should fail.
try {
TEST_UTIL.getHBaseAdmin().mergeRegions(regions.get(1).getFirst().getEncodedNameAsBytes(),
TEST_UTIL.getAdmin().mergeRegions(regions.get(1).getFirst().getEncodedNameAsBytes(),
regions.get(2).getFirst().getEncodedNameAsBytes(), true);
} catch (IllegalArgumentException m) {
gotException = true;
@ -1233,7 +1233,8 @@ public class TestAdmin1 {
DispatchMergingRegionsRequest request = RequestConverter
.buildDispatchMergingRegionsRequest(regions.get(1).getFirst().getEncodedNameAsBytes(),
regions.get(2).getFirst().getEncodedNameAsBytes(), true);
TEST_UTIL.getHBaseAdmin().getConnection().getMaster().dispatchMergingRegions(null, request);
((ClusterConnection) TEST_UTIL.getAdmin().getConnection()).getMaster()
.dispatchMergingRegions(null, request);
} catch (ServiceException m) {
Throwable t = m.getCause();
do {
@ -1252,8 +1253,8 @@ public class TestAdmin1 {
moveRegionAndWait(regions.get(2).getFirst(), regions.get(1).getSecond());
}
try {
AdminService.BlockingInterface admin = TEST_UTIL.getHBaseAdmin().getConnection()
.getAdmin(regions.get(1).getSecond());
AdminService.BlockingInterface admin = ((ClusterConnection) TEST_UTIL.getAdmin()
.getConnection()).getAdmin(regions.get(1).getSecond());
ProtobufUtil.mergeRegions(null, admin, regions.get(1).getFirst(), regions.get(2).getFirst(),
true, null);
} catch (MergeRegionException mm) {
@ -1266,7 +1267,7 @@ public class TestAdmin1 {
throws InterruptedException, MasterNotRunningException,
ZooKeeperConnectionException, IOException {
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
TEST_UTIL.getHBaseAdmin().move(
TEST_UTIL.getAdmin().move(
destRegion.getEncodedNameAsBytes(),
Bytes.toBytes(destServer.getServerName()));
while (true) {

View File

@ -4036,7 +4036,7 @@ public class TestFromClientSide {
/**
* simple test that just executes parts of the client
* API that accept a pre-created HConnection instance
* API that accept a pre-created Connection instance
*
* @throws IOException
*/

View File

@ -121,7 +121,7 @@ public class TestFromClientSide3 {
// connection needed for poll-wait
HRegionLocation loc = locator.getRegionLocation(row, true);
AdminProtos.AdminService.BlockingInterface server =
admin.getConnection().getAdmin(loc.getServerName());
((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName());
byte[] regName = loc.getRegionInfo().getRegionName();
for (int i = 0; i < nFlushes; i++) {

View File

@ -223,7 +223,7 @@ public class TestHCM {
}
/**
* Naive test to check that HConnection#getAdmin returns a properly constructed HBaseAdmin object
* Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
* @throws IOException Unable to construct admin
*/
@Test
@ -435,7 +435,7 @@ public class TestHCM {
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
}
MasterCallable masterCallable = new MasterCallable((HConnection) TEST_UTIL.getConnection()) {
MasterCallable masterCallable = new MasterCallable(TEST_UTIL.getConnection()) {
public Object call(int timeout) throws IOException {
return null;
}
@ -452,7 +452,7 @@ public class TestHCM {
TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
TEST_UTIL.createTable(tableName, FAM_NAM).close();
boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
// We want to work on a separate connection.
@ -993,7 +993,7 @@ public class TestHCM {
TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
// This should be enough to connect
HConnection conn = (HConnection) ConnectionFactory.createConnection(c);
ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c);
assertTrue(conn.isMasterRunning());
conn.close();
}

View File

@ -79,8 +79,7 @@ public class TestReplicaWithCluster {
*/
public static class SlowMeCopro extends BaseRegionObserver {
static final AtomicLong sleepTime = new AtomicLong(0);
static final AtomicReference<CountDownLatch> cdl =
new AtomicReference<CountDownLatch>(new CountDownLatch(0));
static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));
public SlowMeCopro() {
}
@ -336,7 +335,7 @@ public class TestReplicaWithCluster {
// bulk load HFiles
LOG.debug("Loading test data");
@SuppressWarnings("deprecation")
final HConnection conn = HTU.getHBaseAdmin().getConnection();
final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
@Override
@ -351,7 +350,7 @@ public class TestReplicaWithCluster {
}
};
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
RpcRetryingCaller<Void> caller = factory.newCaller();
caller.callWithRetries(callable, 10000);
// verify we can read them from the primary

View File

@ -204,7 +204,7 @@ public class TestReplicasClient {
@Before
public void before() throws IOException {
HTU.getHBaseAdmin().getConnection().clearRegionCache();
((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionCache();
try {
openRegion(hriPrimary);
} catch (Exception ignored) {
@ -226,7 +226,7 @@ public class TestReplicasClient {
} catch (Exception ignored) {
}
HTU.getHBaseAdmin().getConnection().clearRegionCache();
((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionCache();
}
private HRegionServer getRS() {

View File

@ -132,7 +132,7 @@ public class TestRpcControllerFactory {
TableName name = TableName.valueOf("testcustomcontroller");
UTIL.createTable(name, fam1).close();
// change one of the connection properties so we get a new HConnection with our configuration
// change one of the connection properties so we get a new Connection with our configuration
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
Connection connection = ConnectionFactory.createConnection(conf);

View File

@ -66,7 +66,7 @@ public class TestShortCircuitConnection {
UTIL.createTable(htd, null);
HRegionServer regionServer = UTIL.getRSForFirstRegionInTable(tn);
ClusterConnection connection = regionServer.getClusterConnection();
HTableInterface tableIf = connection.getTable(tn);
Table tableIf = connection.getTable(tn);
assertTrue(tableIf instanceof HTable);
HTable table = (HTable) tableIf;
assertTrue(table.getConnection() == connection);

View File

@ -47,9 +47,9 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -285,7 +285,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
throws IOException {
int i = attmptedCalls.incrementAndGet();
if (i == 1) {
Connection errConn = null;
Connection errConn;
try {
errConn = getMockedConnection(util.getConfiguration());
} catch (Exception e) {
@ -293,10 +293,10 @@ public class TestLoadIncrementalHFilesSplitRecovery {
throw new RuntimeException("mocking cruft, should never happen");
}
failedCalls.incrementAndGet();
return super.tryAtomicRegionLoad((HConnection)errConn, tableName, first, lqis);
return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
}
return super.tryAtomicRegionLoad((HConnection)conn, tableName, first, lqis);
return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
}
};
try {
@ -316,9 +316,9 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
@SuppressWarnings("deprecation")
private HConnection getMockedConnection(final Configuration conf)
private ClusterConnection getMockedConnection(final Configuration conf)
throws IOException, ServiceException {
HConnection c = Mockito.mock(HConnection.class);
ClusterConnection c = Mockito.mock(ClusterConnection.class);
Mockito.when(c.getConfiguration()).thenReturn(conf);
Mockito.doNothing().when(c).close();
// Make it so we return a particular location when asked.

View File

@ -134,8 +134,8 @@ public class TestCatalogJanitor {
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
// Mock an HConnection and a AdminProtocol implementation. Have the
// HConnection return the HRI. Have the HRI return a few mocked up responses
// Mock an ClusterConnection and a AdminProtocol implementation. Have the
// ClusterConnection return the HRI. Have the HRI return a few mocked up responses
// to make our test work.
this.connection =
HConnectionTestingUtility.getMockedConnectionAndDecorate(this.c,

View File

@ -186,7 +186,7 @@ public class TestMasterNoCluster {
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TESTUTIL.getConfiguration());
// Insert a mock for the connection, use TESTUTIL.getConfiguration rather than
// the conf from the master; the conf will already have an HConnection
// the conf from the master; the conf will already have an ClusterConnection
// associate so the below mocking of a connection will fail.
final ClusterConnection mockedConnection = HConnectionTestingUtility.getMockedConnectionAndDecorate(
TESTUTIL.getConfiguration(), rs0, rs0, rs0.getServerName(),
@ -293,7 +293,7 @@ public class TestMasterNoCluster {
@Override
public ClusterConnection getConnection() {
// Insert a mock for the connection, use TESTUTIL.getConfiguration rather than
// the conf from the master; the conf will already have an HConnection
// the conf from the master; the conf will already have a Connection
// associate so the below mocking of a connection will fail.
try {
return HConnectionTestingUtility.getMockedConnectionAndDecorate(

View File

@ -86,8 +86,8 @@ public class TestRestartCluster {
LOG.info("\n\nStarting cluster the second time");
UTIL.restartHBaseCluster(3);
// Need to use a new 'Configuration' so we make a new HConnection.
// Otherwise we're reusing an HConnection that has gone stale because
// Need to use a new 'Configuration' so we make a new Connection.
// Otherwise we're reusing an Connection that has gone stale because
// the shutdown of the cluster also called shut of the connection.
allRegions = MetaTableAccessor.getAllRegions(UTIL.getConnection(), false);
assertEquals(4, allRegions.size());

View File

@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -195,11 +195,11 @@ public class TestHRegionServerBulkLoad {
Path hfile = new Path(dir, family(i));
byte[] fam = Bytes.toBytes(family(i));
createHFile(fs, hfile, fam, QUAL, val, 1000);
famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
famPaths.add(new Pair<>(fam, hfile.toString()));
}
// bulk load HFiles
final HConnection conn = UTIL.getHBaseAdmin().getConnection();
final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override

View File

@ -25,9 +25,9 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
@ -51,7 +51,7 @@ public class TestReplicationSinkManager {
public void setUp() {
replicationPeers = mock(ReplicationPeers.class);
replicationEndpoint = mock(HBaseReplicationEndpoint.class);
sinkManager = new ReplicationSinkManager(mock(HConnection.class),
sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class),
PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
}

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -138,7 +137,7 @@ public class BaseTestHBaseFsck {
protected void undeployRegion(Connection conn, ServerName sn,
HRegionInfo hri) throws IOException, InterruptedException {
try {
HBaseFsckRepair.closeRegionSilentlyAndWait((HConnection) conn, sn, hri);
HBaseFsckRepair.closeRegionSilentlyAndWait(conn, sn, hri);
if (!hri.isMetaTable()) {
admin.offline(hri.getRegionName());
}
@ -344,11 +343,11 @@ public class BaseTestHBaseFsck {
Map<ServerName, List<String>> mm =
new HashMap<ServerName, List<String>>();
for (ServerName hsi : regionServers) {
AdminProtos.AdminService.BlockingInterface server = ((HConnection) connection).getAdmin(hsi);
AdminProtos.AdminService.BlockingInterface server = connection.getAdmin(hsi);
// list all online regions from this region server
List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(server);
List<String> regionNames = new ArrayList<String>();
List<String> regionNames = new ArrayList<>();
for (HRegionInfo hri : regions) {
regionNames.add(hri.getRegionNameAsString());
}

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.util;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@ -39,15 +41,12 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
/**
* Common base class for reader and writer parts of multi-thread HBase load
* test ({@link LoadTestTool}).
@ -57,7 +56,7 @@ public abstract class MultiThreadedAction {
protected final TableName tableName;
protected final Configuration conf;
protected final HConnection connection; // all reader / writer threads will share this connection
protected final ClusterConnection connection; // all reader / writer threads will share this connection
protected int numThreads = 1;
@ -152,7 +151,7 @@ public abstract class MultiThreadedAction {
this.dataGenerator = dataGen;
this.tableName = tableName;
this.actionLetter = actionLetter;
this.connection = (HConnection) ConnectionFactory.createConnection(conf);
this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
}
public void start(long startKey, long endKey, int numThreads) throws IOException {

View File

@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Consistency;
@ -158,7 +159,7 @@ public class MultiThreadedReader extends MultiThreadedAction
setName(getClass().getSimpleName() + "_" + readerId);
}
protected HTableInterface createTable() throws IOException {
protected Table createTable() throws IOException {
return connection.getTable(tableName);
}
@ -379,7 +380,7 @@ public class MultiThreadedReader extends MultiThreadedAction
numKeysVerified.incrementAndGet();
}
} else {
HRegionLocation hloc = connection.getRegionLocation(tableName,
HRegionLocation hloc = ((ClusterConnection) connection).getRegionLocation(tableName,
get.getRow(), false);
String rowKey = Bytes.toString(get.getRow());
LOG.info("Key = " + rowKey + ", Region location: " + hloc);

View File

@ -130,7 +130,7 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase {
table = createTable();
}
protected HTableInterface createTable() throws IOException {
protected Table createTable() throws IOException {
return connection.getTable(tableName);
}

View File

@ -87,7 +87,7 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
table = createTable();
}
protected HTableInterface createTable() throws IOException {
protected Table createTable() throws IOException {
return connection.getTable(tableName);
}

View File

@ -31,7 +31,7 @@ public class TestConnectionCache extends TestCase {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
/**
* test for ConnectionCache cleaning expired HConnection
* test for ConnectionCache cleaning expired Connection
*/
@Test
public void testConnectionChore() throws Exception {

View File

@ -19,8 +19,13 @@
package org.apache.hadoop.hbase.util;
import com.google.common.collect.Multimap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -33,7 +38,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -52,11 +56,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hbase.util.hbck.HbckTestingUtil.*;
import static org.junit.Assert.*;
@ -350,7 +349,7 @@ public class TestHBaseFsckTwoRS extends BaseTestHBaseFsck {
}
}
HBaseFsckRepair.closeRegionSilentlyAndWait((HConnection) connection,
HBaseFsckRepair.closeRegionSilentlyAndWait(connection,
cluster.getRegionServer(k).getServerName(), hbi.getHdfsHRI());
admin.offline(regionName);
break;

View File

@ -56,7 +56,7 @@ import scala.collection.mutable
*
* HBaseContext will take the responsibilities
* of disseminating the configuration information
* to the working and managing the life cycle of HConnections.
* to the working and managing the life cycle of Connections.
*/
class HBaseContext(@transient sc: SparkContext,
@transient val config: Configuration,
@ -88,14 +88,14 @@ class HBaseContext(@transient sc: SparkContext,
/**
* A simple enrichment of the traditional Spark RDD foreachPartition.
* This function differs from the original in that it offers the
* developer access to a already connected HConnection object
* developer access to a already connected Connection object
*
* Note: Do not close the HConnection object. All HConnection
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param rdd Original RDD with data to iterate over
* @param f Function to be given a iterator to iterate through
* the RDD values and a HConnection object to interact
* the RDD values and a Connection object to interact
* with HBase
*/
def foreachPartition[T](rdd: RDD[T],
@ -107,14 +107,14 @@ class HBaseContext(@transient sc: SparkContext,
/**
* A simple enrichment of the traditional Spark Streaming dStream foreach
* This function differs from the original in that it offers the
* developer access to a already connected HConnection object
* developer access to a already connected Connection object
*
* Note: Do not close the HConnection object. All HConnection
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param dstream Original DStream with data to iterate over
* @param f Function to be given a iterator to iterate through
* the DStream values and a HConnection object to
* the DStream values and a Connection object to
* interact with HBase
*/
def foreachPartition[T](dstream: DStream[T],
@ -127,14 +127,14 @@ class HBaseContext(@transient sc: SparkContext,
/**
* A simple enrichment of the traditional Spark RDD mapPartition.
* This function differs from the original in that it offers the
* developer access to a already connected HConnection object
* developer access to a already connected Connection object
*
* Note: Do not close the HConnection object. All HConnection
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param rdd Original RDD with data to iterate over
* @param mp Function to be given a iterator to iterate through
* the RDD values and a HConnection object to interact
* the RDD values and a Connection object to interact
* with HBase
* @return Returns a new RDD generated by the user definition
* function just like normal mapPartition
@ -153,9 +153,9 @@ class HBaseContext(@transient sc: SparkContext,
* foreachPartition.
*
* This function differs from the original in that it offers the
* developer access to a already connected HConnection object
* developer access to a already connected Connection object
*
* Note: Do not close the HConnection object. All HConnection
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issue when
@ -163,7 +163,7 @@ class HBaseContext(@transient sc: SparkContext,
*
* @param dstream Original DStream with data to iterate over
* @param f Function to be given a iterator to iterate through
* the DStream values and a HConnection object to
* the DStream values and a Connection object to
* interact with HBase
* @return Returns a new DStream generated by the user
* definition function just like normal mapPartition
@ -179,9 +179,9 @@ class HBaseContext(@transient sc: SparkContext,
* mapPartition.
*
* This function differs from the original in that it offers the
* developer access to a already connected HConnection object
* developer access to a already connected Connection object
*
* Note: Do not close the HConnection object. All HConnection
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issue when
@ -189,7 +189,7 @@ class HBaseContext(@transient sc: SparkContext,
*
* @param dstream Original DStream with data to iterate over
* @param f Function to be given a iterator to iterate through
* the DStream values and a HConnection object to
* the DStream values and a Connection object to
* interact with HBase
* @return Returns a new DStream generated by the user
* definition function just like normal mapPartition
@ -208,7 +208,7 @@ class HBaseContext(@transient sc: SparkContext,
*
* It allow addition support for a user to take RDD
* and generate puts and send them to HBase.
* The complexity of managing the HConnection is
* The complexity of managing the Connection is
* removed from the developer
*
* @param rdd Original RDD with data to iterate over
@ -253,7 +253,7 @@ class HBaseContext(@transient sc: SparkContext,
* It allow addition support for a user to take a DStream and
* generate puts and send them to HBase.
*
* The complexity of managing the HConnection is
* The complexity of managing the Connection is
* removed from the developer
*
* @param dstream Original DStream with data to iterate over
@ -274,7 +274,7 @@ class HBaseContext(@transient sc: SparkContext,
* A simple abstraction over the HBaseContext.foreachPartition method.
*
* It allow addition support for a user to take a RDD and generate delete
* and send them to HBase. The complexity of managing the HConnection is
* and send them to HBase. The complexity of managing the Connection is
* removed from the developer
*
* @param rdd Original RDD with data to iterate over
@ -294,7 +294,7 @@ class HBaseContext(@transient sc: SparkContext,
* It allow addition support for a user to take a DStream and
* generate Delete and send them to HBase.
*
* The complexity of managing the HConnection is
* The complexity of managing the Connection is
* removed from the developer
*
* @param dstream Original DStream with data to iterate over

View File

@ -43,14 +43,14 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
/**
* A simple enrichment of the traditional Spark javaRdd foreachPartition.
* This function differs from the original in that it offers the
* developer access to a already connected HConnection object
* developer access to a already connected Connection object
*
* Note: Do not close the HConnection object. All HConnection
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param javaRdd Original javaRdd with data to iterate over
* @param f Function to be given a iterator to iterate through
* the RDD values and a HConnection object to interact
* the RDD values and a Connection object to interact
* with HBase
*/
def foreachPartition[T](javaRdd: JavaRDD[T],
@ -65,14 +65,14 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
/**
* A simple enrichment of the traditional Spark Streaming dStream foreach
* This function differs from the original in that it offers the
* developer access to a already connected HConnection object
* developer access to a already connected Connection object
*
* Note: Do not close the HConnection object. All HConnection
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param javaDstream Original DStream with data to iterate over
* @param f Function to be given a iterator to iterate through
* the JavaDStream values and a HConnection object to
* the JavaDStream values and a Connection object to
* interact with HBase
*/
def foreachPartition[T](javaDstream: JavaDStream[T],
@ -84,9 +84,9 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
/**
* A simple enrichment of the traditional Spark JavaRDD mapPartition.
* This function differs from the original in that it offers the
* developer access to a already connected HConnection object
* developer access to a already connected Connection object
*
* Note: Do not close the HConnection object. All HConnection
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issue when
@ -94,7 +94,7 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
*
* @param javaRdd Original JavaRdd with data to iterate over
* @param f Function to be given a iterator to iterate through
* the RDD values and a HConnection object to interact
* the RDD values and a Connection object to interact
* with HBase
* @return Returns a new RDD generated by the user definition
* function just like normal mapPartition
@ -118,9 +118,9 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
* mapPartition.
*
* This function differs from the original in that it offers the
* developer access to a already connected HConnection object
* developer access to a already connected Connection object
*
* Note: Do not close the HConnection object. All HConnection
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issue when
@ -128,7 +128,7 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
*
* @param javaDstream Original JavaDStream with data to iterate over
* @param mp Function to be given a iterator to iterate through
* the JavaDStream values and a HConnection object to
* the JavaDStream values and a Connection object to
* interact with HBase
* @return Returns a new JavaDStream generated by the user
* definition function just like normal mapPartition
@ -146,7 +146,7 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
*
* It allow addition support for a user to take JavaRDD
* and generate puts and send them to HBase.
* The complexity of managing the HConnection is
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaRdd Original JavaRDD with data to iterate over
@ -167,7 +167,7 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
* It allow addition support for a user to take a JavaDStream and
* generate puts and send them to HBase.
*
* The complexity of managing the HConnection is
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaDstream Original DStream with data to iterate over
@ -189,7 +189,7 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
* It allow addition support for a user to take a JavaRDD and
* generate delete and send them to HBase.
*
* The complexity of managing the HConnection is
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaRdd Original JavaRDD with data to iterate over
@ -209,7 +209,7 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext,
* It allow addition support for a user to take a JavaDStream and
* generate Delete and send them to HBase.
*
* The complexity of managing the HConnection is
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaDStream Original DStream with data to iterate over