HBASE-21778 Remove the usage of the locateRegion related methods in ClusterConnection

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Duo Zhang 2019-02-01 16:40:34 +08:00 committed by zhangduo
parent 88adfa3278
commit 72d9f8747f
37 changed files with 390 additions and 508 deletions

View File

@ -136,7 +136,7 @@ class AsyncProcess {
// TODO: many of the fields should be made private
final long id;
final ClusterConnection connection;
final ConnectionImplementation connection;
private final RpcRetryingCallerFactory rpcCallerFactory;
final RpcControllerFactory rpcFactory;
@ -161,7 +161,7 @@ class AsyncProcess {
public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
private final int periodToLog;
AsyncProcess(ClusterConnection hc, Configuration conf,
AsyncProcess(ConnectionImplementation hc, Configuration conf,
RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
if (hc == null) {
throw new IllegalArgumentException("ClusterConnection cannot be null.");

View File

@ -140,7 +140,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
this.ap = ap;
}
BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
BufferedMutatorImpl(ConnectionImplementation conn, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
this(conn, params,
// puts need to track errors globally due to how the APIs currently work.

View File

@ -66,11 +66,11 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
private final Condition notFull = lock.newCondition();
public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
ConnectionImplementation connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool,
int replicaCallTimeoutMicroSecondScan) throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
replicaCallTimeoutMicroSecondScan);
replicaCallTimeoutMicroSecondScan);
}
@VisibleForTesting

View File

@ -68,7 +68,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Keep lastResult returned successfully in case we have to reset scanner.
protected Result lastResult = null;
protected final long maxScannerResultSize;
private final ClusterConnection connection;
private final ConnectionImplementation connection;
protected final TableName tableName;
protected final int scannerTimeout;
protected RpcRetryingCaller<Result[]> caller;
@ -93,7 +93,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
* @throws IOException
*/
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
ConnectionImplementation connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
if (LOG.isTraceEnabled()) {
@ -137,7 +137,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
initCache();
}
protected ClusterConnection getConnection() {
protected ConnectionImplementation getConnection() {
return this.connection;
}

View File

@ -34,9 +34,9 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
public abstract class ClientServiceCallable<T> extends
RegionServerCallable<T, ClientProtos.ClientService.BlockingInterface> {
public ClientServiceCallable(Connection connection, TableName tableName, byte [] row,
public ClientServiceCallable(Connection connection, TableName tableName, byte[] row,
RpcController rpcController, int priority) {
super(connection, tableName, row, rpcController, priority);
super((ConnectionImplementation) connection, tableName, row, rpcController, priority);
}
@Override

View File

@ -37,11 +37,11 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@InterfaceAudience.Private
public class ClientSimpleScanner extends ClientScanner {
public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
ConnectionImplementation connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool,
int replicaCallTimeoutMicroSecondScan) throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
replicaCallTimeoutMicroSecondScan);
replicaCallTimeoutMicroSecondScan);
}
@Override

View File

@ -18,11 +18,8 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@ -92,135 +89,6 @@ public interface ClusterConnection extends Connection {
*/
TableState getTableState(TableName tableName) throws IOException;
/**
* Find the location of the region of <i>tableName</i> that <i>row</i>
* lives in.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation locateRegion(final TableName tableName,
final byte [] row) throws IOException;
/**
* @deprecated {@link #clearRegionLocationCache()} instead.
*/
@Deprecated
default void clearRegionCache() {
clearRegionLocationCache();
}
void cacheLocation(final TableName tableName, final RegionLocations location);
/**
* Allows flushing the region cache of all locations that pertain to
* <code>tableName</code>
* @param tableName Name of the table whose regions we are to remove from
* cache.
*/
void clearRegionCache(final TableName tableName);
/**
* Deletes cached locations for the specific region.
* @param location The location object for the region, to be purged from cache.
*/
void deleteCachedRegionLocation(final HRegionLocation location);
/**
* Find the location of the region of <i>tableName</i> that <i>row</i>
* lives in, ignoring any value that might be in the cache.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation relocateRegion(final TableName tableName,
final byte [] row) throws IOException;
/**
* Find the location of the region of <i>tableName</i> that <i>row</i>
* lives in, ignoring any value that might be in the cache.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @param replicaId the replicaId of the region
* @return RegionLocations that describe where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
RegionLocations relocateRegion(final TableName tableName,
final byte [] row, int replicaId) throws IOException;
/**
* Update the location cache. This is used internally by HBase, in most cases it should not be
* used by the client application.
* @param tableName the table name
* @param regionName the region name
* @param rowkey the row
* @param exception the exception if any. Can be null.
* @param source the previous location
*/
void updateCachedLocations(TableName tableName, byte[] regionName, byte[] rowkey,
Object exception, ServerName source);
/**
* Gets the location of the region of <i>regionName</i>.
* @param regionName name of the region to locate
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation locateRegion(final byte[] regionName)
throws IOException;
/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
* @param tableName table to get regions of
* @return list of region locations for all regions of table
* @throws IOException if IO failure occurs
*/
List<HRegionLocation> locateRegions(final TableName tableName) throws IOException;
/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
* @param tableName table to get regions of
* @param useCache Should we use the cache to retrieve the region information.
* @param offlined True if we are to include offlined regions, false and we'll leave out offlined
* regions from returned list.
* @return list of region locations for all regions of table
* @throws IOException if IO failure occurs
*/
List<HRegionLocation> locateRegions(final TableName tableName,
final boolean useCache,
final boolean offlined) throws IOException;
/**
*
* @param tableName table to get regions of
* @param row the row
* @param useCache Should we use the cache to retrieve the region information.
* @param retry do we retry
* @return region locations for this row.
* @throws IOException if IO failure occurs
*/
RegionLocations locateRegion(TableName tableName,
byte[] row, boolean useCache, boolean retry) throws IOException;
/**
*
* @param tableName table to get regions of
* @param row the row
* @param useCache Should we use the cache to retrieve the region information.
* @param retry do we retry
* @param replicaId the replicaId for the region
* @return region locations for this row.
* @throws IOException if IO failure occurs
*/
RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry,
int replicaId) throws IOException;
/**
* Returns a {@link MasterKeepAliveConnection} to the active master
*/
@ -250,23 +118,6 @@ public interface ClusterConnection extends Connection {
*/
ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException;
/**
* Find region location hosting passed row
* @param tableName table name
* @param row Row to find.
* @param reload If true do not use cache, otherwise bypass.
* @return Location of row.
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation getRegionLocation(TableName tableName, byte[] row, boolean reload)
throws IOException;
/**
* Clear any caches that pertain to server name <code>sn</code>.
* @param sn A server name
*/
void clearCaches(final ServerName sn);
/**
* @return Nonce generator for this ClusterConnection; may be null if disabled in configuration.
*/

View File

@ -615,9 +615,16 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return true;
}
@Override
public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
boolean reload) throws IOException {
/**
* Find region location hosting passed row
* @param tableName table name
* @param row Row to find.
* @param reload If true do not use cache, otherwise bypass.
* @return Location of row.
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation getRegionLocation(final TableName tableName, final byte[] row, boolean reload)
throws IOException {
return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
}
@ -687,13 +694,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
}
@Override
public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
RegionInfo.getStartKey(regionName), false, true);
return locations == null ? null : locations.getRegionLocation();
}
private boolean isDeadServer(ServerName sn) {
if (clusterStatusListener == null) {
return false;
@ -702,13 +702,26 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
}
@Override
public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
* @param tableName table to get regions of
* @return list of region locations for all regions of table
* @throws IOException if IO failure occurs
*/
List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
return locateRegions(tableName, false, true);
}
@Override
public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
* @param tableName table to get regions of
* @param useCache Should we use the cache to retrieve the region information.
* @param offlined True if we are to include offlined regions, false and we'll leave out offlined
* regions from returned list.
* @return list of region locations for all regions of table
* @throws IOException if IO failure occurs
*/
List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
boolean offlined) throws IOException {
List<RegionInfo> regions;
if (TableName.isMetaTableName(tableName)) {
@ -733,24 +746,44 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return locations;
}
@Override
public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
throws IOException {
/**
* Find the location of the region of <i>tableName</i> that <i>row</i> lives in.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the region in question
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation locateRegion(final TableName tableName, final byte[] row) throws IOException {
RegionLocations locations = locateRegion(tableName, row, true, true);
return locations == null ? null : locations.getRegionLocation();
}
@Override
public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
throws IOException {
/**
* Find the location of the region of <i>tableName</i> that <i>row</i> lives in, ignoring any
* value that might be in the cache.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the region in question
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation relocateRegion(final TableName tableName, final byte[] row) throws IOException {
RegionLocations locations =
relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
return locations == null ? null
: locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
@Override
public RegionLocations relocateRegion(final TableName tableName,
/**
* Find the location of the region of <i>tableName</i> that <i>row</i>
* lives in, ignoring any value that might be in the cache.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @param replicaId the replicaId of the region
* @return RegionLocations that describe where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
RegionLocations relocateRegion(final TableName tableName,
final byte [] row, int replicaId) throws IOException{
// Since this is an explicit request not to use any caching, finding
// disabled tables should not be desirable. This will ensure that an exception is thrown when
@ -762,14 +795,30 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return locateRegion(tableName, row, false, true, replicaId);
}
@Override
public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
/**
* @param tableName table to get regions of
* @param row the row
* @param useCache Should we use the cache to retrieve the region information.
* @param retry do we retry
* @return region locations for this row.
* @throws IOException if IO failure occurs
*/
RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
boolean retry) throws IOException {
return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
@Override
public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
/**
*
* @param tableName table to get regions of
* @param row the row
* @param useCache Should we use the cache to retrieve the region information.
* @param retry do we retry
* @param replicaId the replicaId for the region
* @return region locations for this row.
* @throws IOException if IO failure occurs
*/
RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
boolean retry, int replicaId) throws IOException {
checkClosed();
if (tableName == null || tableName.getName().length == 0) {
@ -973,8 +1022,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* @param tableName The table name.
* @param location the new location
*/
@Override
public void cacheLocation(final TableName tableName, final RegionLocations location) {
void cacheLocation(final TableName tableName, final RegionLocations location) {
metaCache.cacheLocation(tableName, location);
}
@ -988,15 +1036,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return metaCache.getCachedLocation(tableName, row);
}
public void clearRegionCache(final TableName tableName, byte[] row) {
void clearRegionCache(final TableName tableName, byte[] row) {
metaCache.clearCache(tableName, row);
}
/*
* Delete all cached entries of a table that maps to a specific location.
/**
* Clear any caches that pertain to server name <code>sn</code>.
* @param sn A server name
*/
@Override
public void clearCaches(final ServerName serverName) {
void clearCaches(final ServerName serverName) {
metaCache.clearCache(serverName);
}
@ -1005,8 +1053,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
metaCache.clearCache();
}
@Override
public void clearRegionCache(final TableName tableName) {
/**
* Allows flushing the region cache of all locations that pertain to <code>tableName</code>
* @param tableName Name of the table whose regions we are to remove from cache.
*/
void clearRegionCache(final TableName tableName) {
metaCache.clearCache(tableName);
}
@ -1876,8 +1927,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
cacheLocation(hri.getTable(), source, newHrl);
}
@Override
public void deleteCachedRegionLocation(final HRegionLocation location) {
void deleteCachedRegionLocation(final HRegionLocation location) {
metaCache.clearCache(location);
}
@ -1889,8 +1939,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* or wrapped or both RegionMovedException
* @param source server that is the source of the location update.
*/
@Override
public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
final Object exception, final ServerName source) {
if (rowkey == null || tableName == null) {
LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@ -28,6 +27,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
@ -42,7 +42,7 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
private final boolean writeFlushWalMarker;
private boolean reload;
public FlushRegionCallable(ClusterConnection connection,
public FlushRegionCallable(ConnectionImplementation connection,
RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] regionName,
byte[] regionStartKey, boolean writeFlushWalMarker) {
super(connection, rpcControllerFactory, tableName, regionStartKey);
@ -50,7 +50,7 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
this.writeFlushWalMarker = writeFlushWalMarker;
}
public FlushRegionCallable(ClusterConnection connection,
public FlushRegionCallable(ConnectionImplementation connection,
RpcControllerFactory rpcControllerFactory, RegionInfo regionInfo,
boolean writeFlushWalMarker) {
this(connection, rpcControllerFactory, regionInfo.getTable(), regionInfo.getRegionName(),

View File

@ -245,7 +245,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
public class HBaseAdmin implements Admin {
private static final Logger LOG = LoggerFactory.getLogger(HBaseAdmin.class);
private ClusterConnection connection;
private ConnectionImplementation connection;
private final Configuration conf;
private final long pause;
@ -271,7 +271,7 @@ public class HBaseAdmin implements Admin {
return syncWaitTimeout;
}
HBaseAdmin(ClusterConnection connection) throws IOException {
HBaseAdmin(ConnectionImplementation connection) throws IOException {
this.conf = connection.getConfiguration();
this.connection = connection;
@ -644,7 +644,9 @@ public class HBaseAdmin implements Admin {
protected Void postOperationResult(final Void result, final long deadlineTs)
throws IOException, TimeoutException {
// Delete cached information to prevent clients from using old locations
((ClusterConnection) getAdmin().getConnection()).clearRegionCache(getTableName());
try (RegionLocator locator = getAdmin().getConnection().getRegionLocator(getTableName())) {
locator.clearRegionLocationCache();
}
return super.postOperationResult(result, deadlineTs);
}
}

View File

@ -102,7 +102,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
public class HTable implements Table {
private static final Logger LOG = LoggerFactory.getLogger(HTable.class);
private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
private final ClusterConnection connection;
private final ConnectionImplementation connection;
private final TableName tableName;
private final Configuration configuration;
private final ConnectionConfiguration connConfiguration;

View File

@ -46,25 +46,25 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
protected final RpcControllerFactory rpcControllerFactory;
private HBaseRpcController controller = null;
protected final ClusterConnection connection;
protected final ConnectionImplementation connection;
protected HRegionLocation location;
protected final TableName tableName;
protected final byte[] row;
protected final int replicaId;
public RegionAdminServiceCallable(ClusterConnection connection,
public RegionAdminServiceCallable(ConnectionImplementation connection,
RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] row) {
this(connection, rpcControllerFactory, null, tableName, row);
}
public RegionAdminServiceCallable(ClusterConnection connection,
public RegionAdminServiceCallable(ConnectionImplementation connection,
RpcControllerFactory rpcControllerFactory, HRegionLocation location,
TableName tableName, byte[] row) {
this(connection, rpcControllerFactory, location,
tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
public RegionAdminServiceCallable(ClusterConnection connection,
public RegionAdminServiceCallable(ConnectionImplementation connection,
RpcControllerFactory rpcControllerFactory, HRegionLocation location,
TableName tableName, byte[] row, int replicaId) {
this.connection = connection;
@ -138,8 +138,8 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
return ConnectionUtils.getPauseTime(pause, tries);
}
public static RegionLocations getRegionLocations(
ClusterConnection connection, TableName tableName, byte[] row,
private static RegionLocations getRegionLocations(
ConnectionImplementation connection, TableName tableName, byte[] row,
boolean useCache, int replicaId)
throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
RegionLocations rl;

View File

@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.util.Bytes;
// Public but should be package private only it is used by MetaTableAccessor. FIX!!
@InterfaceAudience.Private
public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> {
private final Connection connection;
private final ConnectionImplementation connection;
private final TableName tableName;
private final byte[] row;
/**
@ -74,12 +74,12 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
* @param tableName Table name to which <code>row</code> belongs.
* @param row The row we want in <code>tableName</code>.
*/
public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
public RegionServerCallable(ConnectionImplementation connection, TableName tableName, byte [] row,
RpcController rpcController) {
this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS);
}
public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
public RegionServerCallable(ConnectionImplementation connection, TableName tableName, byte [] row,
RpcController rpcController, int priority) {
super();
this.connection = connection;
@ -161,8 +161,8 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
/**
* @return {@link ClusterConnection} instance used by this Callable.
*/
protected ClusterConnection getConnection() {
return (ClusterConnection) this.connection;
protected ConnectionImplementation getConnection() {
return this.connection;
}
protected HRegionLocation getLocation() {

View File

@ -46,11 +46,11 @@ public class ReversedClientScanner extends ClientScanner {
* @throws IOException
*/
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
ConnectionImplementation connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
primaryOperationTimeout);
}
@Override

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
import java.io.IOException;
@ -29,7 +28,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@ -60,7 +58,7 @@ public class RpcRetryingCallerWithReadReplicas {
LoggerFactory.getLogger(RpcRetryingCallerWithReadReplicas.class);
protected final ExecutorService pool;
protected final ClusterConnection cConnection;
protected final ConnectionImplementation cConnection;
protected final Configuration conf;
protected final Get get;
protected final TableName tableName;
@ -73,7 +71,7 @@ public class RpcRetryingCallerWithReadReplicas {
public RpcRetryingCallerWithReadReplicas(
RpcControllerFactory rpcControllerFactory, TableName tableName,
ClusterConnection cConnection, final Get get,
ConnectionImplementation cConnection, final Get get,
ExecutorService pool, int retries, int operationTimeout, int rpcTimeout,
int timeBeforeReplicas) {
this.rpcControllerFactory = rpcControllerFactory;
@ -187,19 +185,14 @@ public class RpcRetryingCallerWithReadReplicas {
} else {
// We cannot get the primary replica location, it is possible that the region
// server hosting meta is down, it needs to proceed to try cached replicas.
if (cConnection instanceof ConnectionImplementation) {
rl = ((ConnectionImplementation)cConnection).getCachedLocation(tableName, get.getRow());
if (rl == null) {
// No cached locations
throw e;
}
// Primary replica location is not known, skip primary replica
skipPrimary = true;
} else {
// For completeness
rl = cConnection.getCachedLocation(tableName, get.getRow());
if (rl == null) {
// No cached locations
throw e;
}
// Primary replica location is not known, skip primary replica
skipPrimary = true;
}
}
@ -318,9 +311,8 @@ public class RpcRetryingCallerWithReadReplicas {
}
static RegionLocations getRegionLocations(boolean useCache, int replicaId,
ClusterConnection cConnection, TableName tableName, byte[] row)
ConnectionImplementation cConnection, TableName tableName, byte[] row)
throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
RegionLocations rl;
try {
if (useCache) {

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.client;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
@ -31,16 +29,17 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* This class has the logic for handling scanners for regions with and without replicas.
@ -59,7 +58,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
volatile ScannerCallable currentScannerCallable;
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
final ClusterConnection cConnection;
final ConnectionImplementation cConnection;
protected final ExecutorService pool;
protected final int timeBeforeReplicas;
private final Scan scan;
@ -73,7 +72,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private boolean someRPCcancelled = false; //required for testing purposes only
private int regionReplication = 0;
public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
public ScannerCallableWithReplicas(TableName tableName, ConnectionImplementation cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
int retries, int scannerTimeout, int caching, Configuration conf,
RpcRetryingCaller<Result []> caller) {
@ -150,19 +149,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
RegionLocations rl = null;
try {
rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
currentScannerCallable.getRow());
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
currentScannerCallable.getRow());
} catch (RetriesExhaustedException | DoNotRetryIOException e) {
// We cannot get the primary replica region location, it is possible that the region server
// hosting meta table is down, it needs to proceed to try cached replicas directly.
if (cConnection instanceof ConnectionImplementation) {
rl = ((ConnectionImplementation) cConnection)
.getCachedLocation(tableName, currentScannerCallable.getRow());
if (rl == null) {
throw e;
}
} else {
// For completeness
rl = cConnection.getCachedLocation(tableName, currentScannerCallable.getRow());
if (rl == null) {
throw e;
}
}

View File

@ -175,17 +175,17 @@ public class TestAsyncProcess {
return r;
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
super(hc, conf,
new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
public MyAsyncProcess(ConnectionImplementation hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
service = Executors.newFixedThreadPool(5);
this.conf = conf;
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
public MyAsyncProcess(ConnectionImplementation hc, Configuration conf,
AtomicInteger nbThreads) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), new CountingThreadFactory(nbThreads));
service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
new CountingThreadFactory(nbThreads));
}
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
@ -326,7 +326,8 @@ public class TestAsyncProcess {
private final IOException ioe;
public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
public AsyncProcessWithFailure(ConnectionImplementation hc, Configuration conf,
IOException ioe) {
super(hc, conf);
this.ioe = ioe;
serverTrackerTimeout = 1L;
@ -376,7 +377,7 @@ public class TestAsyncProcess {
customPrimarySleepMs.put(server, primaryMs);
}
public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
public MyAsyncProcessWithReplicas(ConnectionImplementation hc, Configuration conf) {
super(hc, conf);
}
@ -622,7 +623,7 @@ public class TestAsyncProcess {
}
private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
final String defaultClazz =
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(
@ -718,7 +719,7 @@ public class TestAsyncProcess {
@Test
public void testSubmit() throws Exception {
ClusterConnection hc = createHConnection();
ConnectionImplementation hc = createConnectionImpl();
MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
List<Put> puts = new ArrayList<>(1);
@ -730,7 +731,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitWithCB() throws Exception {
ClusterConnection hc = createHConnection();
ConnectionImplementation hc = createConnectionImpl();
final AtomicInteger updateCalled = new AtomicInteger(0);
Batch.Callback<Object> cb = new Batch.Callback<Object>() {
@Override
@ -751,7 +752,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitBusyRegion() throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
final String defaultClazz =
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
@ -779,7 +780,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitBusyRegionServer() throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
final String defaultClazz =
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
@ -810,7 +811,7 @@ public class TestAsyncProcess {
@Test
public void testFail() throws Exception {
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
MyAsyncProcess ap = new MyAsyncProcess(createConnectionImpl(), CONF);
List<Put> puts = new ArrayList<>(1);
Put p = createPut(1, false);
@ -836,7 +837,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitTrue() throws IOException {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
final String defaultClazz =
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
@ -885,7 +886,7 @@ public class TestAsyncProcess {
@Test
public void testFailAndSuccess() throws Exception {
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
MyAsyncProcess ap = new MyAsyncProcess(createConnectionImpl(), CONF);
List<Put> puts = new ArrayList<>(3);
puts.add(createPut(1, false));
@ -912,7 +913,7 @@ public class TestAsyncProcess {
@Test
public void testFlush() throws Exception {
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
MyAsyncProcess ap = new MyAsyncProcess(createConnectionImpl(), CONF);
List<Put> puts = new ArrayList<>(3);
puts.add(createPut(1, false));
@ -929,7 +930,7 @@ public class TestAsyncProcess {
@Test
public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
ClusterConnection hc = createHConnection();
ConnectionImplementation hc = createConnectionImpl();
MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
testTaskCount(ap);
}
@ -939,7 +940,7 @@ public class TestAsyncProcess {
Configuration copyConf = new Configuration(CONF);
copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
Mockito.when(conn.getBackoffPolicy()).thenReturn(bp);
@ -979,7 +980,7 @@ public class TestAsyncProcess {
@Test
public void testMaxTask() throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
final String defaultClazz =
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
@ -1038,8 +1039,8 @@ public class TestAsyncProcess {
}
}
private ClusterConnection createHConnection() throws IOException {
ClusterConnection hc = createHConnectionCommon();
private ConnectionImplementation createConnectionImpl() throws IOException {
ConnectionImplementation hc = createConnectionImplCommon();
setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
@ -1049,8 +1050,8 @@ public class TestAsyncProcess {
return hc;
}
private ClusterConnection createHConnectionWithReplicas() throws IOException {
ClusterConnection hc = createHConnectionCommon();
private ConnectionImplementation createConnectionImplWithReplicas() throws IOException {
ConnectionImplementation hc = createConnectionImplCommon();
setMockLocation(hc, DUMMY_BYTES_1, hrls1);
setMockLocation(hc, DUMMY_BYTES_2, hrls2);
setMockLocation(hc, DUMMY_BYTES_3, hrls3);
@ -1069,16 +1070,16 @@ public class TestAsyncProcess {
return hc;
}
private static void setMockLocation(ClusterConnection hc, byte[] row,
private static void setMockLocation(ConnectionImplementation hc, byte[] row,
RegionLocations result) throws IOException {
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result);
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
Mockito.anyBoolean())).thenReturn(result);
}
private ClusterConnection createHConnectionCommon() {
ClusterConnection hc = Mockito.mock(ClusterConnection.class);
private ConnectionImplementation createConnectionImplCommon() {
ConnectionImplementation hc = Mockito.mock(ConnectionImplementation.class);
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
@ -1089,7 +1090,7 @@ public class TestAsyncProcess {
@Test
public void testHTablePutSuccess() throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
@ -1106,7 +1107,7 @@ public class TestAsyncProcess {
@Test
public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
checkPeriodicFlushParameters(conn, ap,
@ -1152,7 +1153,7 @@ public class TestAsyncProcess {
@Test
public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
@ -1219,7 +1220,7 @@ public class TestAsyncProcess {
@Test
public void testBufferedMutatorImplWithSharedPool() throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
@ -1230,7 +1231,7 @@ public class TestAsyncProcess {
@Test
public void testFailedPutAndNewPut() throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE)
.writeBufferSize(0);
@ -1275,7 +1276,7 @@ public class TestAsyncProcess {
@Test
public void testBatch() throws IOException, InterruptedException {
ClusterConnection conn = new MyConnectionImpl(CONF);
ConnectionImplementation conn = new MyConnectionImpl(CONF);
HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
ht.multiAp = new MyAsyncProcess(conn, CONF);
@ -1306,7 +1307,7 @@ public class TestAsyncProcess {
@Test
public void testErrorsServers() throws IOException {
Configuration configuration = new Configuration(CONF);
ClusterConnection conn = new MyConnectionImpl(configuration);
ConnectionImplementation conn = new MyConnectionImpl(configuration);
MyAsyncProcess ap = new MyAsyncProcess(conn, configuration);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
@ -1337,7 +1338,7 @@ public class TestAsyncProcess {
Configuration copyConf = new Configuration(CONF);
copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
ClusterConnection conn = new MyConnectionImpl(copyConf);
ConnectionImplementation conn = new MyConnectionImpl(copyConf);
MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
ht.multiAp = ap;
@ -1370,7 +1371,7 @@ public class TestAsyncProcess {
@Test
public void testErrors() throws IOException {
ClusterConnection conn = new MyConnectionImpl(CONF);
ConnectionImplementation conn = new MyConnectionImpl(CONF);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
@ -1394,7 +1395,7 @@ public class TestAsyncProcess {
@Test
public void testCallQueueTooLarge() throws IOException {
ClusterConnection conn = new MyConnectionImpl(CONF);
ConnectionImplementation conn = new MyConnectionImpl(CONF);
AsyncProcessWithFailure ap =
new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException());
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
@ -1609,7 +1610,7 @@ public class TestAsyncProcess {
// TODO: this is kind of timing dependent... perhaps it should detect from createCaller
// that the replica call has happened and that way control the ordering.
Configuration conf = new Configuration();
ClusterConnection conn = createHConnectionWithReplicas();
ConnectionImplementation conn = createConnectionImplWithReplicas();
conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
if (retries >= 0) {
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
@ -1707,16 +1708,15 @@ public class TestAsyncProcess {
}
static class AsyncProcessForThrowableCheck extends AsyncProcess {
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(
conf));
public AsyncProcessForThrowableCheck(ConnectionImplementation hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
}
}
@Test
public void testUncheckedException() throws Exception {
// Test the case pool.submit throws unchecked exception
ClusterConnection hc = createHConnection();
ConnectionImplementation hc = createConnectionImpl();
MyThreadPoolExecutor myPool =
new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200));
@ -1748,7 +1748,7 @@ public class TestAsyncProcess {
final int retries = 1;
myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause);
myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
ClusterConnection conn = new MyConnectionImpl(myConf);
ConnectionImplementation conn = new MyConnectionImpl(myConf);
AsyncProcessWithFailure ap =
new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
@ -1807,7 +1807,7 @@ public class TestAsyncProcess {
@Test
public void testRetryWithExceptionClearsMetaCache() throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
Configuration myConf = conn.getConfiguration();
myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
@ -1840,7 +1840,7 @@ public class TestAsyncProcess {
@Test
public void testQueueRowAccess() throws Exception {
ClusterConnection conn = createHConnection();
ConnectionImplementation conn = createConnectionImpl();
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);

View File

@ -175,8 +175,8 @@ public class TestAsyncProcessWithRegionException {
}
}
private static ClusterConnection createHConnection() throws IOException {
ClusterConnection hc = Mockito.mock(ClusterConnection.class);
private static ConnectionImplementation createHConnection() throws IOException {
ConnectionImplementation hc = Mockito.mock(ConnectionImplementation.class);
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
@ -190,8 +190,8 @@ public class TestAsyncProcessWithRegionException {
return hc;
}
private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result)
throws IOException {
private static void setMockLocation(ConnectionImplementation hc, byte[] row,
RegionLocations result) throws IOException {
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
@ -201,7 +201,7 @@ public class TestAsyncProcessWithRegionException {
private static class MyAsyncProcess extends AsyncProcess {
private final ExecutorService service = Executors.newFixedThreadPool(5);
MyAsyncProcess(ClusterConnection hc, Configuration conf) {
MyAsyncProcess(ConnectionImplementation hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
}

View File

@ -48,7 +48,7 @@ public class TestBufferedMutator {
* Just to prove that I can insert a BM other than default.
*/
public static class MyBufferedMutator extends BufferedMutatorImpl {
MyBufferedMutator(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
MyBufferedMutator(ConnectionImplementation conn, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
super(conn, rpcCallerFactory, rpcFactory, params);
}

View File

@ -71,7 +71,7 @@ public class TestClientScanner {
ExecutorService pool;
Configuration conf;
ClusterConnection clusterConn;
ConnectionImplementation clusterConn;
RpcRetryingCallerFactory rpcFactory;
RpcControllerFactory controllerFactory;
@ -80,7 +80,7 @@ public class TestClientScanner {
@Before
public void setup() throws IOException {
clusterConn = Mockito.mock(ClusterConnection.class);
clusterConn = Mockito.mock(ConnectionImplementation.class);
rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
controllerFactory = Mockito.mock(RpcControllerFactory.class);
pool = Executors.newSingleThreadExecutor();
@ -103,11 +103,11 @@ public class TestClientScanner {
private boolean initialized = false;
public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
ConnectionImplementation connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
primaryOperationTimeout);
}
@Override

View File

@ -45,7 +45,7 @@ public class TestReversedScannerCallable {
HBaseClassTestRule.forClass(TestReversedScannerCallable.class);
@Mock
private ClusterConnection connection;
private ConnectionImplementation connection;
@Mock
private Scan scan;
@Mock

View File

@ -31,13 +31,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestIngest;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
@ -52,6 +51,7 @@ import org.junit.Assert;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
@ -350,10 +350,11 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr
numReadFailures.addAndGet(1); // fail the test
for (Result r : results) {
LOG.error("FAILED FOR " + r);
RegionLocations rl = ((ClusterConnection)connection).
locateRegion(tableName, r.getRow(), true, true);
HRegionLocation locations[] = rl.getRegionLocations();
for (HRegionLocation h : locations) {
List<HRegionLocation> locs;
try (RegionLocator locator = connection.getRegionLocator(tableName)) {
locs = locator.getRegionLocations(r.getRow());
}
for (HRegionLocation h : locs) {
LOG.error("LOCATION " + h);
}
}

View File

@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -1948,33 +1949,31 @@ public class HBaseFsck extends Configured implements Closeable {
* Record the location of the hbase:meta region as found in ZooKeeper.
*/
private boolean recordMetaRegion() throws IOException {
RegionLocations rl = connection.locateRegion(TableName.META_TABLE_NAME,
HConstants.EMPTY_START_ROW, false, false);
if (rl == null) {
errors.reportError(ERROR_CODE.NULL_META_REGION,
"META region was not found in ZooKeeper");
List<HRegionLocation> locs;
try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
locs = locator.getRegionLocations(HConstants.EMPTY_START_ROW, true);
}
if (locs == null || locs.isEmpty()) {
errors.reportError(ERROR_CODE.NULL_META_REGION, "META region was not found in ZooKeeper");
return false;
}
for (HRegionLocation metaLocation : rl.getRegionLocations()) {
for (HRegionLocation metaLocation : locs) {
// Check if Meta region is valid and existing
if (metaLocation == null ) {
errors.reportError(ERROR_CODE.NULL_META_REGION,
"META region location is null");
if (metaLocation == null) {
errors.reportError(ERROR_CODE.NULL_META_REGION, "META region location is null");
return false;
}
if (metaLocation.getRegion() == null) {
errors.reportError(ERROR_CODE.NULL_META_REGION,
"META location regionInfo is null");
errors.reportError(ERROR_CODE.NULL_META_REGION, "META location regionInfo is null");
return false;
}
if (metaLocation.getHostname() == null) {
errors.reportError(ERROR_CODE.NULL_META_REGION,
"META location hostName is null");
errors.reportError(ERROR_CODE.NULL_META_REGION, "META location hostName is null");
return false;
}
ServerName sn = metaLocation.getServerName();
MetaEntry m = new MetaEntry(metaLocation.getRegion(), sn,
EnvironmentEdgeManager.currentTime());
MetaEntry m =
new MetaEntry(metaLocation.getRegion(), sn, EnvironmentEdgeManager.currentTime());
HbckInfo hbckInfo = regionInfoMap.get(metaLocation.getRegion().getEncodedName());
if (hbckInfo == null) {
regionInfoMap.put(metaLocation.getRegion().getEncodedName(), new HbckInfo(m));

View File

@ -44,23 +44,22 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.NoServerForRegionException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@ -708,7 +707,7 @@ public class RegionSplitter {
Path tableDir = tableDirAndSplitFile.getFirst();
FileSystem fs = tableDir.getFileSystem(connection.getConfiguration());
// Clear the cache to forcibly refresh region information
((ClusterConnection)connection).clearRegionLocationCache();
connection.clearRegionLocationCache();
TableDescriptor htd = null;
try (Table table = connection.getTable(tableName)) {
htd = table.getDescriptor();
@ -769,7 +768,7 @@ public class RegionSplitter {
} catch (NoServerForRegionException nsfre) {
LOG.debug("No Server Exception thrown for: " + splitAlgo.rowToStr(start));
physicalSplitting.add(region);
((ClusterConnection)connection).clearRegionLocationCache();
connection.clearRegionLocationCache();
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
@ -39,25 +38,25 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
* {@link ClusterConnection} testing utility.
* {@link ConnectionImplementation} testing utility.
*/
public class HConnectionTestingUtility {
/*
* Not part of {@link HBaseTestingUtility} because this class is not
* in same package as {@link ClusterConnection}. Would have to reveal ugly
* {@link ConnectionImplementation} innards to HBaseTestingUtility to give it access.
* Not part of {@link HBaseTestingUtility} because this class is not in same package as {@link
* ConnectionImplementation}. Would have to reveal ugly {@link ConnectionImplementation} innards
* to HBaseTestingUtility to give it access.
*/
/**
* Get a Mocked {@link ClusterConnection} that goes with the passed <code>conf</code>
* configuration instance. Minimally the mock will return
* &lt;code>conf&lt;/conf> when {@link ClusterConnection#getConfiguration()} is invoked.
* Be sure to shutdown the connection when done by calling
* {@link Connection#close()} else it will stick around; this is probably not what you want.
* Get a Mocked {@link ConnectionImplementation} that goes with the passed <code>conf</code>
* configuration instance. Minimally the mock will return &lt;code>conf&lt;/conf> when
* {@link ConnectionImplementation#getConfiguration()} is invoked. Be sure to shutdown the
* connection when done by calling {@link Connection#close()} else it will stick around; this is
* probably not what you want.
* @param conf configuration
* @return ClusterConnection object for <code>conf</code>
* @return ConnectionImplementation object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
public static ClusterConnection getMockedConnection(final Configuration conf)
public static ConnectionImplementation getMockedConnection(final Configuration conf)
throws ZooKeeperConnectionException {
ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class);
Mockito.when(connection.getConfiguration()).thenReturn(conf);
@ -70,37 +69,30 @@ public class HConnectionTestingUtility {
}
/**
* Calls {@link #getMockedConnection(Configuration)} and then mocks a few
* more of the popular {@link ClusterConnection} methods so they do 'normal'
* operation (see return doc below for list). Be sure to shutdown the
* connection when done by calling {@link Connection#close()} else it will stick around;
* this is probably not what you want.
*
* Calls {@link #getMockedConnection(Configuration)} and then mocks a few more of the popular
* {@link ConnectionImplementation} methods so they do 'normal' operation (see return doc below
* for list). Be sure to shutdown the connection when done by calling {@link Connection#close()}
* else it will stick around; this is probably not what you want.
* @param conf Configuration to use
* @param admin An AdminProtocol; can be null but is usually
* itself a mock.
* @param client A ClientProtocol; can be null but is usually
* itself a mock.
* @param sn ServerName to include in the region location returned by this
* <code>connection</code>
* @param hri RegionInfo to include in the location returned when
* getRegionLocator is called on the mocked connection
* @param admin An AdminProtocol; can be null but is usually itself a mock.
* @param client A ClientProtocol; can be null but is usually itself a mock.
* @param sn ServerName to include in the region location returned by this <code>connection</code>
* @param hri RegionInfo to include in the location returned when getRegionLocator is called on
* the mocked connection
* @return Mock up a connection that returns a {@link Configuration} when
* {@link ClusterConnection#getConfiguration()} is called, a 'location' when
* {@link ClusterConnection#getRegionLocation(org.apache.hadoop.hbase.TableName, byte[], boolean)}
* is called,
* and that returns the passed {@link AdminProtos.AdminService.BlockingInterface} instance when
* {@link ClusterConnection#getAdmin(ServerName)} is called, returns the passed
* {@link ClientProtos.ClientService.BlockingInterface} instance when
* {@link ClusterConnection#getClient(ServerName)} is called (Be sure to call
* {@link Connection#close()} when done with this mocked Connection.
* @throws IOException
* {@link ConnectionImplementation#getConfiguration()} is called, a 'location' when
* {@link ConnectionImplementation#getRegionLocation(TableName,byte[], boolean)}
* is called, and that returns the passed
* {@link AdminProtos.AdminService.BlockingInterface} instance when
* {@link ConnectionImplementation#getAdmin(ServerName)} is called, returns the passed
* {@link ClientProtos.ClientService.BlockingInterface} instance when
* {@link ConnectionImplementation#getClient(ServerName)} is called (Be sure to call
* {@link Connection#close()} when done with this mocked Connection.
*/
public static ClusterConnection getMockedConnectionAndDecorate(final Configuration conf,
public static ConnectionImplementation getMockedConnectionAndDecorate(final Configuration conf,
final AdminProtos.AdminService.BlockingInterface admin,
final ClientProtos.ClientService.BlockingInterface client,
final ServerName sn, final RegionInfo hri)
throws IOException {
final ClientProtos.ClientService.BlockingInterface client, final ServerName sn,
final RegionInfo hri) throws IOException {
ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class);
Mockito.when(c.getConfiguration()).thenReturn(conf);
Mockito.doNothing().when(c).close();
@ -141,18 +133,17 @@ public class HConnectionTestingUtility {
}
/**
* Get a Mockito spied-upon {@link ClusterConnection} that goes with the passed
* <code>conf</code> configuration instance.
* Be sure to shutdown the connection when done by calling
* {@link Connection#close()} else it will stick around; this is probably not what you want.
* Get a Mockito spied-upon {@link ConnectionImplementation} that goes with the passed
* <code>conf</code> configuration instance. Be sure to shutdown the connection when done by
* calling {@link Connection#close()} else it will stick around; this is probably not what you
* want.
* @param conf configuration
* @return ClusterConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
* [Dead link]: See also
* {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)}
* @return ConnectionImplementation object for <code>conf</code>
* @throws ZooKeeperConnectionException [Dead link]: See also
* {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)}
*/
public static ClusterConnection getSpiedConnection(final Configuration conf)
throws IOException {
public static ConnectionImplementation getSpiedConnection(final Configuration conf)
throws IOException {
ConnectionImplementation connection =
Mockito.spy(new ConnectionImplementation(conf, null, null));
return connection;

View File

@ -111,8 +111,8 @@ public class TestCISleep extends AbstractTestCITimeout {
}
RegionAdminServiceCallable<Object> regionAdminServiceCallable =
new RegionAdminServiceCallable<Object>((ClusterConnection) TEST_UTIL.getConnection(),
new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, FAM_NAM) {
new RegionAdminServiceCallable<Object>((ConnectionImplementation) TEST_UTIL.getConnection(),
new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, FAM_NAM) {
@Override
public Object call(HBaseRpcController controller) throws Exception {
return null;

View File

@ -95,8 +95,9 @@ public class TestHBaseAdminNoCluster {
configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count);
// Get mocked connection. Getting the connection will register it so when HBaseAdmin is
// constructed with same configuration, it will find this mocked connection.
ClusterConnection connection = HConnectionTestingUtility.getMockedConnection(configuration);
// Mock so we get back the master interface. Make it so when createTable is called, we throw
ConnectionImplementation connection =
HConnectionTestingUtility.getMockedConnection(configuration);
// Mock so we get back the master interface. Make it so when createTable is called, we throw
// the PleaseHoldException.
MasterKeepAliveConnection masterAdmin = Mockito.mock(MasterKeepAliveConnection.class);
Mockito.when(masterAdmin.createTable((RpcController)Mockito.any(),
@ -292,7 +293,7 @@ public class TestHBaseAdminNoCluster {
final int count = 10;
configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, count);
ClusterConnection connection = mock(ClusterConnection.class);
ConnectionImplementation connection = mock(ConnectionImplementation.class);
when(connection.getConfiguration()).thenReturn(configuration);
MasterKeepAliveConnection masterAdmin =
Mockito.mock(MasterKeepAliveConnection.class, new Answer() {

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -26,11 +26,19 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
@ -112,8 +120,8 @@ public class TestMetaTableAccessorNoCluster {
assertTrue(hri == null);
// OK, give it what it expects
kvs.clear();
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f,
HConstants.REGIONINFO_QUALIFIER, RegionInfo.toByteArray(RegionInfoBuilder.FIRST_META_REGIONINFO)));
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f, HConstants.REGIONINFO_QUALIFIER,
RegionInfo.toByteArray(RegionInfoBuilder.FIRST_META_REGIONINFO)));
hri = MetaTableAccessor.getRegionInfo(Result.create(kvs));
assertNotNull(hri);
assertTrue(RegionInfo.COMPARATOR.compare(hri, RegionInfoBuilder.FIRST_META_REGIONINFO) == 0);
@ -123,8 +131,6 @@ public class TestMetaTableAccessorNoCluster {
* Test that MetaTableAccessor will ride over server throwing
* "Server not running" IOEs.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-3446">HBASE-3446</a>
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testRideOverServerNotRunning()
@ -135,7 +141,7 @@ public class TestMetaTableAccessorNoCluster {
// This is a servername we use in a few places below.
ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis());
ClusterConnection connection = null;
ConnectionImplementation connection = null;
try {
// Mock an ClientProtocol. Our mock implementation will fail a few
// times when we go to open a scanner.
@ -190,26 +196,27 @@ public class TestMetaTableAccessorNoCluster {
// Return the RegionLocations object when locateRegion
// The ugly format below comes of 'Important gotcha on spying real objects!' from
// http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html
Mockito.doReturn(rl).when
(connection).locateRegion((TableName)Mockito.any(), (byte[])Mockito.any(),
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt());
Mockito.doReturn(rl).when(connection).locateRegion((TableName) Mockito.any(),
(byte[]) Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt());
// Now shove our HRI implementation into the spied-upon connection.
Mockito.doReturn(implementation).
when(connection).getClient(Mockito.any());
Mockito.doReturn(implementation).when(connection).getClient(Mockito.any());
// Scan meta for user tables and verify we got back expected answer.
NavigableMap<RegionInfo, Result> hris =
MetaTableAccessor.getServerUserRegions(connection, sn);
assertEquals(1, hris.size());
assertTrue(RegionInfo.COMPARATOR.compare(hris.firstEntry().getKey(), RegionInfoBuilder.FIRST_META_REGIONINFO) == 0);
assertTrue(RegionInfo.COMPARATOR.compare(hris.firstEntry().getKey(),
RegionInfoBuilder.FIRST_META_REGIONINFO) == 0);
assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow()));
// Finally verify that scan was called four times -- three times
// with exception and then on 4th attempt we succeed
Mockito.verify(implementation, Mockito.times(4)).
scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any());
Mockito.verify(implementation, Mockito.times(4)).scan((RpcController) Mockito.any(),
(ScanRequest) Mockito.any());
} finally {
if (connection != null && !connection.isClosed()) connection.close();
if (connection != null && !connection.isClosed()) {
connection.close();
}
zkw.close();
}
}

View File

@ -671,7 +671,7 @@ public class TestReplicaWithCluster {
public void testGetRegionLocationFromPrimaryMetaRegion() throws IOException, InterruptedException {
HTU.getAdmin().balancerSwitch(false, true);
((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(true);
((ConnectionImplementation) HTU.getConnection()).setUseMetaReplicas(true);
// Create table then get the single region for our new table.
HTableDescriptor hdt = HTU.createTableDescriptor("testGetRegionLocationFromPrimaryMetaRegion");
@ -683,12 +683,12 @@ public class TestReplicaWithCluster {
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;
// Get user table location, always get it from the primary meta replica
RegionLocations url = ((ClusterConnection) HTU.getConnection())
.locateRegion(hdt.getTableName(), row, false, false);
try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
locator.getRegionLocations(row, true);
}
} finally {
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
((ConnectionImplementation) HTU.getConnection()).setUseMetaReplicas(false);
HTU.getAdmin().balancerSwitch(true, true);
HTU.getAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
@ -704,23 +704,25 @@ public class TestReplicaWithCluster {
public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
HTU.getAdmin().balancerSwitch(false, true);
((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(true);
((ConnectionImplementation)HTU.getConnection()).setUseMetaReplicas(true);
// Create table then get the single region for our new table.
HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
hdt.setRegionReplication(2);
try {
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
// Get Meta location
RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
.locateRegion(TableName.META_TABLE_NAME,
HConstants.EMPTY_START_ROW, false, false);
RegionLocations mrl;
try (
RegionLocator locator = HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
mrl = new RegionLocations(locator.getRegionLocations(HConstants.EMPTY_START_ROW, true));
}
// Get user table location
RegionLocations url = ((ClusterConnection) HTU.getConnection())
.locateRegion(hdt.getTableName(), row, false, false);
RegionLocations url;
try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
url = new RegionLocations(locator.getRegionLocations(row, true));
}
// Make sure that user primary region is co-hosted with the meta region
if (!url.getDefaultRegionLocation().getServerName().equals(
@ -739,12 +741,15 @@ public class TestReplicaWithCluster {
// Wait until the meta table is updated with new location info
while (true) {
mrl = ((ClusterConnection) HTU.getConnection())
.locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);
try (RegionLocator locator =
HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
mrl = new RegionLocations(locator.getRegionLocations(HConstants.EMPTY_START_ROW, true));
}
// Get user table location
url = ((ClusterConnection) HTU.getConnection())
.locateRegion(hdt.getTableName(), row, false, true);
try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
url = new RegionLocations(locator.getRegionLocations(row, true));
}
LOG.info("meta locations " + mrl);
LOG.info("table locations " + url);
@ -786,7 +791,7 @@ public class TestReplicaWithCluster {
Assert.assertTrue(r.isStale());
} finally {
((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
((ConnectionImplementation)HTU.getConnection()).setUseMetaReplicas(false);
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
HTU.getAdmin().balancerSwitch(true, true);
HTU.getAdmin().disableTable(hdt.getTableName());

View File

@ -229,7 +229,7 @@ public class TestReplicasClient {
@Before
public void before() throws IOException {
((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionLocationCache();
HTU.getConnection().clearRegionLocationCache();
try {
openRegion(hriPrimary);
} catch (Exception ignored) {
@ -250,8 +250,7 @@ public class TestReplicasClient {
closeRegion(hriPrimary);
} catch (Exception ignored) {
}
((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionLocationCache();
HTU.getConnection().clearRegionLocationCache();
}
private HRegionServer getRS() {
@ -329,7 +328,7 @@ public class TestReplicasClient {
public void testLocations() throws Exception {
byte[] b1 = Bytes.toBytes("testLocations");
openRegion(hriSecondary);
ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection();
ConnectionImplementation hc = (ConnectionImplementation) HTU.getConnection();
try {
hc.clearRegionLocationCache();

View File

@ -204,7 +204,7 @@ public class TestSeparateClientZKCluster {
public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
TableName tn = TableName.valueOf(name.getMethodName());
// create table
ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
Connection conn = TEST_UTIL.getConnection();
Admin admin = conn.getAdmin();
HTable table = (HTable) conn.getTable(tn);
try {

View File

@ -25,8 +25,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@ -58,32 +59,33 @@ public class TestMetaAssignmentWithStopMaster {
@Test
public void testStopActiveMaster() throws Exception {
ClusterConnection conn =
(ClusterConnection) ConnectionFactory.createConnection(UTIL.getConfiguration());
ServerName oldMetaServer = conn.locateRegions(TableName.META_TABLE_NAME).get(0).getServerName();
ServerName oldMaster = UTIL.getMiniHBaseCluster().getMaster().getServerName();
try (Connection conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
RegionLocator locator = conn.getRegionLocator(TableName.META_TABLE_NAME)) {
ServerName oldMetaServer = locator.getAllRegionLocations().get(0).getServerName();
ServerName oldMaster = UTIL.getMiniHBaseCluster().getMaster().getServerName();
UTIL.getMiniHBaseCluster().getMaster().stop("Stop master for test");
long startTime = System.currentTimeMillis();
while (UTIL.getMiniHBaseCluster().getMaster() == null || UTIL.getMiniHBaseCluster().getMaster()
.getServerName().equals(oldMaster)) {
LOG.info("Wait the standby master become active");
Thread.sleep(3000);
if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) {
fail("Wait too long for standby master become active");
UTIL.getMiniHBaseCluster().getMaster().stop("Stop master for test");
long startTime = System.currentTimeMillis();
while (UTIL.getMiniHBaseCluster().getMaster() == null ||
UTIL.getMiniHBaseCluster().getMaster().getServerName().equals(oldMaster)) {
LOG.info("Wait the standby master become active");
Thread.sleep(3000);
if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) {
fail("Wait too long for standby master become active");
}
}
}
startTime = System.currentTimeMillis();
while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {
LOG.info("Wait the new active master to be initialized");
Thread.sleep(3000);
if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) {
fail("Wait too long for the new active master to be initialized");
startTime = System.currentTimeMillis();
while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {
LOG.info("Wait the new active master to be initialized");
Thread.sleep(3000);
if (System.currentTimeMillis() - startTime > WAIT_TIMEOUT) {
fail("Wait too long for the new active master to be initialized");
}
}
}
ServerName newMetaServer = conn.locateRegions(TableName.META_TABLE_NAME).get(0).getServerName();
assertTrue("The new meta server " + newMetaServer + " should be same with" +
ServerName newMetaServer = locator.getAllRegionLocations().get(0).getServerName();
assertTrue("The new meta server " + newMetaServer + " should be same with" +
" the old meta server " + oldMetaServer, newMetaServer.equals(oldMetaServer));
}
}
}

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
@ -50,6 +49,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
@ -366,24 +366,15 @@ public class TestLoadIncrementalHFilesSplitRecovery {
private ClusterConnection getMockedConnection(final Configuration conf)
throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
ClusterConnection c = Mockito.mock(ClusterConnection.class);
Mockito.when(c.getConfiguration()).thenReturn(conf);
Mockito.doNothing().when(c).close();
// Make it so we return a particular location when asked.
final HRegionLocation loc = new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO,
ServerName.valueOf("example.org", 1234, 0));
Mockito.when(
c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean()))
.thenReturn(loc);
Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc);
ClientProtos.ClientService.BlockingInterface hri =
Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
ServerName sn = ServerName.valueOf("example.org", 1234, 0);
RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO;
ClientProtos.ClientService.BlockingInterface client =
Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
Mockito
.when(
hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any()))
.thenThrow(new ServiceException(new IOException("injecting bulk load error")));
Mockito.when(c.getClient(Mockito.any())).thenReturn(hri);
return c;
.when(
client.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any()))
.thenThrow(new ServiceException(new IOException("injecting bulk load error")));
return HConnectionTestingUtility.getMockedConnectionAndDecorate(conf, null, client, sn, hri);
}
/**

View File

@ -307,8 +307,7 @@ public class BaseTestHBaseFsck {
tbl.close();
tbl = null;
}
((ClusterConnection) connection).clearRegionLocationCache();
connection.clearRegionLocationCache();
deleteTable(TEST_UTIL, tablename);
}

View File

@ -19,33 +19,34 @@ package org.apache.hadoop.hbase.util;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
/**
* Common base class for reader and writer parts of multi-thread HBase load
* test (See LoadTestTool).
@ -491,7 +492,6 @@ public abstract class MultiThreadedAction {
}
private void printLocations(Result r) {
RegionLocations rl = null;
if (r == null) {
LOG.info("FAILED FOR null Result");
return;
@ -500,15 +500,14 @@ public abstract class MultiThreadedAction {
if (r.getRow() == null) {
return;
}
try {
rl = ((ClusterConnection)connection).locateRegion(tableName, r.getRow(), true, true);
try (RegionLocator locator = connection.getRegionLocator(tableName)) {
List<HRegionLocation> locs = locator.getRegionLocations(r.getRow());
for (HRegionLocation h : locs) {
LOG.info("LOCATION " + h);
}
} catch (IOException e) {
LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()));
}
HRegionLocation locations[] = rl.getRegionLocations();
for (HRegionLocation h : locations) {
LOG.info("LOCATION " + h);
}
}
private String resultToString(Result result) {

View File

@ -21,14 +21,13 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
@ -377,8 +376,10 @@ public class MultiThreadedReader extends MultiThreadedAction
numKeysVerified.incrementAndGet();
}
} else {
HRegionLocation hloc = connection.getRegionLocation(tableName,
get.getRow(), false);
HRegionLocation hloc;
try (RegionLocator locator = connection.getRegionLocator(tableName)) {
hloc = locator.getRegionLocation(get.getRow());
}
String rowKey = Bytes.toString(get.getRow());
LOG.info("Key = " + rowKey + ", Region location: " + hloc);
if(isNullExpected) {

View File

@ -27,10 +27,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.slf4j.Logger;
@ -97,9 +97,9 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
HRegionLocation cached = null, real = null;
try {
cached = connection.getRegionLocation(tableName, rowKey, false);
real = connection.getRegionLocation(tableName, rowKey, true);
try (RegionLocator locator = connection.getRegionLocator(tableName)) {
cached = locator.getRegionLocation(rowKey, false);
real = locator.getRegionLocation(rowKey, true);
} catch (Throwable t) {
// Cannot obtain region information for another catch block - too bad!
}