HBASE-21585 Remove ClusterConnection

This commit is contained in:
zhangduo 2019-02-11 20:32:21 +08:00
parent 2182bfb942
commit b7793d7d1d
81 changed files with 361 additions and 753 deletions

View File

@ -63,7 +63,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BufferedMutatorImpl implements BufferedMutator {
class BufferedMutatorImpl implements BufferedMutator {
private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorImpl.class);
@ -95,7 +95,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
private final AsyncProcess ap;
@VisibleForTesting
BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) {
BufferedMutatorImpl(ConnectionImplementation conn, BufferedMutatorParams params,
AsyncProcess ap) {
if (conn == null || conn.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed.");
}

View File

@ -39,8 +39,10 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
Cancellable {
private final RetryingTimeTracker tracker;
private final int rpcTimeout;
CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker, int priority) {
CancellableRegionServerCallable(ConnectionImplementation connection, TableName tableName,
byte[] row, RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker,
int priority) {
super(connection, tableName, row, rpcController, priority);
this.rpcTimeout = rpcTimeout;
this.tracker = tracker;

View File

@ -31,12 +31,12 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
* @param <T>
*/
@InterfaceAudience.Private
public abstract class ClientServiceCallable<T> extends
RegionServerCallable<T, ClientProtos.ClientService.BlockingInterface> {
public abstract class ClientServiceCallable<T>
extends RegionServerCallable<T, ClientProtos.ClientService.BlockingInterface> {
public ClientServiceCallable(Connection connection, TableName tableName, byte[] row,
public ClientServiceCallable(ConnectionImplementation connection, TableName tableName, byte[] row,
RpcController rpcController, int priority) {
super((ConnectionImplementation) connection, tableName, row, rpcController, priority);
super(connection, tableName, row, rpcController, priority);
}
@Override
@ -46,12 +46,12 @@ public abstract class ClientServiceCallable<T> extends
// Below here are simple methods that contain the stub and the rpcController.
protected ClientProtos.GetResponse doGet(ClientProtos.GetRequest request)
throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
return getStub().get(getRpcController(), request);
}
protected ClientProtos.MutateResponse doMutate(ClientProtos.MutateRequest request)
throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
return getStub().mutate(getRpcController(), request);
}
}

View File

@ -1,181 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
/** Internal methods on Connection that should not be used by user code. */
@InterfaceAudience.Private
// NOTE: Although this class is public, this class is meant to be used directly from internal
// classes and unit tests only.
public interface ClusterConnection extends Connection {
/**
* Key for configuration in Configuration whose value is the class we implement making a
* new Connection instance.
*/
String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl";
/**
* @return - true if the master server is running
* @deprecated this has been deprecated without a replacement
*/
@Deprecated
boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException;
/**
* Use this api to check if the table has been created with the specified number of
* splitkeys which was used while creating the given table.
* Note : If this api is used after a table's region gets splitted, the api may return
* false.
* @param tableName
* tableName
* @param splitKeys
* splitKeys used while creating table
* @throws IOException
* if a remote or network exception occurs
*/
boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws
IOException;
/**
* A table that isTableEnabled == false and isTableDisabled == false
* is possible. This happens when a table has a lot of regions
* that must be processed.
* @param tableName table name
* @return true if the table is enabled, false otherwise
* @throws IOException if a remote or network exception occurs
*/
boolean isTableEnabled(TableName tableName) throws IOException;
/**
* @param tableName table name
* @return true if the table is disabled, false otherwise
* @throws IOException if a remote or network exception occurs
*/
boolean isTableDisabled(TableName tableName) throws IOException;
/**
* Retrieve TableState, represent current table state.
* @param tableName table state for
* @return state of the table
*/
TableState getTableState(TableName tableName) throws IOException;
/**
* Returns a {@link MasterKeepAliveConnection} to the active master
*/
MasterKeepAliveConnection getMaster() throws IOException;
/**
* Get the admin service for master.
*/
AdminService.BlockingInterface getAdminForMaster() throws IOException;
/**
* Establishes a connection to the region server at the specified address.
* @param serverName the region server to connect to
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
*/
AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException;
/**
* Establishes a connection to the region server at the specified address, and returns
* a region client protocol.
*
* @param serverName the region server to connect to
* @return ClientProtocol proxy for RegionServer
* @throws IOException if a remote or network exception occurs
*
*/
ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException;
/**
* @return Nonce generator for this ClusterConnection; may be null if disabled in configuration.
*/
NonceGenerator getNonceGenerator();
/**
* @return Default AsyncProcess associated with this connection.
*/
AsyncProcess getAsyncProcess();
/**
* Returns a new RpcRetryingCallerFactory from the given {@link Configuration}.
* This RpcRetryingCallerFactory lets the users create {@link RpcRetryingCaller}s which can be
* intercepted with the configured {@link RetryingCallerInterceptor}
* @param conf configuration
* @return RpcRetryingCallerFactory
*/
RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf);
/**
* @return Connection's RpcRetryingCallerFactory instance
*/
RpcRetryingCallerFactory getRpcRetryingCallerFactory();
/**
* @return Connection's RpcControllerFactory instance
*/
RpcControllerFactory getRpcControllerFactory();
/**
* @return a ConnectionConfiguration object holding parsed configuration values
*/
ConnectionConfiguration getConnectionConfiguration();
/**
* @return the current statistics tracker associated with this connection
*/
ServerStatisticTracker getStatisticsTracker();
/**
* @return the configured client backoff policy
*/
ClientBackoffPolicy getBackoffPolicy();
/**
* @return the MetricsConnection instance associated with this connection.
*/
MetricsConnection getConnectionMetrics();
/**
* @return true when this connection uses a {@link org.apache.hadoop.hbase.codec.Codec} and so
* supports cell blocks.
*/
boolean hasCellBlockSupport();
/**
* @return the number of region servers that are currently running
* @throws IOException if a remote or network exception occurs
*/
int getCurrentNrHRS() throws IOException;
}

View File

@ -212,7 +212,7 @@ public class ConnectionFactory {
*/
public static Connection createConnection(Configuration conf, ExecutorService pool,
final User user) throws IOException {
String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
String className = conf.get(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
ConnectionImplementation.class.getName());
Class<?> clazz;
try {

View File

@ -152,7 +152,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Updat
value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
justification="Access to the conncurrent hash map is under a lock so should be fine.")
@InterfaceAudience.Private
class ConnectionImplementation implements ClusterConnection, Closeable {
class ConnectionImplementation implements Connection, Closeable {
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
@ -354,9 +354,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
*/
@VisibleForTesting
static NonceGenerator injectNonceGeneratorForTesting(
ClusterConnection conn, NonceGenerator cnm) {
ConnectionImplementation connImpl = (ConnectionImplementation)conn;
NonceGenerator ng = connImpl.getNonceGenerator();
ConnectionImplementation conn, NonceGenerator cnm) {
NonceGenerator ng = conn.getNonceGenerator();
LOG.warn("Nonce generator is being replaced by test code for "
+ cnm.getClass().getName());
nonceGenerator = cnm;
@ -456,7 +455,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}), rpcControllerFactory);
}
@Override
/**
* @return the MetricsConnection instance associated with this connection.
*/
public MetricsConnection getConnectionMetrics() {
return this.metrics;
}
@ -600,7 +601,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* @deprecated this has been deprecated without a replacement
*/
@Deprecated
@Override
public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
// When getting the master connection, we check it's running,
// so if there is no exception, it means we've been able to get a
@ -628,18 +628,39 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
}
@Override
/**
* A table that isTableEnabled == false and isTableDisabled == false
* is possible. This happens when a table has a lot of regions
* that must be processed.
* @param tableName table name
* @return true if the table is enabled, false otherwise
* @throws IOException if a remote or network exception occurs
*/
public boolean isTableEnabled(TableName tableName) throws IOException {
return getTableState(tableName).inStates(TableState.State.ENABLED);
}
@Override
/**
* @param tableName table name
* @return true if the table is disabled, false otherwise
* @throws IOException if a remote or network exception occurs
*/
public boolean isTableDisabled(TableName tableName) throws IOException {
return getTableState(tableName).inStates(TableState.State.DISABLED);
}
@Override
/**
* Use this api to check if the table has been created with the specified number of
* splitkeys which was used while creating the given table.
* Note : If this api is used after a table's region gets splitted, the api may return
* false.
* @param tableName
* tableName
* @param splitKeys
* splitKeys used while creating table
* @throws IOException
* if a remote or network exception occurs
*/
public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
throws IOException {
checkClosed();
@ -809,15 +830,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
/**
*
* @param tableName table to get regions of
* @param row the row
* @param useCache Should we use the cache to retrieve the region information.
* @param retry do we retry
* @param replicaId the replicaId for the region
* @return region locations for this row.
* @throws IOException if IO failure occurs
*/
* @param tableName table to get regions of
* @param row the row
* @param useCache Should we use the cache to retrieve the region information.
* @param retry do we retry
* @param replicaId the replicaId for the region
* @return region locations for this row.
* @throws IOException if IO failure occurs
*/
RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
boolean retry, int replicaId) throws IOException {
checkClosed();
@ -1048,6 +1068,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
metaCache.clearCache(serverName);
}
/**
* Allows flushing the region cache.
*/
@Override
public void clearRegionLocationCache() {
metaCache.clearCache();
@ -1258,12 +1282,19 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
}
@Override
/**
* Get the admin service for master.
*/
public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
return getAdmin(get(registry.getMasterAddress()));
}
@Override
/**
* Establishes a connection to the region server at the specified address.
* @param serverName the region server to connect to
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
*/
public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
throws IOException {
checkClosed();
@ -1279,7 +1310,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
});
}
@Override
/**
* Establishes a connection to the region server at the specified address, and returns a region
* client protocol.
* @param serverName the region server to connect to
* @return ClientProtocol proxy for RegionServer
* @throws IOException if a remote or network exception occurs
*/
public BlockingInterface getClient(ServerName serverName) throws IOException {
checkClosed();
if (isDeadServer(serverName)) {
@ -1289,14 +1326,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
serverName, this.hostnamesCanChange);
return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
return ClientProtos.ClientService.newBlockingStub(channel);
});
}
final MasterServiceState masterServiceState = new MasterServiceState(this);
@Override
public MasterKeepAliveConnection getMaster() throws IOException {
return getKeepAliveMasterService();
}
@ -1927,6 +1963,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
cacheLocation(hri.getTable(), source, newHrl);
}
/**
* Deletes cached locations for the specific region.
* @param location The location object for the region, to be purged from cache.
*/
void deleteCachedRegionLocation(final HRegionLocation location) {
metaCache.clearCache(location);
}
@ -2005,17 +2045,23 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
metaCache.clearCache(regionInfo);
}
@Override
/**
* @return Default AsyncProcess associated with this connection.
*/
public AsyncProcess getAsyncProcess() {
return asyncProcess;
}
@Override
/**
* @return the current statistics tracker associated with this connection
*/
public ServerStatisticTracker getStatisticsTracker() {
return this.stats;
}
@Override
/**
* @return the configured client backoff policy
*/
public ClientBackoffPolicy getBackoffPolicy() {
return this.backoffPolicy;
}
@ -2051,7 +2097,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return this.aborted;
}
@Override
/**
* @return the number of region servers that are currently running
* @throws IOException if a remote or network exception occurs
*/
public int getCurrentNrHRS() throws IOException {
return get(this.registry.getCurrentNrHRS());
}
@ -2094,12 +2143,18 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
close();
}
@Override
/**
* @return Nonce generator for this ClusterConnection; may be null if disabled in configuration.
*/
public NonceGenerator getNonceGenerator() {
return nonceGenerator;
}
@Override
/**
* Retrieve TableState, represent current table state.
* @param tableName table state for
* @return state of the table
*/
public TableState getTableState(TableName tableName) throws IOException {
checkClosed();
TableState tableState = MetaTableAccessor.getTableState(this, tableName);
@ -2109,28 +2164,43 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return tableState;
}
@Override
/**
* Returns a new RpcRetryingCallerFactory from the given {@link Configuration}.
* This RpcRetryingCallerFactory lets the users create {@link RpcRetryingCaller}s which can be
* intercepted with the configured {@link RetryingCallerInterceptor}
* @param conf configuration
* @return RpcRetryingCallerFactory
*/
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
return RpcRetryingCallerFactory
.instantiate(conf, this.interceptor, this.getStatisticsTracker());
}
@Override
/**
* @return true when this connection uses a {@link org.apache.hadoop.hbase.codec.Codec} and so
* supports cell blocks.
*/
public boolean hasCellBlockSupport() {
return this.rpcClient.hasCellBlockSupport();
}
@Override
/**
* @return a ConnectionConfiguration object holding parsed configuration values
*/
public ConnectionConfiguration getConnectionConfiguration() {
return this.connectionConfig;
}
@Override
/**
* @return Connection's RpcRetryingCallerFactory instance
*/
public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
return this.rpcCallerFactory;
}
@Override
/**
* @return Connection's RpcControllerFactory instance
*/
public RpcControllerFactory getRpcControllerFactory() {
return this.rpcControllerFactory;
}

View File

@ -80,6 +80,12 @@ public final class ConnectionUtils {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class);
/**
* Key for configuration in Configuration whose value is the class we implement making a new
* Connection instance.
*/
public static final String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl";
private ConnectionUtils() {
}
@ -109,7 +115,7 @@ public final class ConnectionUtils {
* @param cnm Replaces the nonce generator used, for testing.
* @return old nonce generator.
*/
public static NonceGenerator injectNonceGeneratorForTesting(ClusterConnection conn,
public static NonceGenerator injectNonceGeneratorForTesting(ConnectionImplementation conn,
NonceGenerator cnm) {
return ConnectionImplementation.injectNonceGeneratorForTesting(conn, cnm);
}
@ -186,7 +192,7 @@ public final class ConnectionUtils {
* @return an short-circuit connection.
* @throws IOException if IO failure occurred
*/
public static ClusterConnection createShortCircuitConnection(final Configuration conf,
public static ConnectionImplementation createShortCircuitConnection(final Configuration conf,
ExecutorService pool, User user, final ServerName serverName,
final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
throws IOException {
@ -202,7 +208,7 @@ public final class ConnectionUtils {
*/
@VisibleForTesting
public static void setupMasterlessConnection(Configuration conf) {
conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, MasterlessConnection.class.getName());
conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, MasterlessConnection.class.getName());
}
/**

View File

@ -446,7 +446,7 @@ public class HBaseAdmin implements Admin {
/** @return Connection used by this object. */
@Override
public Connection getConnection() {
public ConnectionImplementation getConnection() {
return connection;
}
@ -485,23 +485,24 @@ public class HBaseAdmin implements Admin {
});
}
static TableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
int operationTimeout, int rpcTimeout) throws IOException {
static TableDescriptor getTableDescriptor(final TableName tableName,
ConnectionImplementation connection, RpcRetryingCallerFactory rpcCallerFactory,
final RpcControllerFactory rpcControllerFactory, int operationTimeout, int rpcTimeout)
throws IOException {
if (tableName == null) return null;
TableDescriptor td =
executeCallable(new MasterCallable<TableDescriptor>(connection, rpcControllerFactory) {
@Override
protected TableDescriptor rpcCall() throws Exception {
GetTableDescriptorsRequest req =
executeCallable(new MasterCallable<TableDescriptor>(connection, rpcControllerFactory) {
@Override
protected TableDescriptor rpcCall() throws Exception {
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(tableName);
GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req);
if (!htds.getTableSchemaList().isEmpty()) {
return ProtobufUtil.toTableDescriptor(htds.getTableSchemaList().get(0));
GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req);
if (!htds.getTableSchemaList().isEmpty()) {
return ProtobufUtil.toTableDescriptor(htds.getTableSchemaList().get(0));
}
return null;
}
return null;
}
}, rpcCallerFactory, operationTimeout, rpcTimeout);
}, rpcCallerFactory, operationTimeout, rpcTimeout);
if (td != null) {
return td;
}
@ -2027,8 +2028,8 @@ public class HBaseAdmin implements Admin {
// Check ZK first.
// If the connection exists, we may have a connection to ZK that does not work anymore
try (ClusterConnection connection =
(ClusterConnection) ConnectionFactory.createConnection(copyOfConf)) {
try (ConnectionImplementation connection =
(ConnectionImplementation) ConnectionFactory.createConnection(copyOfConf)) {
// can throw MasterNotRunningException
connection.isMasterRunning();
}

View File

@ -22,10 +22,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@ -33,16 +38,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.HbckService.BlockingInterface;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Use {@link ClusterConnection#getHbck()} to obtain an instance of {@link Hbck} instead of
* Use {@link Connection#getHbck()} to obtain an instance of {@link Hbck} instead of
* constructing an HBaseHbck directly.
*
* <p>Connection should be an <i>unmanaged</i> connection obtained via
@ -57,7 +55,6 @@ import org.slf4j.LoggerFactory;
* by each thread. Pooling or caching of the instance is not recommended.</p>
*
* @see ConnectionFactory
* @see ClusterConnection
* @see Hbck
*/
@InterfaceAudience.Private

View File

@ -153,7 +153,6 @@ public class HTable implements Table {
* @param rpcControllerFactory The RPC controller factory
* @param pool ExecutorService to be used.
*/
@InterfaceAudience.Private
protected HTable(final ConnectionImplementation connection,
final TableBuilderBase builder,
final RpcRetryingCallerFactory rpcCallerFactory,
@ -449,22 +448,18 @@ public class HTable implements Table {
}
public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
Callback<R> callback, Connection connection, ExecutorService pool, TableName tableName)
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
ConnectionImplementation connImpl = (ConnectionImplementation) connection;
int operationTimeout = connImpl.getConnectionConfiguration().getOperationTimeout();
int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback)
.setPool(pool)
.setTableName(tableName)
.setRowAccess(actions)
.setResults(results)
.setOperationTimeout(operationTimeout)
.setRpcTimeout(writeTimeout)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.build();
AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
AsyncProcessTask<R> task =
AsyncProcessTask.newBuilder(callback).setPool(pool).setTableName(tableName)
.setRowAccess(actions).setResults(results).setOperationTimeout(operationTimeout)
.setRpcTimeout(writeTimeout).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
AsyncRequestFuture ars = connImpl.getAsyncProcess().submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();

View File

@ -21,7 +21,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
@ -31,7 +30,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
/**
* Hbck fixup tool APIs. Obtain an instance from {@link ClusterConnection#getHbck()} and call
* Hbck fixup tool APIs. Obtain an instance from {@link Connection#getHbck()} and call
* {@link #close()} when done.
* <p>WARNING: the below methods can damage the cluster. It may leave the cluster in an
* indeterminate state, e.g. region not assigned, or some hdfs files left behind. After running
@ -39,7 +38,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
* procedures to get regions back online. DO AT YOUR OWN RISK. For experienced users only.
*
* @see ConnectionFactory
* @see ClusterConnection
* @since 2.0.2, 2.1.1
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)

View File

@ -43,12 +43,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
*/
@InterfaceAudience.Private
abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
protected final ClusterConnection connection;
protected final ConnectionImplementation connection;
protected MasterKeepAliveConnection master;
private final HBaseRpcController rpcController;
MasterCallable(final Connection connection, final RpcControllerFactory rpcConnectionFactory) {
this.connection = (ClusterConnection) connection;
MasterCallable(ConnectionImplementation connection,
final RpcControllerFactory rpcConnectionFactory) {
this.connection = connection;
this.rpcController = rpcConnectionFactory.newController();
}

View File

@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -30,15 +29,16 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Callable that handles the <code>multi</code> method call going against a single
@ -51,7 +51,7 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
private MultiAction multiAction;
private boolean cellBlock;
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
MultiServerCallable(final ConnectionImplementation connection, final TableName tableName,
final ServerName location, final MultiAction multi, RpcController rpcController,
int rpcTimeout, RetryingTimeTracker tracker, int priority) {
super(connection, tableName, null, rpcController, rpcTimeout, tracker, priority);
@ -140,7 +140,7 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
private boolean isCellBlock() {
// This is not exact -- the configuration could have changed on us after connection was set up
// but it will do for now.
ClusterConnection conn = getConnection();
ConnectionImplementation conn = getConnection();
return conn.hasCellBlockSupport();
}

View File

@ -46,8 +46,8 @@ public abstract class NoncedRegionServerCallable<T> extends ClientServiceCallabl
* @param tableName Table name to which <code>row</code> belongs.
* @param row The row we want in <code>tableName</code>.
*/
public NoncedRegionServerCallable(Connection connection, TableName tableName, byte [] row,
HBaseRpcController rpcController, int priority) {
public NoncedRegionServerCallable(ConnectionImplementation connection, TableName tableName,
byte[] row, HBaseRpcController rpcController, int priority) {
super(connection, tableName, row, rpcController, priority);
this.nonce = getConnection().getNonceGenerator().newNonce();
}

View File

@ -46,7 +46,7 @@ class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorRpcChannel.class);
private final TableName table;
private final byte [] row;
private final ClusterConnection conn;
private final ConnectionImplementation conn;
private byte[] lastRegion;
private final int operationTimeout;
private final RpcRetryingCallerFactory rpcCallerFactory;
@ -57,7 +57,7 @@ class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
* @param table to connect to
* @param row to locate region with
*/
RegionCoprocessorRpcChannel(ClusterConnection conn, TableName table, byte[] row) {
RegionCoprocessorRpcChannel(ConnectionImplementation conn, TableName table, byte[] row) {
this.table = table;
this.row = row;
this.conn = conn;

View File

@ -19,18 +19,18 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
/**
* Implementations make a RPC call against a RegionService via a protobuf Service.
@ -74,12 +74,12 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
* @param tableName Table name to which <code>row</code> belongs.
* @param row The row we want in <code>tableName</code>.
*/
public RegionServerCallable(ConnectionImplementation connection, TableName tableName, byte [] row,
public RegionServerCallable(ConnectionImplementation connection, TableName tableName, byte[] row,
RpcController rpcController) {
this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS);
}
public RegionServerCallable(ConnectionImplementation connection, TableName tableName, byte [] row,
public RegionServerCallable(ConnectionImplementation connection, TableName tableName, byte[] row,
RpcController rpcController, int priority) {
super();
this.connection = connection;
@ -159,7 +159,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
}
/**
* @return {@link ClusterConnection} instance used by this Callable.
* @return {@link ConnectionImplementation} instance used by this Callable.
*/
protected ConnectionImplementation getConnection() {
return this.connection;

View File

@ -37,13 +37,6 @@ public class ReversedClientScanner extends ClientScanner {
/**
* Create a new ReversibleClientScanner for the specified table Note that the passed
* {@link Scan}'s start row maybe changed.
* @param conf
* @param scan
* @param tableName
* @param connection
* @param pool
* @param primaryOperationTimeout
* @throws IOException
*/
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
ConnectionImplementation connection, RpcRetryingCallerFactory rpcFactory,

View File

@ -52,8 +52,8 @@ public class ReversedScannerCallable extends ScannerCallable {
* @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the
* regionserver
*/
public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcFactory) {
public ReversedScannerCallable(ConnectionImplementation connection, TableName tableName,
Scan scan, ScanMetrics scanMetrics, RpcControllerFactory rpcFactory) {
super(connection, tableName, scan, scanMetrics, rpcFactory);
}
@ -66,8 +66,8 @@ public class ReversedScannerCallable extends ScannerCallable {
* regionserver
* @param replicaId the replica id
*/
public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) {
public ReversedScannerCallable(ConnectionImplementation connection, TableName tableName,
Scan scan, ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) {
super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
}

View File

@ -100,23 +100,24 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
* @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
* metrics
* @param rpcControllerFactory factory to use when creating
* {@link com.google.protobuf.RpcController}
* {@link com.google.protobuf.RpcController}
*/
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
public ScannerCallable(ConnectionImplementation connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
}
/**
*
* @param connection
* @param tableName
* @param scan
* @param scanMetrics
* @param id the replicaId
*/
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
public ScannerCallable(ConnectionImplementation connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(), scan.getPriority());
super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(),
scan.getPriority());
this.id = id;
this.scan = scan;
this.scanMetrics = scanMetrics;

View File

@ -75,7 +75,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
public ScannerCallableWithReplicas(TableName tableName, ConnectionImplementation cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
int retries, int scannerTimeout, int caching, Configuration conf,
RpcRetryingCaller<Result []> caller) {
RpcRetryingCaller<Result[]> caller) {
this.currentScannerCallable = baseCallable;
this.cConnection = cConnection;
this.pool = pool;

View File

@ -1124,11 +1124,8 @@ public class TestAsyncProcess {
1, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
}
private void checkPeriodicFlushParameters(ClusterConnection conn,
MyAsyncProcess ap,
long setTO, long expectTO,
long setTT, long expectTT
) {
private void checkPeriodicFlushParameters(ConnectionImplementation conn, MyAsyncProcess ap,
long setTO, long expectTO, long setTT, long expectTT) {
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
// The BufferedMutatorParams does nothing with the value

View File

@ -44,8 +44,7 @@ public class TestBufferedMutator {
public TestName name = new TestName();
/**
* My BufferedMutator.
* Just to prove that I can insert a BM other than default.
* My BufferedMutator. Just to prove that I can insert a BM other than default.
*/
public static class MyBufferedMutator extends BufferedMutatorImpl {
MyBufferedMutator(ConnectionImplementation conn, RpcRetryingCallerFactory rpcCallerFactory,

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@ -28,7 +29,6 @@ import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -37,10 +37,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/**
* Manages the interactions with an already deployed distributed cluster (as opposed to
* a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
@ -99,18 +95,6 @@ public class DistributedHBaseCluster extends HBaseCluster {
}
}
@Override
public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
throws IOException {
return ((ClusterConnection)this.connection).getAdmin(serverName);
}
@Override
public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
throws IOException {
return ((ClusterConnection)this.connection).getClient(serverName);
}
@Override
public void startRegionServer(String hostname, int port) throws IOException {
LOG.info("Starting RS on: " + hostname);
@ -262,13 +246,6 @@ public class DistributedHBaseCluster extends HBaseCluster {
throw new IOException("did timeout waiting for service to start:" + serverName);
}
@Override
public MasterService.BlockingInterface getMasterAdminService()
throws IOException {
return ((ClusterConnection)this.connection).getMaster();
}
@Override
public void startMaster(String hostname, int port) throws IOException {
LOG.info("Starting Master on: " + hostname + ":" + port);
@ -297,7 +274,7 @@ public class DistributedHBaseCluster extends HBaseCluster {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout) {
try {
getMasterAdminService();
connection.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.HBASE_VERSION));
return true;
} catch (MasterNotRunningException m) {
LOG.warn("Master not started yet " + m);

View File

@ -36,8 +36,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
@ -98,7 +98,7 @@ public class TestMultiTableInputFormatBase {
// canned responses.
JobContext mockedJobContext = Mockito.mock(JobContext.class);
Configuration c = HBaseConfiguration.create();
c.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, MRSplitsConnection.class.getName());
c.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, MRSplitsConnection.class.getName());
Mockito.when(mockedJobContext.getConfiguration()).thenReturn(c);
// Invent a bunch of scans. Have each Scan go against a different table so a good spread.
List<Scan> scans = new ArrayList<>();

View File

@ -17,9 +17,9 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -40,8 +40,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
@ -90,7 +90,7 @@ public class TestTableInputFormatBase {
public void testNonSuccessiveSplitsAreNotMerged() throws IOException {
JobContext context = mock(JobContext.class);
Configuration conf = HBaseConfiguration.create();
conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
ConnectionForMergeTesting.class.getName());
conf.set(TableInputFormat.INPUT_TABLE, "testTable");
conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);

View File

@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@ -54,14 +53,6 @@ public interface Server extends Abortable, Stoppable {
Connection createConnection(Configuration conf) throws IOException;
/**
* Returns a reference to the servers' cluster connection. Prefer {@link #getConnection()}.
*
* Important note: this method returns a reference to Connection which is managed
* by Server itself, so callers must NOT attempt to close connection obtained.
*/
ClusterConnection getClusterConnection();
/**
* Returns a reference to the servers' async connection.
* <p/>

View File

@ -18,14 +18,13 @@
package org.apache.hadoop.hbase.backup.example;
import java.io.IOException;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
@ -36,9 +35,9 @@ public class ZKTableArchiveClient extends Configured {
/** Configuration key for the archive node. */
private static final String ZOOKEEPER_ZNODE_HFILE_ARCHIVE_KEY = "zookeeper.znode.hfile.archive";
private ClusterConnection connection;
private Connection connection;
public ZKTableArchiveClient(Configuration conf, ClusterConnection connection) {
public ZKTableArchiveClient(Configuration conf, Connection connection) {
super(conf);
this.connection = connection;
}

View File

@ -64,7 +64,7 @@ public interface AsyncClusterConnection extends AsyncConnection {
List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs);
/**
* Return all the replicas for a region. Used for regiong replica replication.
* Return all the replicas for a region. Used for region replica replication.
*/
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
boolean reload);

View File

@ -1170,7 +1170,7 @@ public class HMaster extends HRegionServer implements MasterServices {
if (QuotaUtil.isQuotaEnabled(conf)) {
// Create the quota snapshot notifier
spaceQuotaSnapshotNotifier = createQuotaSnapshotNotifier();
spaceQuotaSnapshotNotifier.initialize(getClusterConnection());
spaceQuotaSnapshotNotifier.initialize(getConnection());
this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
// Start the chore to read the region FS space reports and act on them
getChoreService().scheduleChore(quotaObserverChore);
@ -1267,7 +1267,7 @@ public class HMaster extends HRegionServer implements MasterServices {
*/
private boolean waitForNamespaceOnline() throws InterruptedException, IOException {
TableState nsTableState =
MetaTableAccessor.getTableState(getClusterConnection(), TableName.NAMESPACE_TABLE_NAME);
MetaTableAccessor.getTableState(getConnection(), TableName.NAMESPACE_TABLE_NAME);
if (nsTableState == null || nsTableState.isDisabled()) {
// this means we have already migrated the data and disabled or deleted the namespace table,
// or this is a new depliy which does not have a namespace table from the beginning.
@ -1858,7 +1858,7 @@ public class HMaster extends HRegionServer implements MasterServices {
List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
if (plans != null) {
for (NormalizationPlan plan : plans) {
plan.execute(clusterConnection.getAdmin());
plan.execute(connection.getAdmin());
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {
@ -3059,8 +3059,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// this is what we want especially if the Master is in startup phase doing call outs to
// hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on
// the rpc to timeout.
if (this.clusterConnection != null) {
this.clusterConnection.close();
if (this.connection != null) {
this.connection.close();
}
if (this.asyncClusterConnection != null) {
this.asyncClusterConnection.close();

View File

@ -28,19 +28,16 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
@ -66,22 +63,15 @@ final class AssignmentManagerUtil {
static GetRegionInfoResponse getRegionInfoResponse(final MasterProcedureEnv env,
final ServerName regionLocation, final RegionInfo hri, boolean includeBestSplitRow)
throws IOException {
// TODO: There is no timeout on this controller. Set one!
HBaseRpcController controller =
env.getMasterServices().getClusterConnection().getRpcControllerFactory().newController();
final AdminService.BlockingInterface admin =
env.getMasterServices().getClusterConnection().getAdmin(regionLocation);
AsyncRegionServerAdmin admin =
env.getMasterServices().getAsyncClusterConnection().getRegionServerAdmin(regionLocation);
GetRegionInfoRequest request = null;
if (includeBestSplitRow) {
request = RequestConverter.buildGetRegionInfoRequest(hri.getRegionName(), false, true);
} else {
request = RequestConverter.buildGetRegionInfoRequest(hri.getRegionName());
}
try {
return admin.getRegionInfo(controller, request);
} catch (ServiceException e) {
throw ProtobufUtil.handleRemoteException(e);
}
return FutureUtils.get(admin.getRegionInfo(request));
}
private static void lock(List<RegionStateNode> regionNodes) {

View File

@ -90,7 +90,7 @@ public class RegionServerSpaceQuotaManager {
return;
}
// Start the chores
this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getConnection());
rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
this.regionSizeReporter = new RegionSizeReportingChore(rsServices);
rsServices.getChoreService().scheduleChore(regionSizeReporter);

View File

@ -17,12 +17,11 @@
package org.apache.hadoop.hbase.quotas.policies;
import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A {@link SpaceViolationPolicyEnforcement} which disables the table. The enforcement counterpart

View File

@ -84,7 +84,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
@ -277,14 +276,17 @@ public class HRegionServer extends HasThread implements
protected HeapMemoryManager hMemManager;
/**
* Cluster connection to be shared by services.
* Connection to be shared by services.
* <p/>
* Initialized at server startup and closed when server shuts down.
* <p/>
* Clients must never close it explicitly.
* Clients hosted by this Server should make use of this clusterConnection rather than create
* their own; if they create their own, there is no way for the hosting server to shutdown
* ongoing client RPCs.
* <p/>
* Clients hosted by this Server should make use of this connection rather than create their own;
* if they create their own, there is no way for the hosting server to shutdown ongoing client
* RPCs.
*/
protected ClusterConnection clusterConnection;
protected Connection connection;
/**
* The asynchronous cluster connection to be shared by services.
@ -829,11 +831,11 @@ public class HRegionServer extends HasThread implements
* Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to the
* local server; i.e. a short-circuit Connection. Safe to use going to local or remote server.
*/
private ClusterConnection createClusterConnection() throws IOException {
private Connection createConnection() throws IOException {
// Create a cluster connection that when appropriate, can short-circuit and go directly to the
// local server if the request is to the local server bypassing RPC. Can be used for both local
// and remote invocations.
ClusterConnection conn =
Connection conn =
ConnectionUtils.createShortCircuitConnection(unsetClientZookeeperQuorum(), null,
userProvider.getCurrent(), serverName, rpcServices, rpcServices);
// This is used to initialize the batch thread pool inside the connection implementation.
@ -870,8 +872,8 @@ public class HRegionServer extends HasThread implements
* Setup our cluster connection if not already initialized.
*/
protected final synchronized void setupClusterConnection() throws IOException {
if (clusterConnection == null) {
clusterConnection = createClusterConnection();
if (connection == null) {
connection = createConnection();
asyncClusterConnection =
ClusterConnectionFactory.createAsyncClusterConnection(unsetClientZookeeperQuorum(),
new InetSocketAddress(this.rpcServices.isa.getAddress(), 0), userProvider.getCurrent());
@ -1128,9 +1130,9 @@ public class HRegionServer extends HasThread implements
LOG.info("stopping server " + this.serverName);
}
if (this.clusterConnection != null && !clusterConnection.isClosed()) {
if (this.connection != null && !connection.isClosed()) {
try {
this.clusterConnection.close();
this.connection.close();
} catch (IOException e) {
// Although the {@link Closeable} interface throws an {@link
// IOException}, in reality, the implementation would never do that.
@ -2201,12 +2203,7 @@ public class HRegionServer extends HasThread implements
@Override
public Connection getConnection() {
return getClusterConnection();
}
@Override
public ClusterConnection getClusterConnection() {
return this.clusterConnection;
return this.connection;
}
@Override
@ -2312,7 +2309,7 @@ public class HRegionServer extends HasThread implements
}
} else {
try {
MetaTableAccessor.updateRegionLocation(clusterConnection,
MetaTableAccessor.updateRegionLocation(connection,
hris[0], serverName, openSeqNum, masterSystemTime);
} catch (IOException e) {
LOG.info("Failed to update meta", e);
@ -2343,7 +2340,7 @@ public class HRegionServer extends HasThread implements
// Keep looping till we get an error. We want to send reports even though server is going down.
// Only go down if clusterConnection is null. It is set to null almost as last thing as the
// HRegionServer does down.
while (this.clusterConnection != null && !this.clusterConnection.isClosed()) {
while (this.connection != null && !this.connection.isClosed()) {
RegionServerStatusService.BlockingInterface rss = rssStub;
try {
if (rss == null) {
@ -3816,7 +3813,7 @@ public class HRegionServer extends HasThread implements
@Override
public void unassign(byte[] regionName) throws IOException {
clusterConnection.getAdmin().unassign(regionName, false);
connection.getAdmin().unassign(regionName, false);
}
@Override

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.replication.TableCFs;
@ -208,7 +208,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
Configuration conf = getConf();
HBaseAdmin.available(conf);
ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
ZKWatcher zkw = new ZKWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -153,7 +152,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -162,11 +161,6 @@ public class ReplicationSyncUp extends Configured implements Tool {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -89,7 +89,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
@ -161,9 +160,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.collect.TreeMultimap;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
/**
* HBaseFsck (hbck) is a tool for checking and repairing region consistency and
* table integrity problems in a corrupted HBase. This tool was written for hbase-1.x. It does not
@ -245,7 +241,7 @@ public class HBaseFsck extends Configured implements Closeable {
**********************/
private static final Logger LOG = LoggerFactory.getLogger(HBaseFsck.class.getName());
private ClusterMetrics status;
private ClusterConnection connection;
private Connection connection;
private Admin admin;
private Table meta;
// threads to do ||izable tasks: retrieve data from regionservers, handle overlapping regions
@ -585,7 +581,7 @@ public class HBaseFsck extends Configured implements Closeable {
LOG.info("Launching hbck");
connection = (ClusterConnection)ConnectionFactory.createConnection(getConf());
connection = ConnectionFactory.createConnection(getConf());
admin = connection.getAdmin();
meta = connection.getTable(TableName.META_TABLE_NAME);
status = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS,
@ -4332,10 +4328,10 @@ public class HBaseFsck extends Configured implements Closeable {
private final HBaseFsck hbck;
private final ServerName rsinfo;
private final ErrorReporter errors;
private final ClusterConnection connection;
private final Connection connection;
WorkItemRegion(HBaseFsck hbck, ServerName info,
ErrorReporter errors, ClusterConnection connection) {
ErrorReporter errors, Connection connection) {
this.hbck = hbck;
this.rsinfo = info;
this.errors = errors;
@ -4346,32 +4342,29 @@ public class HBaseFsck extends Configured implements Closeable {
public synchronized Void call() throws IOException {
errors.progress();
try {
BlockingInterface server = connection.getAdmin(rsinfo);
// list all online regions from this region server
List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(server);
List<RegionInfo> regions = connection.getAdmin().getRegions(rsinfo);
regions = filterRegions(regions);
if (details) {
errors.detail("RegionServer: " + rsinfo.getServerName() +
" number of regions: " + regions.size());
for (RegionInfo rinfo: regions) {
errors.detail(" " + rinfo.getRegionNameAsString() +
" id: " + rinfo.getRegionId() +
" encoded_name: " + rinfo.getEncodedName() +
" start: " + Bytes.toStringBinary(rinfo.getStartKey()) +
" end: " + Bytes.toStringBinary(rinfo.getEndKey()));
errors.detail(
"RegionServer: " + rsinfo.getServerName() + " number of regions: " + regions.size());
for (RegionInfo rinfo : regions) {
errors.detail(" " + rinfo.getRegionNameAsString() + " id: " + rinfo.getRegionId() +
" encoded_name: " + rinfo.getEncodedName() + " start: " +
Bytes.toStringBinary(rinfo.getStartKey()) + " end: " +
Bytes.toStringBinary(rinfo.getEndKey()));
}
}
// check to see if the existence of this region matches the region in META
for (RegionInfo r:regions) {
for (RegionInfo r : regions) {
HbckInfo hbi = hbck.getOrCreateInfo(r.getEncodedName());
hbi.addServer(r, rsinfo);
}
} catch (IOException e) { // unable to connect to the region server.
errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "RegionServer: " + rsinfo.getServerName() +
" Unable to fetch region information. " + e);
} catch (IOException e) { // unable to connect to the region server.
errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE,
"RegionServer: " + rsinfo.getServerName() + " Unable to fetch region information. " + e);
throw e;
}
return null;

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

View File

@ -26,19 +26,17 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provides ability to create multiple Connection instances and allows to process a batch of
@ -112,14 +110,11 @@ public class MultiHConnection {
* @param callback to run when results are in
* @throws IOException If IO failure occurs
*/
@SuppressWarnings("deprecation")
public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName,
Object[] results, Batch.Callback<R> callback) throws IOException {
// Currently used by RegionStateStore
ClusterConnection conn =
(ClusterConnection) connections[ThreadLocalRandom.current().nextInt(noOfConnections)];
HTable.doBatchWithCallback(actions, results, callback, conn, batchPool, tableName);
HTable.doBatchWithCallback(actions, results, callback,
connections[ThreadLocalRandom.current().nextInt(noOfConnections)], batchPool, tableName);
}
// Copied from ConnectionImplementation.getBatchPool()

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
@ -713,7 +712,6 @@ public class RegionSplitter {
htd = table.getDescriptor();
}
try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
// for every region that hasn't been verified as a finished split
for (Pair<byte[], byte[]> region : regionList) {
byte[] start = region.getFirst();
@ -721,7 +719,7 @@ public class RegionSplitter {
// see if the new split daughter region has come online
try {
RegionInfo dri = regionLocator.getRegionLocation(split).getRegion();
RegionInfo dri = regionLocator.getRegionLocation(split, true).getRegion();
if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
logicalSplitting.add(region);
continue;
@ -739,7 +737,7 @@ public class RegionSplitter {
LinkedList<RegionInfo> check = Lists.newLinkedList();
check.add(regionLocator.getRegionLocation(start).getRegion());
check.add(regionLocator.getRegionLocation(split).getRegion());
for (HRegionInfo hri : check.toArray(new HRegionInfo[check.size()])) {
for (RegionInfo hri : check.toArray(new RegionInfo[check.size()])) {
byte[] sk = hri.getStartKey();
if (sk.length == 0)
sk = splitAlgo.firstRow();

View File

@ -262,7 +262,7 @@ if ( fqtn != null ) {
stateMap.put(regionInfo.getEncodedName(), regionState);
}
}
RegionLocator r = master.getClusterConnection().getRegionLocator(table.getName());
RegionLocator r = master.getConnection().getRegionLocator(table.getName());
try { %>
<h2>Table Attributes</h2>
<table class="table table-striped">

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@ -28,10 +27,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/**
* This class defines methods that can help with managing HBase clusters
* from unit tests and system tests. There are 3 types of cluster deployments:
@ -97,24 +92,6 @@ public abstract class HBaseCluster implements Closeable, Configurable {
return initialClusterStatus;
}
/**
* Returns an {@link MasterService.BlockingInterface} to the active master
*/
public abstract MasterService.BlockingInterface getMasterAdminService()
throws IOException;
/**
* Returns an AdminProtocol interface to the regionserver
*/
public abstract AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
throws IOException;
/**
* Returns a ClientProtocol interface to the regionserver
*/
public abstract ClientService.BlockingInterface getClientProtocol(ServerName serverName)
throws IOException;
/**
* Starts a new region server on the given hostname or if this is a mini/local cluster,
* starts a region server locally.

View File

@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.master.HMaster;
@ -42,9 +41,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
/**
@ -517,15 +513,6 @@ public class MiniHBaseCluster extends HBaseCluster {
return t;
}
/**
* Returns the current active master, if available.
* @return the active HMaster, null if none is active.
*/
@Override
public MasterService.BlockingInterface getMasterAdminService() {
return this.hbaseCluster.getActiveMaster().getMasterRpcServices();
}
/**
* Returns the current active master, if available.
* @return the active HMaster, null if none is active.
@ -921,15 +908,4 @@ public class MiniHBaseCluster extends HBaseCluster {
}
return -1;
}
@Override
public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
}
@Override
public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
throws IOException {
return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
}
}

View File

@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.locking.EntityLock;
@ -162,7 +161,7 @@ public class MockRegionServerServices implements RegionServerServices {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -266,7 +265,6 @@ public class MockRegionServerServices implements RegionServerServices {
@Override
public ServerNonceManager getNonceManager() {
// TODO Auto-generated method stub
return null;
}
@ -277,7 +275,6 @@ public class MockRegionServerServices implements RegionServerServices {
@Override
public boolean registerService(Service service) {
// TODO Auto-generated method stub
return false;
}
@ -291,11 +288,6 @@ public class MockRegionServerServices implements RegionServerServices {
return 0;
}
@Override
public ClusterConnection getClusterConnection() {
return null;
}
@Override
public ThroughputController getFlushThroughputController() {
return null;

View File

@ -35,9 +35,9 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
@ -86,7 +86,7 @@ public class TestZooKeeperTableArchiveClient {
private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
private static ZKTableArchiveClient archivingClient;
private final List<Path> toCleanup = new ArrayList<>();
private static ClusterConnection CONNECTION;
private static Connection CONNECTION;
private static RegionServerServices rss;
/**
@ -96,7 +96,7 @@ public class TestZooKeeperTableArchiveClient {
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniZKCluster();
CONNECTION = (ClusterConnection)ConnectionFactory.createConnection(UTIL.getConfiguration());
CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
// make hfile archiving node so we can archive files
ZKWatcher watcher = UTIL.getZooKeeperWatcher();

View File

@ -57,11 +57,11 @@ public class HConnectionTestingUtility {
* @throws ZooKeeperConnectionException
*/
public static ConnectionImplementation getMockedConnection(final Configuration conf)
throws ZooKeeperConnectionException {
throws ZooKeeperConnectionException {
ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class);
Mockito.when(connection.getConfiguration()).thenReturn(conf);
Mockito.when(connection.getRpcControllerFactory()).thenReturn(
Mockito.mock(RpcControllerFactory.class));
Mockito.when(connection.getRpcControllerFactory())
.thenReturn(Mockito.mock(RpcControllerFactory.class));
// we need a real retrying caller
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
@ -81,11 +81,10 @@ public class HConnectionTestingUtility {
* the mocked connection
* @return Mock up a connection that returns a {@link Configuration} when
* {@link ConnectionImplementation#getConfiguration()} is called, a 'location' when
* {@link ConnectionImplementation#getRegionLocation(TableName,byte[], boolean)}
* is called, and that returns the passed
* {@link AdminProtos.AdminService.BlockingInterface} instance when
* {@link ConnectionImplementation#getAdmin(ServerName)} is called, returns the passed
* {@link ClientProtos.ClientService.BlockingInterface} instance when
* {@link ConnectionImplementation#getRegionLocation(TableName, byte[], boolean)} is
* called, and that returns the passed {@link AdminProtos.AdminService.BlockingInterface}
* instance when {@link ConnectionImplementation#getAdmin(ServerName)} is called, returns
* the passed {@link ClientProtos.ClientService.BlockingInterface} instance when
* {@link ConnectionImplementation#getClient(ServerName)} is called (Be sure to call
* {@link Connection#close()} when done with this mocked Connection.
*/
@ -138,9 +137,7 @@ public class HConnectionTestingUtility {
* calling {@link Connection#close()} else it will stick around; this is probably not what you
* want.
* @param conf configuration
* @return ConnectionImplementation object for <code>conf</code>
* @throws ZooKeeperConnectionException [Dead link]: See also
* {http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html#spy(T)}
* @return ClusterConnection object for <code>conf</code>
*/
public static ConnectionImplementation getSpiedConnection(final Configuration conf)
throws IOException {

View File

@ -175,7 +175,7 @@ public class TestAdmin extends TestAdminBase {
List<HRegionLocation> regions;
Iterator<HRegionLocation> hris;
RegionInfo hri;
ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(table)) {
regions = l.getAllRegionLocations();
@ -343,7 +343,7 @@ public class TestAdmin extends TestAdminBase {
}
}
private void verifyRoundRobinDistribution(ClusterConnection c, RegionLocator regionLocator,
private void verifyRoundRobinDistribution(ConnectionImplementation c, RegionLocator regionLocator,
int expectedRegions) throws IOException {
int numRS = c.getCurrentNrHRS();
List<HRegionLocation> regions = regionLocator.getAllRegionLocations();

View File

@ -435,8 +435,8 @@ public class TestAdmin1 extends TestAdminBase {
nameofRegionsToMerge[1] = regions.get(2).getFirst().getEncodedNameAsBytes();
MergeTableRegionsRequest request = RequestConverter.buildMergeTableRegionsRequest(
nameofRegionsToMerge, true, HConstants.NO_NONCE, HConstants.NO_NONCE);
((ClusterConnection) TEST_UTIL.getAdmin().getConnection()).getMaster().mergeTableRegions(null,
request);
((ConnectionImplementation) TEST_UTIL.getAdmin().getConnection()).getMaster()
.mergeTableRegions(null, request);
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException m) {
Throwable t = m.getCause();
do {

View File

@ -733,7 +733,7 @@ public class TestAdmin2 extends TestAdminBase {
Assert.assertNotNull(store);
Assert.assertEquals(expectedStoreFilesSize, store.getSize());
ClusterConnection conn = ((ClusterConnection) ADMIN.getConnection());
ConnectionImplementation conn = (ConnectionImplementation) ADMIN.getConnection();
HBaseRpcController controller = conn.getRpcControllerFactory().newController();
for (int i = 0; i < 10; i++) {
RegionInfo ri =

View File

@ -274,7 +274,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
private void verifyRoundRobinDistribution(List<HRegionLocation> regions, int expectedRegions)
throws IOException {
int numRS = ((ClusterConnection) TEST_UTIL.getConnection()).getCurrentNrHRS();
int numRS = ((ConnectionImplementation) TEST_UTIL.getConnection()).getCurrentNrHRS();
Map<ServerName, List<RegionInfo>> server2Regions = new HashMap<>();
regions.stream().forEach((loc) -> {

View File

@ -94,9 +94,9 @@ public class TestCISleep extends AbstractTestCITimeout {
final TableName tableName = TableName.valueOf(name.getMethodName());
TEST_UTIL.createTable(tableName, FAM_NAM);
ClientServiceCallable<Object> regionServerCallable =
new ClientServiceCallable<Object>(TEST_UTIL.getConnection(), tableName, FAM_NAM,
new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController(),
HConstants.PRIORITY_UNSET) {
new ClientServiceCallable<Object>((ConnectionImplementation) TEST_UTIL.getConnection(),
tableName, FAM_NAM, new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController(),
HConstants.PRIORITY_UNSET) {
@Override
protected Object rpcCall() throws Exception {
return null;
@ -126,9 +126,9 @@ public class TestCISleep extends AbstractTestCITimeout {
assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
}
try (
MasterCallable<Object> masterCallable = new MasterCallable<Object>(TEST_UTIL.getConnection(),
new RpcControllerFactory(TEST_UTIL.getConfiguration())) {
try (MasterCallable<Object> masterCallable =
new MasterCallable<Object>((ConnectionImplementation) TEST_UTIL.getConnection(),
new RpcControllerFactory(TEST_UTIL.getConfiguration())) {
@Override
protected Object rpcCall() throws Exception {
return null;

View File

@ -819,7 +819,7 @@ public class TestConnectionImplementation {
* from ZK by the client.
*/
@Test
public void testConnection() throws Exception{
public void testConnection() throws Exception {
// We create an empty config and add the ZK address.
Configuration c = new Configuration();
c.set(HConstants.ZOOKEEPER_QUORUM,
@ -828,7 +828,8 @@ public class TestConnectionImplementation {
TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
// This should be enough to connect
ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c);
ConnectionImplementation conn =
(ConnectionImplementation) ConnectionFactory.createConnection(c);
assertTrue(conn.isMasterRunning());
conn.close();
}

View File

@ -154,7 +154,7 @@ public class TestFromClientSide3 {
// connection needed for poll-wait
HRegionLocation loc = locator.getRegionLocation(row, true);
AdminProtos.AdminService.BlockingInterface server =
((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName());
((ConnectionImplementation) admin.getConnection()).getAdmin(loc.getServerName());
byte[] regName = loc.getRegion().getRegionName();
for (int i = 0; i < nFlushes; i++) {
@ -276,7 +276,7 @@ public class TestFromClientSide3 {
try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
try (Admin admin = TEST_UTIL.getAdmin()) {
ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection();
ConnectionImplementation connection = (ConnectionImplementation) TEST_UTIL.getConnection();
// Create 3 store files.
byte[] row = Bytes.toBytes(random.nextInt());
@ -655,7 +655,7 @@ public class TestFromClientSide3 {
@Test
public void testConnectionDefaultUsesCodec() throws Exception {
ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection();
ConnectionImplementation con = (ConnectionImplementation) TEST_UTIL.getConnection();
assertTrue(con.hasCellBlockSupport());
}

View File

@ -107,15 +107,13 @@ public class TestMetaTableAccessorNoCluster {
Result r = Result.create(kvs);
assertNull(MetaTableAccessor.getRegionInfo(r));
byte [] f = HConstants.CATALOG_FAMILY;
byte[] f = HConstants.CATALOG_FAMILY;
// Make a key value that doesn't have the expected qualifier.
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f,
HConstants.SERVER_QUALIFIER, f));
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f, HConstants.SERVER_QUALIFIER, f));
r = Result.create(kvs);
assertNull(MetaTableAccessor.getRegionInfo(r));
// Make a key that does not have a regioninfo value.
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f,
HConstants.REGIONINFO_QUALIFIER, f));
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, f, HConstants.REGIONINFO_QUALIFIER, f));
RegionInfo hri = MetaTableAccessor.getRegionInfo(Result.create(kvs));
assertTrue(hri == null);
// OK, give it what it expects
@ -161,7 +159,7 @@ public class TestMetaTableAccessorNoCluster {
RegionInfo.toByteArray(RegionInfoBuilder.FIRST_META_REGIONINFO)));
kvs.add(new KeyValue(rowToVerify,
HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
Bytes.toBytes(sn.getHostAndPort())));
Bytes.toBytes(sn.getAddress().toString())));
kvs.add(new KeyValue(rowToVerify,
HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
Bytes.toBytes(sn.getStartcode())));

View File

@ -553,7 +553,7 @@ public class TestMultiParallel {
};
NonceGenerator oldCnm =
ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm);
ConnectionUtils.injectNonceGeneratorForTesting((ConnectionImplementation) connection, cnm);
// First test sequential requests.
try {
@ -615,7 +615,8 @@ public class TestMultiParallel {
validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
table.close();
} finally {
ConnectionImplementation.injectNonceGeneratorForTesting((ClusterConnection) connection, oldCnm);
ConnectionImplementation.injectNonceGeneratorForTesting((ConnectionImplementation) connection,
oldCnm);
}
}

View File

@ -571,7 +571,7 @@ public class TestReplicasClient {
LOG.info("get works and is not stale done");
//reset
ClusterConnection connection = (ClusterConnection) HTU.getConnection();
ConnectionImplementation connection = (ConnectionImplementation) HTU.getConnection();
Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps;
Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin;
hedgedReadOps.dec(hedgedReadOps.getCount());
@ -638,7 +638,7 @@ public class TestReplicasClient {
Thread.sleep(1000 + REFRESH_PERIOD * 2);
AsyncProcess ap = ((ClusterConnection) HTU.getConnection()).getAsyncProcess();
AsyncProcess ap = ((ConnectionImplementation) HTU.getConnection()).getAsyncProcess();
// Make primary slowdown
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
@ -654,16 +654,14 @@ public class TestReplicasClient {
gets.add(g);
Object[] results = new Object[2];
int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout();
int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout();
AsyncProcessTask task = AsyncProcessTask.newBuilder()
.setPool(HTable.getDefaultExecutor(HTU.getConfiguration()))
.setTableName(table.getName())
.setRowAccess(gets)
.setResults(results)
.setOperationTimeout(operationTimeout)
.setRpcTimeout(readTimeout)
.build();
int operationTimeout = ((ConnectionImplementation) HTU.getConnection())
.getConnectionConfiguration().getOperationTimeout();
int readTimeout = ((ConnectionImplementation) HTU.getConnection())
.getConnectionConfiguration().getReadRpcTimeout();
AsyncProcessTask task =
AsyncProcessTask.newBuilder().setPool(HTable.getDefaultExecutor(HTU.getConfiguration()))
.setTableName(table.getName()).setRowAccess(gets).setResults(results)
.setOperationTimeout(operationTimeout).setRpcTimeout(readTimeout).build();
AsyncRequestFuture reqs = ap.submit(task);
reqs.waitUntilDone();
// verify we got the right results back

View File

@ -206,7 +206,7 @@ public class TestSeparateClientZKCluster {
// create table
Connection conn = TEST_UTIL.getConnection();
Admin admin = conn.getAdmin();
HTable table = (HTable) conn.getTable(tn);
Table table = conn.getTable(tn);
try {
ColumnFamilyDescriptorBuilder cfDescBuilder =
ColumnFamilyDescriptorBuilder.newBuilder(family);

View File

@ -75,7 +75,7 @@ public class TestShortCircuitConnection {
htd.addFamily(hcd);
UTIL.createTable(htd, null);
HRegionServer regionServer = UTIL.getRSForFirstRegionInTable(tableName);
ClusterConnection connection = regionServer.getClusterConnection();
ConnectionImplementation connection = (ConnectionImplementation) regionServer.getConnection();
Table tableIf = connection.getTable(tableName);
assertTrue(tableIf instanceof HTable);
HTable table = (HTable) tableIf;

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.MasterSwitchType;
@ -163,7 +162,7 @@ public class MockNoopMasterServices implements MasterServices {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -354,11 +353,6 @@ public class MockNoopMasterServices implements MasterServices {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
return null;
}
@Override
public LoadBalancer getLoadBalancer() {
return null;

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@ -305,7 +304,7 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -620,11 +619,6 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
return 0;
}
@Override
public ClusterConnection getClusterConnection() {
return null;
}
@Override
public ThroughputController getFlushThroughputController() {
return null;

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -313,7 +312,7 @@ public class TestActiveMasterManager {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -330,12 +329,6 @@ public class TestActiveMasterManager {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -18,16 +18,12 @@
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.net.InetAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
@ -51,14 +47,7 @@ public class TestClockSkewDetection {
@Test
public void testClockSkewDetection() throws Exception {
final Configuration conf = HBaseConfiguration.create();
ServerManager sm = new ServerManager(new MockNoopMasterServices(conf) {
@Override
public ClusterConnection getClusterConnection() {
ClusterConnection conn = mock(ClusterConnection.class);
when(conn.getRpcControllerFactory()).thenReturn(mock(RpcControllerFactory.class));
return conn;
}
});
ServerManager sm = new ServerManager(new MockNoopMasterServices(conf));
LOG.debug("regionServerStartup 1");
InetAddress ia1 = InetAddress.getLocalHost();

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.replication.ReplicationException;
@ -184,7 +184,7 @@ public class TestMasterNoCluster {
// Insert a mock for the connection, use TESTUTIL.getConfiguration rather than
// the conf from the master; the conf will already have an ClusterConnection
// associate so the below mocking of a connection will fail.
final ClusterConnection mockedConnection = HConnectionTestingUtility.getMockedConnectionAndDecorate(
final Connection mockedConnection = HConnectionTestingUtility.getMockedConnectionAndDecorate(
TESTUTIL.getConfiguration(), rs0, rs0, rs0.getServerName(),
HRegionInfo.FIRST_META_REGIONINFO);
HMaster master = new HMaster(conf) {
@ -212,12 +212,7 @@ public class TestMasterNoCluster {
}
@Override
public ClusterConnection getConnection() {
return mockedConnection;
}
@Override
public ClusterConnection getClusterConnection() {
public Connection getConnection() {
return mockedConnection;
}
};
@ -281,7 +276,7 @@ public class TestMasterNoCluster {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
// Insert a mock for the connection, use TESTUTIL.getConfiguration rather than
// the conf from the master; the conf will already have a Connection
// associate so the below mocking of a connection will fail.

View File

@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
@ -87,7 +87,7 @@ public class MockMasterServices extends MockNoopMasterServices {
private MasterProcedureEnv procedureEnv;
private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
private ProcedureStore procedureStore;
private final ClusterConnection connection;
private final Connection connection;
private final LoadBalancer balancer;
private final ServerManager serverManager;
@ -284,7 +284,7 @@ public class MockMasterServices extends MockNoopMasterServices {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return this.connection;
}

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -228,7 +227,7 @@ public class TestHFileCleaner {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -260,12 +259,6 @@ public class TestHFileCleaner {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -164,7 +163,7 @@ public class TestHFileLinkCleaner {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -193,13 +192,6 @@ public class TestHFileLinkCleaner {
public ChoreService getChoreService() {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.replication.ReplicationException;
@ -363,7 +362,7 @@ public class TestLogsCleaner {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -393,11 +392,6 @@ public class TestLogsCleaner {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.replication.ReplicationException;
@ -252,7 +251,7 @@ public class TestReplicationHFileCleaner {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -284,12 +283,6 @@ public class TestReplicationHFileCleaner {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -362,11 +362,11 @@ public class MasterProcedureTestingUtility {
// Procedure Helpers
// ==========================================================================
public static long generateNonceGroup(final HMaster master) {
return master.getClusterConnection().getNonceGenerator().getNonceGroup();
return master.getAsyncClusterConnection().getNonceGenerator().getNonceGroup();
}
public static long generateNonce(final HMaster master) {
return master.getClusterConnection().getNonceGenerator().newNonce();
return master.getAsyncClusterConnection().getNonceGenerator().newNonce();
}
/**

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
@ -48,12 +48,10 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@ -66,7 +64,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
@ -89,10 +86,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
/**
* Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
* the region server's bullkLoad functionality.
@ -214,29 +207,15 @@ public class TestHRegionServerBulkLoad {
}
// bulk load HFiles
BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
final Connection conn = UTIL.getConnection();
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 5 == 0) {
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
// 5 * 50 = 250 open file handles!
ClientServiceCallable<Void> callable =
new ClientServiceCallable<Void>(UTIL.getConnection(), tableName, Bytes.toBytes("aaa"),
new RpcControllerFactory(UTIL.getConfiguration()).newController(),
HConstants.PRIORITY_UNSET) {
@Override
protected Void rpcCall() throws Exception {
LOG.debug(
"compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow()));
AdminProtos.AdminService.BlockingInterface server =
((ClusterConnection) UTIL.getConnection()).getAdmin(getLocation().getServerName());
CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
getLocation().getRegion().getRegionName(), true, null);
server.compactRegion(null, request);
numCompactions.incrementAndGet();
return null;
}
};
caller.callWithRetries(callable, Integer.MAX_VALUE);
try (RegionLocator locator = conn.getRegionLocator(tableName)) {
HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("aaa"), true);
conn.getAdmin().compactRegion(loc.getRegion().getRegionName());
numCompactions.incrementAndGet();
}
}
}
}

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
@ -829,7 +828,7 @@ public class TestHeapMemoryManager {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -843,12 +842,6 @@ public class TestHeapMemoryManager {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
@ -132,7 +131,7 @@ public class TestSplitLogWorker {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -141,12 +140,6 @@ public class TestSplitLogWorker {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
@ -470,7 +469,7 @@ public class TestWALLockup {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -505,11 +504,6 @@ public class TestWALLockup {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@ -209,7 +208,7 @@ public class TestReplicationTrackerZKImpl {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -244,12 +243,6 @@ public class TestReplicationTrackerZKImpl {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -852,8 +851,9 @@ public abstract class TestReplicationSourceManager {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -887,12 +887,6 @@ public abstract class TestReplicationSourceManager {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
@ -211,7 +210,7 @@ public class TestTokenAuthentication {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -354,12 +353,6 @@ public class TestTokenAuthentication {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public Connection createConnection(Configuration conf) throws IOException {
return null;

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
@ -74,9 +73,6 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
/**
* This is the base class for HBaseFsck's ability to detect reasons for inconsistent tables.
*
@ -98,7 +94,7 @@ public class BaseTestHBaseFsck {
protected static RegionStates regionStates;
protected static ExecutorService tableExecutorService;
protected static ScheduledThreadPoolExecutor hbfsckExecutorService;
protected static ClusterConnection connection;
protected static Connection connection;
protected static Admin admin;
// for the instance, reset every test run
@ -298,9 +294,6 @@ public class BaseTestHBaseFsck {
/**
* delete table in preparation for next test
*
* @param tablename
* @throws IOException
*/
void cleanupTable(TableName tablename) throws Exception {
if (tbl != null) {
@ -319,10 +312,8 @@ public class BaseTestHBaseFsck {
Collection<ServerName> regionServers = status.getLiveServerMetrics().keySet();
Map<ServerName, List<String>> mm = new HashMap<>();
for (ServerName hsi : regionServers) {
AdminProtos.AdminService.BlockingInterface server = connection.getAdmin(hsi);
// list all online regions from this region server
List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(server);
List<RegionInfo> regions = admin.getRegions(hsi);
List<String> regionNames = new ArrayList<>(regions.size());
for (RegionInfo hri : regions) {
regionNames.add(hri.getRegionNameAsString());

View File

@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -104,7 +103,7 @@ public class MockServer implements Server {
}
@Override
public ClusterConnection getConnection() {
public Connection getConnection() {
return null;
}
@ -115,7 +114,6 @@ public class MockServer implements Server {
@Override
public boolean isAborted() {
// TODO Auto-generated method stub
return this.aborted;
}
@ -124,12 +122,6 @@ public class MockServer implements Server {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public FileSystem getFileSystem() {
return null;

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
@ -56,7 +56,7 @@ public abstract class MultiThreadedAction {
protected final TableName tableName;
protected final Configuration conf;
protected final ClusterConnection connection; // all reader / writer threads will share this connection
protected final Connection connection; // all reader / writer threads will share this connection
protected int numThreads = 1;
@ -151,7 +151,7 @@ public abstract class MultiThreadedAction {
this.dataGenerator = dataGen;
this.tableName = tableName;
this.actionLetter = actionLetter;
this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
this.connection = ConnectionFactory.createConnection(conf);
}
public void start(long startKey, long endKey, int numThreads) throws IOException {

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.hfile.TestHFile;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@ -74,7 +73,7 @@ public class TestHBaseFsckMOB extends BaseTestHBaseFsck {
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
regionStates = assignmentManager.getRegionStates();
connection = (ClusterConnection) TEST_UTIL.getConnection();
connection = TEST_UTIL.getConnection();
admin = connection.getAdmin();
admin.balancerSwitch(false, true);

View File

@ -29,9 +29,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import javax.net.ssl.SSLException;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

View File

@ -40,11 +40,11 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@ -143,7 +143,7 @@ public class TestThriftConnection {
private static Connection createConnection(int port, boolean useHttp) throws IOException {
Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
ThriftConnection.class.getName());
if (useHttp) {
conf.set(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS,