HBASE-10479 HConnection interface is public but is used internally, and contains a bunch of methods

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1566501 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2014-02-10 04:16:34 +00:00
parent df5bd6e99f
commit 74ab28497d
27 changed files with 3480 additions and 2834 deletions

View File

@ -122,7 +122,7 @@ class AsyncProcess {
// TODO: many of the fields should be made private
protected final long id;
protected final HConnection hConnection;
protected final ClusterConnection hConnection;
protected final RpcRetryingCallerFactory rpcCallerFactory;
protected final BatchErrors globalErrors;
protected final ExecutorService pool;
@ -190,7 +190,7 @@ class AsyncProcess {
}
}
public AsyncProcess(HConnection hc, Configuration conf, ExecutorService pool,
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors) {
if (hc == null) {
throw new IllegalArgumentException("HConnection cannot be null.");
@ -508,7 +508,7 @@ class AsyncProcess {
protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private final Batch.Callback<CResult> callback;
private final BatchErrors errors;
private final HConnectionManager.ServerErrorTracker errorsByServer;
private final ConnectionManager.ServerErrorTracker errorsByServer;
private final ExecutorService pool;
@ -1070,7 +1070,8 @@ class AsyncProcess {
* We may benefit from connection-wide tracking of server errors.
* @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
*/
protected HConnectionManager.ServerErrorTracker createServerErrorTracker() {
return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout, this.numTries);
protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
return new ConnectionManager.ServerErrorTracker(
this.serverTrackerTimeout, this.numTries);
}
}

View File

@ -0,0 +1,214 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
/** Internal methods on HConnection that should not be used by user code. */
@InterfaceAudience.Private
// NOTE: DO NOT make this class public. It was made package-private on purpose.
interface ClusterConnection extends HConnection {
/** @return - true if the master server is running */
boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException;
/**
* Use this api to check if the table has been created with the specified number of
* splitkeys which was used while creating the given table.
* Note : If this api is used after a table's region gets splitted, the api may return
* false.
* @param tableName
* tableName
* @param splitKeys
* splitKeys used while creating table
* @throws IOException
* if a remote or network exception occurs
*/
boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws
IOException;
/**
* Find the location of the region of <i>tableName</i> that <i>row</i>
* lives in.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
public HRegionLocation locateRegion(final TableName tableName,
final byte [] row) throws IOException;
/**
* Allows flushing the region cache.
*/
void clearRegionCache();
/**
* Allows flushing the region cache of all locations that pertain to
* <code>tableName</code>
* @param tableName Name of the table whose regions we are to remove from
* cache.
*/
void clearRegionCache(final TableName tableName);
/**
* Deletes cached locations for the specific region.
* @param location The location object for the region, to be purged from cache.
*/
void deleteCachedRegionLocation(final HRegionLocation location);
/**
* Find the location of the region of <i>tableName</i> that <i>row</i>
* lives in, ignoring any value that might be in the cache.
* @param tableName name of the table <i>row</i> is in
* @param row row key you're trying to find the region of
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation relocateRegion(final TableName tableName,
final byte [] row) throws IOException;
/**
* Update the location cache. This is used internally by HBase, in most cases it should not be
* used by the client application.
* @param tableName the table name
* @param rowkey the row
* @param exception the exception if any. Can be null.
* @param source the previous location
*/
void updateCachedLocations(TableName tableName, byte[] rowkey,
Object exception, ServerName source);
/**
* Gets the location of the region of <i>regionName</i>.
* @param regionName name of the region to locate
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation locateRegion(final byte[] regionName)
throws IOException;
/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
* @param tableName table to get regions of
* @return list of region locations for all regions of table
* @throws IOException
*/
List<HRegionLocation> locateRegions(final TableName tableName) throws IOException;
/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
* @param tableName table to get regions of
* @param useCache Should we use the cache to retrieve the region information.
* @param offlined True if we are to include offlined regions, false and we'll leave out offlined
* regions from returned list.
* @return list of region locations for all regions of table
* @throws IOException
*/
List<HRegionLocation> locateRegions(final TableName tableName,
final boolean useCache,
final boolean offlined) throws IOException;
/**
* Returns a {@link MasterKeepAliveConnection} to the active master
*/
MasterService.BlockingInterface getMaster() throws IOException;
/**
* Establishes a connection to the region server at the specified address.
* @param serverName
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
*/
AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException;
/**
* Establishes a connection to the region server at the specified address, and returns
* a region client protocol.
*
* @param serverName
* @return ClientProtocol proxy for RegionServer
* @throws IOException if a remote or network exception occurs
*
*/
ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException;
/**
* Find region location hosting passed row
* @param tableName table name
* @param row Row to find.
* @param reload If true do not use cache, otherwise bypass.
* @return Location of row.
* @throws IOException if a remote or network exception occurs
*/
HRegionLocation getRegionLocation(TableName tableName, byte [] row,
boolean reload)
throws IOException;
/**
* Clear any caches that pertain to server name <code>sn</code>.
* @param sn A server name
*/
void clearCaches(final ServerName sn);
/**
* This function allows HBaseAdmin and potentially others to get a shared MasterService
* connection.
* @return The shared instance. Never returns null.
* @throws MasterNotRunningException
*/
@Deprecated
MasterKeepAliveConnection getKeepAliveMasterService()
throws MasterNotRunningException;
/**
* @param serverName
* @return true if the server is known as dead, false otherwise.
* @deprecated internal method, do not use thru HConnection */
@Deprecated
boolean isDeadServer(ServerName serverName);
/**
* @return Nonce generator for this HConnection; may be null if disabled in configuration.
*/
public NonceGenerator getNonceGenerator();
/**
* @return Default AsyncProcess associated with this connection.
*/
AsyncProcess getAsyncProcess();
}

File diff suppressed because it is too large Load Diff

View File

@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.client;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
/**
@ -61,4 +63,33 @@ public class ConnectionUtils {
}
return newPause;
}
/**
* @param conn The connection for which to replace the generator.
* @param cnm Replaces the nonce generator used, for testing.
* @return old nonce generator.
*/
public static NonceGenerator injectNonceGeneratorForTesting(
HConnection conn, NonceGenerator cnm) {
return ConnectionManager.injectNonceGeneratorForTesting(conn, cnm);
}
/**
* Changes the configuration to set the number of retries needed when using HConnection
* internally, e.g. for updating catalog tables, etc.
* Call this method before we create any Connections.
* @param c The Configuration instance to set the retries into.
* @param log Used to log what we set in here.
*/
public static void setServerSideHConnectionRetriesConfig(
final Configuration c, final String sn, final Log log) {
int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
// Go big. Multiply by 10. If we can't get to meta after this many retries
// then something seriously wrong.
int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10);
int retries = hcRetries * serversideMultiplier;
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
log.debug(sn + " HConnection server-to-server retries=" + retries);
}
}

View File

@ -163,7 +163,7 @@ public class HBaseAdmin implements Abortable, Closeable {
// We use the implementation class rather then the interface because we
// need the package protected functions to get the connection to master
private HConnection connection;
private ClusterConnection connection;
private volatile Configuration conf;
private final long pause;
@ -188,7 +188,7 @@ public class HBaseAdmin implements Abortable, Closeable {
throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
// Will not leak connections, as the new implementation of the constructor
// does not throw exceptions anymore.
this(HConnectionManager.getConnection(new Configuration(c)));
this(ConnectionManager.getConnectionInternal(new Configuration(c)));
this.cleanupConnectionOnClose = true;
}
@ -199,9 +199,15 @@ public class HBaseAdmin implements Abortable, Closeable {
* @param connection The HConnection instance to use
* @throws MasterNotRunningException, ZooKeeperConnectionException are not
* thrown anymore but kept into the interface for backward api compatibility
* @deprecated Do not use this internal ctor.
*/
@Deprecated
public HBaseAdmin(HConnection connection)
throws MasterNotRunningException, ZooKeeperConnectionException {
this((ClusterConnection)connection);
}
HBaseAdmin(ClusterConnection connection) {
this.conf = connection.getConfiguration();
this.connection = connection;
@ -2324,8 +2330,8 @@ public class HBaseAdmin implements Abortable, Closeable {
copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
copyOfConf.setInt("zookeeper.recovery.retry", 0);
HConnectionManager.HConnectionImplementation connection
= (HConnectionManager.HConnectionImplementation)
ConnectionManager.HConnectionImplementation connection
= (ConnectionManager.HConnectionImplementation)
HConnectionManager.getConnection(copyOfConf);
try {

View File

@ -154,7 +154,9 @@ public interface HConnection extends Abortable, Closeable {
*/
public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException;
/** @return - true if the master server is running */
/** @return - true if the master server is running
* @deprecated internal method, do not use thru HConnection */
@Deprecated
boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException;
@ -202,7 +204,8 @@ public interface HConnection extends Abortable, Closeable {
* splitKeys used while creating table
* @throws IOException
* if a remote or network exception occurs
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws
IOException;
@ -251,7 +254,8 @@ public interface HConnection extends Abortable, Closeable {
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
public HRegionLocation locateRegion(final TableName tableName,
final byte [] row) throws IOException;
@ -261,7 +265,8 @@ public interface HConnection extends Abortable, Closeable {
/**
* Allows flushing the region cache.
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
void clearRegionCache();
/**
@ -269,7 +274,8 @@ public interface HConnection extends Abortable, Closeable {
* <code>tableName</code>
* @param tableName Name of the table whose regions we are to remove from
* cache.
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
void clearRegionCache(final TableName tableName);
@Deprecated
@ -278,7 +284,8 @@ public interface HConnection extends Abortable, Closeable {
/**
* Deletes cached locations for the specific region.
* @param location The location object for the region, to be purged from cache.
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
void deleteCachedRegionLocation(final HRegionLocation location);
/**
@ -289,7 +296,8 @@ public interface HConnection extends Abortable, Closeable {
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
HRegionLocation relocateRegion(final TableName tableName,
final byte [] row) throws IOException;
@ -308,7 +316,8 @@ public interface HConnection extends Abortable, Closeable {
* @param rowkey the row
* @param exception the exception if any. Can be null.
* @param source the previous location
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
void updateCachedLocations(TableName tableName, byte[] rowkey,
Object exception, ServerName source);
@ -322,7 +331,8 @@ public interface HConnection extends Abortable, Closeable {
* @return HRegionLocation that describes where to find the region in
* question
* @throws IOException if a remote or network exception occurs
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
HRegionLocation locateRegion(final byte[] regionName)
throws IOException;
@ -331,7 +341,8 @@ public interface HConnection extends Abortable, Closeable {
* @param tableName table to get regions of
* @return list of region locations for all regions of table
* @throws IOException
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
List<HRegionLocation> locateRegions(final TableName tableName) throws IOException;
@Deprecated
@ -345,7 +356,8 @@ public interface HConnection extends Abortable, Closeable {
* regions from returned list.
* @return list of region locations for all regions of table
* @throws IOException
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
public List<HRegionLocation> locateRegions(final TableName tableName,
final boolean useCache,
final boolean offlined) throws IOException;
@ -357,7 +369,8 @@ public interface HConnection extends Abortable, Closeable {
/**
* Returns a {@link MasterKeepAliveConnection} to the active master
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
MasterService.BlockingInterface getMaster() throws IOException;
@ -366,7 +379,8 @@ public interface HConnection extends Abortable, Closeable {
* @param serverName
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
AdminService.BlockingInterface getAdmin(final ServerName serverName) throws IOException;
/**
@ -376,8 +390,8 @@ public interface HConnection extends Abortable, Closeable {
* @param serverName
* @return ClientProtocol proxy for RegionServer
* @throws IOException if a remote or network exception occurs
*
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
ClientService.BlockingInterface getClient(final ServerName serverName) throws IOException;
/**
@ -398,7 +412,8 @@ public interface HConnection extends Abortable, Closeable {
* @param reload If true do not use cache, otherwise bypass.
* @return Location of row.
* @throws IOException if a remote or network exception occurs
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
HRegionLocation getRegionLocation(TableName tableName, byte [] row,
boolean reload)
throws IOException;
@ -460,6 +475,7 @@ public interface HConnection extends Abortable, Closeable {
public void setRegionCachePrefetch(final TableName tableName,
final boolean enable);
@Deprecated
public void setRegionCachePrefetch(final byte[] tableName,
final boolean enable);
@ -471,6 +487,7 @@ public interface HConnection extends Abortable, Closeable {
*/
boolean getRegionCachePrefetch(final TableName tableName);
@Deprecated
boolean getRegionCachePrefetch(final byte[] tableName);
/**
@ -500,7 +517,8 @@ public interface HConnection extends Abortable, Closeable {
/**
* Clear any caches that pertain to server name <code>sn</code>.
* @param sn A server name
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
void clearCaches(final ServerName sn);
/**
@ -518,16 +536,13 @@ public interface HConnection extends Abortable, Closeable {
/**
* @param serverName
* @return true if the server is known as dead, false otherwise.
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
boolean isDeadServer(ServerName serverName);
/**
* @return Nonce generator for this HConnection; may be null if disabled in configuration.
*/
* @deprecated internal method, do not use thru HConnection */
@Deprecated
public NonceGenerator getNonceGenerator();
/**
* @return Default AsyncProcess associated with this connection.
*/
AsyncProcess getAsyncProcess();
}

View File

@ -72,7 +72,8 @@ class HConnectionKey {
username = currentUser.getName();
}
} catch (IOException ioe) {
HConnectionManager.LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
ConnectionManager.LOG.warn(
"Error obtaining current user, skipping username in HConnectionKey", ioe);
}
}

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
@ -117,7 +118,7 @@ import com.google.protobuf.ServiceException;
@InterfaceStability.Stable
public class HTable implements HTableInterface {
private static final Log LOG = LogFactory.getLog(HTable.class);
protected HConnection connection;
protected ClusterConnection connection;
private final TableName tableName;
private volatile Configuration configuration;
protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
@ -189,7 +190,7 @@ public class HTable implements HTableInterface {
this.connection = null;
return;
}
this.connection = HConnectionManager.getConnection(conf);
this.connection = ConnectionManager.getConnectionInternal(conf);
this.configuration = conf;
this.pool = getDefaultExecutor(conf);
@ -203,12 +204,14 @@ public class HTable implements HTableInterface {
* @param tableName Name of the table.
* @param connection HConnection to be used.
* @throws IOException if a remote or network exception occurs
* @deprecated Do not use.
*/
@Deprecated
public HTable(TableName tableName, HConnection connection) throws IOException {
this.tableName = tableName;
this.cleanupPoolOnClose = true;
this.cleanupConnectionOnClose = false;
this.connection = connection;
this.connection = (ClusterConnection)connection;
this.configuration = connection.getConfiguration();
this.pool = getDefaultExecutor(this.configuration);
@ -263,7 +266,7 @@ public class HTable implements HTableInterface {
*/
public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
throws IOException {
this.connection = HConnectionManager.getConnection(conf);
this.connection = ConnectionManager.getConnectionInternal(conf);
this.configuration = conf;
this.pool = pool;
this.tableName = tableName;
@ -282,25 +285,35 @@ public class HTable implements HTableInterface {
* @param tableName Name of the table.
* @param connection HConnection to be used.
* @param pool ExecutorService to be used.
* @throws IOException if a remote or network exception occurs
* @throws IOException if a remote or network exception occurs.
* @deprecated Do not use, internal ctor.
*/
@Deprecated
public HTable(final byte[] tableName, final HConnection connection,
final ExecutorService pool) throws IOException {
this(TableName.valueOf(tableName), connection, pool);
}
/** @deprecated Do not use, internal ctor. */
@Deprecated
public HTable(TableName tableName, final HConnection connection,
final ExecutorService pool) throws IOException {
this(tableName, (ClusterConnection)connection, pool);
}
/**
* Creates an object to access a HBase table.
* Shares zookeeper connection and other resources with other HTable instances
* created with the same <code>connection</code> instance.
* Use this constructor when the ExecutorService and HConnection instance are
* externally managed.
* Visible only for HTableWrapper which is in different package.
* Should not be used by exernal code.
* @param tableName Name of the table.
* @param connection HConnection to be used.
* @param pool ExecutorService to be used.
* @throws IOException if a remote or network exception occurs
*/
public HTable(TableName tableName, final HConnection connection,
@InterfaceAudience.Private
public HTable(TableName tableName, final ClusterConnection connection,
final ExecutorService pool) throws IOException {
if (connection == null || connection.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed.");
@ -506,6 +519,7 @@ public class HTable implements HTableInterface {
*/
// TODO(tsuna): Remove this. Unit tests shouldn't require public helpers.
@Deprecated
@VisibleForTesting
public HConnection getConnection() {
return this.connection;
}

View File

@ -76,9 +76,8 @@ public class MetaScanner {
* null if not interested in a particular table.
* @throws IOException e
*/
public static void metaScan(Configuration configuration, HConnection connection,
MetaScannerVisitor visitor, TableName userTableName)
throws IOException {
public static void metaScan(Configuration configuration, ClusterConnection connection,
MetaScannerVisitor visitor, TableName userTableName) throws IOException {
metaScan(configuration, connection, visitor, userTableName, null, Integer.MAX_VALUE,
TableName.META_TABLE_NAME);
}
@ -123,7 +122,7 @@ public class MetaScanner {
* @param metaTableName Meta table to scan, root or meta.
* @throws IOException e
*/
public static void metaScan(Configuration configuration, HConnection connection,
static void metaScan(Configuration configuration, ClusterConnection connection,
final MetaScannerVisitor visitor, final TableName tableName,
final byte[] row, final int rowLimit, final TableName metaTableName)
throws IOException {
@ -266,7 +265,7 @@ public class MetaScanner {
* @throws IOException
*/
public static NavigableMap<HRegionInfo, ServerName> allTableRegions(Configuration conf,
HConnection connection, final TableName tableName,
ClusterConnection connection, final TableName tableName,
final boolean offlined) throws IOException {
final NavigableMap<HRegionInfo, ServerName> regions =
new TreeMap<HRegionInfo, ServerName>();

View File

@ -39,14 +39,14 @@ import java.io.IOException;
class ZooKeeperKeepAliveConnection extends ZooKeeperWatcher{
ZooKeeperKeepAliveConnection(
Configuration conf, String descriptor,
HConnectionManager.HConnectionImplementation conn) throws IOException {
ConnectionManager.HConnectionImplementation conn) throws IOException {
super(conf, descriptor, conn);
}
@Override
public void close() {
if (this.abortable != null) {
((HConnectionManager.HConnectionImplementation)abortable).releaseZooKeeperWatcher(this);
((ConnectionManager.HConnectionImplementation)abortable).releaseZooKeeperWatcher(this);
}
}

View File

@ -38,14 +38,14 @@ import org.apache.zookeeper.KeeperException;
class ZooKeeperRegistry implements Registry {
static final Log LOG = LogFactory.getLog(ZooKeeperRegistry.class);
// Needs an instance of hci to function. Set after construct this instance.
HConnectionManager.HConnectionImplementation hci;
ConnectionManager.HConnectionImplementation hci;
@Override
public void init(HConnection connection) {
if (!(connection instanceof HConnectionManager.HConnectionImplementation)) {
if (!(connection instanceof ConnectionManager.HConnectionImplementation)) {
throw new RuntimeException("This registry depends on HConnectionImplementation");
}
this.hci = (HConnectionManager.HConnectionImplementation)connection;
this.hci = (ConnectionManager.HConnectionImplementation)connection;
}
@Override

View File

@ -118,18 +118,18 @@ public class TestAsyncProcess {
}
}
public MyAsyncProcess(HConnection hc, Configuration conf) {
public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
this(hc, conf, new AtomicInteger());
}
public MyAsyncProcess(HConnection hc, Configuration conf, AtomicInteger nbThreads) {
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
new RpcRetryingCallerFactory(conf), false);
}
public MyAsyncProcess(
HConnection hc, Configuration conf, boolean useGlobalErrors) {
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
new RpcRetryingCallerFactory(conf), useGlobalErrors);
@ -184,7 +184,7 @@ public class TestAsyncProcess {
/**
* Returns our async process.
*/
static class MyConnectionImpl extends HConnectionManager.HConnectionImplementation {
static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation {
final AtomicInteger nbThreads = new AtomicInteger(0);
final static Configuration c = new Configuration();
@ -237,7 +237,7 @@ public class TestAsyncProcess {
@Test
public void testSubmit() throws Exception {
HConnection hc = createHConnection();
ClusterConnection hc = createHConnection();
AsyncProcess ap = new MyAsyncProcess(hc, conf);
List<Put> puts = new ArrayList<Put>();
@ -249,7 +249,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitWithCB() throws Exception {
HConnection hc = createHConnection();
ClusterConnection hc = createHConnection();
final AtomicInteger updateCalled = new AtomicInteger(0);
Batch.Callback<Object> cb = new Batch.Callback<Object>() {
public void update(byte[] region, byte[] row, Object result) {
@ -269,7 +269,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitBusyRegion() throws Exception {
HConnection hc = createHConnection();
ClusterConnection hc = createHConnection();
AsyncProcess ap = new MyAsyncProcess(hc, conf);
List<Put> puts = new ArrayList<Put>();
@ -287,7 +287,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitBusyRegionServer() throws Exception {
HConnection hc = createHConnection();
ClusterConnection hc = createHConnection();
AsyncProcess ap = new MyAsyncProcess(hc, conf);
ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
@ -462,8 +462,8 @@ public class TestAsyncProcess {
Assert.assertTrue(start + 100L + sleepTime > end);
}
private static HConnection createHConnection() throws IOException {
HConnection hc = Mockito.mock(HConnection.class);
private static ClusterConnection createHConnection() throws IOException {
ClusterConnection hc = Mockito.mock(ClusterConnection.class);
Mockito.when(hc.getRegionLocation(Mockito.eq(DUMMY_TABLE),
Mockito.eq(DUMMY_BYTES_1), Mockito.anyBoolean())).thenReturn(loc1);
@ -638,7 +638,7 @@ public class TestAsyncProcess {
public void testErrorsServers() throws IOException {
HTable ht = new HTable();
Configuration configuration = new Configuration(conf);
configuration.setBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
// set default writeBufferSize
ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));

View File

@ -254,7 +254,7 @@ public class TestClientNoCluster extends Configured implements Tool {
* Override to shutdown going to zookeeper for cluster id and meta location.
*/
static class ScanOpenNextThenExceptionThenRecoverConnection
extends HConnectionManager.HConnectionImplementation {
extends ConnectionManager.HConnectionImplementation {
final ClientService.BlockingInterface stub;
ScanOpenNextThenExceptionThenRecoverConnection(Configuration conf,
@ -286,7 +286,7 @@ public class TestClientNoCluster extends Configured implements Tool {
* Override to shutdown going to zookeeper for cluster id and meta location.
*/
static class RegionServerStoppedOnScannerOpenConnection
extends HConnectionManager.HConnectionImplementation {
extends ConnectionManager.HConnectionImplementation {
final ClientService.BlockingInterface stub;
RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed,
@ -318,7 +318,7 @@ public class TestClientNoCluster extends Configured implements Tool {
* Override to check we are setting rpc timeout right.
*/
static class RpcTimeoutConnection
extends HConnectionManager.HConnectionImplementation {
extends ConnectionManager.HConnectionImplementation {
final ClientService.BlockingInterface stub;
RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool, User user)
@ -345,7 +345,7 @@ public class TestClientNoCluster extends Configured implements Tool {
* Fake many regionservers and many regions on a connection implementation.
*/
static class ManyServersManyRegionsConnection
extends HConnectionManager.HConnectionImplementation {
extends ConnectionManager.HConnectionImplementation {
// All access should be synchronized
final Map<ServerName, ClientService.BlockingInterface> serversByClient;

View File

@ -71,8 +71,8 @@ public class TestSnapshotFromAdmin {
+ "- further testing won't prove anything.", time < ignoreExpectedTime);
// setup the mocks
HConnectionManager.HConnectionImplementation mockConnection = Mockito
.mock(HConnectionManager.HConnectionImplementation.class);
ConnectionManager.HConnectionImplementation mockConnection = Mockito
.mock(ConnectionManager.HConnectionImplementation.class);
Configuration conf = HBaseConfiguration.create();
// setup the conf to match the expected properties
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
@ -118,8 +118,8 @@ public class TestSnapshotFromAdmin {
*/
@Test
public void testValidateSnapshotName() throws Exception {
HConnectionManager.HConnectionImplementation mockConnection = Mockito
.mock(HConnectionManager.HConnectionImplementation.class);
ConnectionManager.HConnectionImplementation mockConnection = Mockito
.mock(ConnectionManager.HConnectionImplementation.class);
Configuration conf = HBaseConfiguration.create();
Mockito.when(mockConnection.getConfiguration()).thenReturn(conf);
HBaseAdmin admin = new HBaseAdmin(mockConnection);

View File

@ -62,8 +62,8 @@ import com.google.protobuf.ServiceException;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CoprocessorHConnection implements HConnection {
private static final NonceGenerator ng = new HConnectionManager.NoNonceGenerator();
public class CoprocessorHConnection implements ClusterConnection {
private static final NonceGenerator ng = new ConnectionManager.NoNonceGenerator();
/**
* Create an unmanaged {@link HConnection} based on the environment in which we are running the
@ -73,9 +73,10 @@ public class CoprocessorHConnection implements HConnection {
* @return an unmanaged {@link HConnection}.
* @throws IOException if we cannot create the basic connection
*/
public static HConnection getConnectionForEnvironment(CoprocessorEnvironment env)
public static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
throws IOException {
HConnection connection = HConnectionManager.createConnection(env.getConfiguration());
ClusterConnection connection =
ConnectionManager.createConnectionInternal(env.getConfiguration());
// this bit is a little hacky - just trying to get it going for the moment
if (env instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
@ -87,11 +88,11 @@ public class CoprocessorHConnection implements HConnection {
return connection;
}
private HConnection delegate;
private ClusterConnection delegate;
private ServerName serverName;
private HRegionServer server;
public CoprocessorHConnection(HConnection delegate, HRegionServer server) {
public CoprocessorHConnection(ClusterConnection delegate, HRegionServer server) {
this.server = server;
this.serverName = server.getServerName();
this.delegate = delegate;

View File

@ -0,0 +1,331 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost.Environment;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.io.MultipleIOException;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
* A wrapper for HTable. Can be used to restrict privilege.
*
* Currently it just helps to track tables opened by a Coprocessor and
* facilitate close of them if it is aborted.
*
* We also disallow row locking.
*
* There is nothing now that will stop a coprocessor from using HTable
* objects directly instead of this API, but in the future we intend to
* analyze coprocessor implementations as they are loaded and reject those
* which attempt to use objects and methods outside the Environment
* sandbox.
*/
public class HTableWrapper implements HTableInterface {
private TableName tableName;
private HTable table;
private ClusterConnection connection;
private final List<HTableInterface> openTables;
/**
* @param openTables External list of tables used for tracking wrappers.
* @throws IOException
*/
public static HTableInterface createWrapper(List<HTableInterface> openTables,
TableName tableName, Environment env, ExecutorService pool) throws IOException {
return new HTableWrapper(openTables, tableName,
CoprocessorHConnection.getConnectionForEnvironment(env), pool);
}
private HTableWrapper(List<HTableInterface> openTables, TableName tableName,
ClusterConnection connection, ExecutorService pool)
throws IOException {
this.tableName = tableName;
this.table = new HTable(tableName, connection, pool);
this.connection = connection;
this.openTables = openTables;
this.openTables.add(this);
}
public void internalClose() throws IOException {
List<IOException> exceptions = new ArrayList<IOException>(2);
try {
table.close();
} catch (IOException e) {
exceptions.add(e);
}
try {
// have to self-manage our connection, as per the HTable contract
if (this.connection != null) {
this.connection.close();
}
} catch (IOException e) {
exceptions.add(e);
}
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}
public Configuration getConfiguration() {
return table.getConfiguration();
}
public void close() throws IOException {
try {
internalClose();
} finally {
openTables.remove(this);
}
}
public Result getRowOrBefore(byte[] row, byte[] family)
throws IOException {
return table.getRowOrBefore(row, family);
}
public Result get(Get get) throws IOException {
return table.get(get);
}
public boolean exists(Get get) throws IOException {
return table.exists(get);
}
public Boolean[] exists(List<Get> gets) throws IOException{
return table.exists(gets);
}
public void put(Put put) throws IOException {
table.put(put);
}
public void put(List<Put> puts) throws IOException {
table.put(puts);
}
public void delete(Delete delete) throws IOException {
table.delete(delete);
}
public void delete(List<Delete> deletes) throws IOException {
table.delete(deletes);
}
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Put put) throws IOException {
return table.checkAndPut(row, family, qualifier, value, put);
}
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Put put) throws IOException {
return table.checkAndPut(row, family, qualifier, compareOp, value, put);
}
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Delete delete) throws IOException {
return table.checkAndDelete(row, family, qualifier, value, delete);
}
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Delete delete) throws IOException {
return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
}
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount) throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount);
}
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount, Durability durability)
throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount,
durability);
}
@Override
public Result append(Append append) throws IOException {
return table.append(append);
}
@Override
public Result increment(Increment increment) throws IOException {
return table.increment(increment);
}
public void flushCommits() throws IOException {
table.flushCommits();
}
public boolean isAutoFlush() {
return table.isAutoFlush();
}
public ResultScanner getScanner(Scan scan) throws IOException {
return table.getScanner(scan);
}
public ResultScanner getScanner(byte[] family) throws IOException {
return table.getScanner(family);
}
public ResultScanner getScanner(byte[] family, byte[] qualifier)
throws IOException {
return table.getScanner(family, qualifier);
}
public HTableDescriptor getTableDescriptor() throws IOException {
return table.getTableDescriptor();
}
@Override
public byte[] getTableName() {
return tableName.getName();
}
@Override
public TableName getName() {
return table.getName();
}
@Override
public void batch(List<? extends Row> actions, Object[] results)
throws IOException, InterruptedException {
table.batch(actions, results);
}
/**
* {@inheritDoc}
* @deprecated If any exception is thrown by one of the actions, there is no way to
* retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
*/
@Override
public Object[] batch(List<? extends Row> actions)
throws IOException, InterruptedException {
return table.batch(actions);
}
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
Batch.Callback<R> callback) throws IOException, InterruptedException {
table.batchCallback(actions, results, callback);
}
/**
* {@inheritDoc}
* @deprecated If any exception is thrown by one of the actions, there is no way to
* retrieve the partially executed results. Use
* {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
* instead.
*/
@Override
public <R> Object[] batchCallback(List<? extends Row> actions,
Batch.Callback<R> callback) throws IOException, InterruptedException {
return table.batchCallback(actions, callback);
}
@Override
public Result[] get(List<Get> gets) throws IOException {
return table.get(gets);
}
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
return table.coprocessorService(row);
}
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
throws ServiceException, Throwable {
return table.coprocessorService(service, startKey, endKey, callable);
}
@Override
public <T extends Service, R> void coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
throws ServiceException, Throwable {
table.coprocessorService(service, startKey, endKey, callable, callback);
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
table.mutateRow(rm);
}
@Override
public void setAutoFlush(boolean autoFlush) {
table.setAutoFlush(autoFlush, autoFlush);
}
@Override
public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
table.setAutoFlush(autoFlush, clearBufferOnFail);
}
@Override
public void setAutoFlushTo(boolean autoFlush) {
table.setAutoFlushTo(autoFlush);
}
@Override
public long getWriteBufferSize() {
return table.getWriteBufferSize();
}
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
table.setWriteBufferSize(writeBufferSize);
}
@Override
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
}
}

View File

@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@ -44,34 +43,17 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CoprocessorHConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.client.HTableWrapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.io.MultipleIOException;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
* Provides the common setup framework and runtime services for coprocessor
@ -357,273 +339,6 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
*/
public static class Environment implements CoprocessorEnvironment {
/**
* A wrapper for HTable. Can be used to restrict privilege.
*
* Currently it just helps to track tables opened by a Coprocessor and
* facilitate close of them if it is aborted.
*
* We also disallow row locking.
*
* There is nothing now that will stop a coprocessor from using HTable
* objects directly instead of this API, but in the future we intend to
* analyze coprocessor implementations as they are loaded and reject those
* which attempt to use objects and methods outside the Environment
* sandbox.
*/
class HTableWrapper implements HTableInterface {
private TableName tableName;
private HTable table;
private HConnection connection;
public HTableWrapper(TableName tableName, HConnection connection, ExecutorService pool)
throws IOException {
this.tableName = tableName;
this.table = new HTable(tableName, connection, pool);
this.connection = connection;
openTables.add(this);
}
void internalClose() throws IOException {
List<IOException> exceptions = new ArrayList<IOException>(2);
try {
table.close();
} catch (IOException e) {
exceptions.add(e);
}
try {
// have to self-manage our connection, as per the HTable contract
if (this.connection != null) {
this.connection.close();
}
} catch (IOException e) {
exceptions.add(e);
}
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}
public Configuration getConfiguration() {
return table.getConfiguration();
}
public void close() throws IOException {
try {
internalClose();
} finally {
openTables.remove(this);
}
}
public Result getRowOrBefore(byte[] row, byte[] family)
throws IOException {
return table.getRowOrBefore(row, family);
}
public Result get(Get get) throws IOException {
return table.get(get);
}
public boolean exists(Get get) throws IOException {
return table.exists(get);
}
public Boolean[] exists(List<Get> gets) throws IOException{
return table.exists(gets);
}
public void put(Put put) throws IOException {
table.put(put);
}
public void put(List<Put> puts) throws IOException {
table.put(puts);
}
public void delete(Delete delete) throws IOException {
table.delete(delete);
}
public void delete(List<Delete> deletes) throws IOException {
table.delete(deletes);
}
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Put put) throws IOException {
return table.checkAndPut(row, family, qualifier, value, put);
}
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Put put) throws IOException {
return table.checkAndPut(row, family, qualifier, compareOp, value, put);
}
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Delete delete) throws IOException {
return table.checkAndDelete(row, family, qualifier, value, delete);
}
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Delete delete) throws IOException {
return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
}
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount) throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount);
}
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount, Durability durability)
throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount,
durability);
}
@Override
public Result append(Append append) throws IOException {
return table.append(append);
}
@Override
public Result increment(Increment increment) throws IOException {
return table.increment(increment);
}
public void flushCommits() throws IOException {
table.flushCommits();
}
public boolean isAutoFlush() {
return table.isAutoFlush();
}
public ResultScanner getScanner(Scan scan) throws IOException {
return table.getScanner(scan);
}
public ResultScanner getScanner(byte[] family) throws IOException {
return table.getScanner(family);
}
public ResultScanner getScanner(byte[] family, byte[] qualifier)
throws IOException {
return table.getScanner(family, qualifier);
}
public HTableDescriptor getTableDescriptor() throws IOException {
return table.getTableDescriptor();
}
@Override
public byte[] getTableName() {
return tableName.getName();
}
@Override
public TableName getName() {
return table.getName();
}
@Override
public void batch(List<? extends Row> actions, Object[] results)
throws IOException, InterruptedException {
table.batch(actions, results);
}
/**
* {@inheritDoc}
* @deprecated If any exception is thrown by one of the actions, there is no way to
* retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
*/
@Override
public Object[] batch(List<? extends Row> actions)
throws IOException, InterruptedException {
return table.batch(actions);
}
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
Batch.Callback<R> callback) throws IOException, InterruptedException {
table.batchCallback(actions, results, callback);
}
/**
* {@inheritDoc}
* @deprecated If any exception is thrown by one of the actions, there is no way to
* retrieve the partially executed results. Use
* {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
* instead.
*/
@Override
public <R> Object[] batchCallback(List<? extends Row> actions,
Batch.Callback<R> callback) throws IOException, InterruptedException {
return table.batchCallback(actions, callback);
}
@Override
public Result[] get(List<Get> gets) throws IOException {
return table.get(gets);
}
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
return table.coprocessorService(row);
}
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
throws ServiceException, Throwable {
return table.coprocessorService(service, startKey, endKey, callable);
}
@Override
public <T extends Service, R> void coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
throws ServiceException, Throwable {
table.coprocessorService(service, startKey, endKey, callable, callback);
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
table.mutateRow(rm);
}
@Override
public void setAutoFlush(boolean autoFlush) {
table.setAutoFlush(autoFlush, autoFlush);
}
@Override
public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
table.setAutoFlush(autoFlush, clearBufferOnFail);
}
@Override
public void setAutoFlushTo(boolean autoFlush) {
table.setAutoFlushTo(autoFlush);
}
@Override
public long getWriteBufferSize() {
return table.getWriteBufferSize();
}
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
table.setWriteBufferSize(writeBufferSize);
}
@Override
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
}
}
/** The coprocessor */
public Coprocessor impl;
/** Chaining priority */
@ -757,8 +472,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
*/
@Override
public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
return new HTableWrapper(tableName, CoprocessorHConnection.getConnectionForEnvironment(this),
pool);
return HTableWrapper.createWrapper(openTables, tableName, this, pool);
}
}

View File

@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
@ -442,7 +442,7 @@ MasterServices, Server {
}
String name = "master/" + initialIsa.toString();
// Set how many times to retry talking to another server over HConnection.
HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
ConnectionUtils.setServerSideHConnectionRetriesConfig(this.conf, name, LOG);
int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT));
this.rpcServer = new RpcServer(this, name, getServices(),

View File

@ -84,9 +84,9 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@ -581,7 +581,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
this.rand = new Random(initialIsa.hashCode());
String name = "regionserver/" + initialIsa.toString();
// Set how many times to retry talking to another server over HConnection.
HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
ConnectionUtils.setServerSideHConnectionRetriesConfig(this.conf, name, LOG);
this.priority = new AnnotationReadingPriorityFunction(this);
RpcSchedulerFactory rpcSchedulerFactory;
try {

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
import org.mockito.Mockito;
/**
@ -50,16 +50,16 @@ public class HConnectionTestingUtility {
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
public static HConnection getMockedConnection(final Configuration conf)
public static ClusterConnection getMockedConnection(final Configuration conf)
throws ZooKeeperConnectionException {
HConnectionKey connectionKey = new HConnectionKey(conf);
synchronized (HConnectionManager.CONNECTION_INSTANCES) {
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
HConnectionImplementation connection =
HConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
if (connection == null) {
connection = Mockito.mock(HConnectionImplementation.class);
Mockito.when(connection.getConfiguration()).thenReturn(conf);
HConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
}
return connection;
}
@ -93,12 +93,12 @@ public class HConnectionTestingUtility {
* when done with this mocked Connection.
* @throws IOException
*/
public static HConnection getMockedConnectionAndDecorate(final Configuration conf,
public static ClusterConnection getMockedConnectionAndDecorate(final Configuration conf,
final AdminProtos.AdminService.BlockingInterface admin,
final ClientProtos.ClientService.BlockingInterface client,
final ServerName sn, final HRegionInfo hri)
throws IOException {
HConnection c = HConnectionTestingUtility.getMockedConnection(conf);
ClusterConnection c = HConnectionTestingUtility.getMockedConnection(conf);
Mockito.doNothing().when(c).close();
// Make it so we return a particular location when asked.
final HRegionLocation loc = new HRegionLocation(hri, sn);
@ -139,12 +139,12 @@ public class HConnectionTestingUtility {
public static HConnection getSpiedConnection(final Configuration conf)
throws IOException {
HConnectionKey connectionKey = new HConnectionKey(conf);
synchronized (HConnectionManager.CONNECTION_INSTANCES) {
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
HConnectionImplementation connection =
HConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
if (connection == null) {
connection = Mockito.spy(new HConnectionImplementation(conf, true));
HConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
}
return connection;
}
@ -154,8 +154,8 @@ public class HConnectionTestingUtility {
* @return Count of extant connection instances
*/
public static int getConnectionCount() {
synchronized (HConnectionManager.CONNECTION_INSTANCES) {
return HConnectionManager.CONNECTION_INSTANCES.size();
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
return ConnectionManager.CONNECTION_INSTANCES.size();
}
}
}

View File

@ -102,7 +102,8 @@ public class TestClientTimeouts {
assertFalse(connection == lastConnection);
lastConnection = connection;
// Override the connection's rpc client for timeout testing
((HConnectionManager.HConnectionImplementation)connection).setRpcClient(rpcClient);
((ConnectionManager.HConnectionImplementation)connection).setRpcClient(
rpcClient);
// run some admin commands
HBaseAdmin.checkHBaseAvailable(conf);
admin.setBalancerRunning(false, false);

View File

@ -241,8 +241,8 @@ public class TestFromClientSide {
z0.close();
// Then a ZooKeeperKeepAliveConnection
HConnectionManager.HConnectionImplementation connection1 =
(HConnectionManager.HConnectionImplementation)
ConnectionManager.HConnectionImplementation connection1 =
(ConnectionManager.HConnectionImplementation)
HConnectionManager.getConnection(newConfig);
ZooKeeperKeepAliveConnection z1 = connection1.getKeepAliveZooKeeperWatcher();
@ -263,8 +263,8 @@ public class TestFromClientSide {
Configuration newConfig2 = new Configuration(TEST_UTIL.getConfiguration());
newConfig2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "6789");
HConnectionManager.HConnectionImplementation connection2 =
(HConnectionManager.HConnectionImplementation)
ConnectionManager.HConnectionImplementation connection2 =
(ConnectionManager.HConnectionImplementation)
HConnectionManager.getConnection(newConfig2);
assertTrue("connections should be different ", connection1 != connection2);
@ -275,7 +275,7 @@ public class TestFromClientSide {
" on different connections", z1 != z3);
// Bypass the private access
Method m = HConnectionManager.HConnectionImplementation.class.
Method m = ConnectionManager.HConnectionImplementation.class.
getDeclaredMethod("closeZooKeeperWatcher");
m.setAccessible(true);
m.invoke(connection2);
@ -338,7 +338,7 @@ public class TestFromClientSide {
TEST_UTIL.countRows(table);
table.getConnection().clearRegionCache();
assertEquals("Clearing cache should have 0 cached ", 0,
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
ConnectionManager.getCachedRegionCount(conf, TABLENAME));
// A Get is suppose to do a region lookup request
Get g = new Get(Bytes.toBytes("aaa"));
@ -346,7 +346,7 @@ public class TestFromClientSide {
// only one region should be cached if the cache prefetch is disabled.
assertEquals("Number of cached region is incorrect ", 1,
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
ConnectionManager.getCachedRegionCount(conf, TABLENAME));
// now we enable cached prefetch.
HTable.setRegionCachePrefetch(conf, TABLENAME, true);
@ -364,7 +364,7 @@ public class TestFromClientSide {
table.getConnection().clearRegionCache();
assertEquals("Number of cached region is incorrect ", 0,
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
ConnectionManager.getCachedRegionCount(conf, TABLENAME));
// if there is a cache miss, some additional regions should be prefetched.
Get g2 = new Get(Bytes.toBytes("bbb"));
@ -376,14 +376,14 @@ public class TestFromClientSide {
// the total number of cached regions == region('aaa") + prefeched regions.
LOG.info("Testing how many regions cached");
assertEquals("Number of cached region is incorrect ", prefetchRegionNumber,
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
ConnectionManager.getCachedRegionCount(conf, TABLENAME));
table.getConnection().clearRegionCache();
Get g3 = new Get(Bytes.toBytes("abc"));
table.get(g3);
assertEquals("Number of cached region is incorrect ", prefetchRegionNumber,
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
ConnectionManager.getCachedRegionCount(conf, TABLENAME));
LOG.info("Finishing testRegionCachePreWarm");
}

View File

@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.filter.Filter;
@ -320,9 +320,9 @@ public class TestHCM {
// Save off current HConnections
Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
new HashMap<HConnectionKey, HConnectionImplementation>();
oldHBaseInstances.putAll(HConnectionManager.CONNECTION_INSTANCES);
oldHBaseInstances.putAll(ConnectionManager.CONNECTION_INSTANCES);
HConnectionManager.CONNECTION_INSTANCES.clear();
ConnectionManager.CONNECTION_INSTANCES.clear();
try {
HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
@ -332,8 +332,8 @@ public class TestHCM {
HConnectionManager.getConnection(TEST_UTIL.getConfiguration()));
} finally {
// Put original HConnections back
HConnectionManager.CONNECTION_INSTANCES.clear();
HConnectionManager.CONNECTION_INSTANCES.putAll(oldHBaseInstances);
ConnectionManager.CONNECTION_INSTANCES.clear();
ConnectionManager.CONNECTION_INSTANCES.putAll(oldHBaseInstances);
}
}
@ -354,8 +354,8 @@ public class TestHCM {
Put put = new Put(ROW);
put.add(FAM_NAM, ROW, ROW);
table.put(put);
HConnectionManager.HConnectionImplementation conn =
(HConnectionManager.HConnectionImplementation)table.getConnection();
ConnectionManager.HConnectionImplementation conn =
(ConnectionManager.HConnectionImplementation)table.getConnection();
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
@ -455,7 +455,7 @@ public class TestHCM {
Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
// Check that we unserialized the exception as expected
Throwable cause = HConnectionManager.findException(e.getCause(0));
Throwable cause = ConnectionManager.findException(e.getCause(0));
Assert.assertNotNull(cause);
Assert.assertTrue(cause instanceof RegionMovedException);
}
@ -548,8 +548,8 @@ public class TestHCM {
Put put = new Put(ROW);
put.add(FAM_NAM, ROW, ROW);
table.put(put);
HConnectionManager.HConnectionImplementation conn =
(HConnectionManager.HConnectionImplementation)table.getConnection();
ConnectionManager.HConnectionImplementation conn =
(ConnectionManager.HConnectionImplementation)table.getConnection();
HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW);
assertNotNull(location);
@ -616,7 +616,7 @@ public class TestHCM {
/**
* Makes sure that there is no leaking of
* {@link HConnectionManager.HConnectionImplementation} in the {@link HConnectionManager}
* {@link ConnectionManager.HConnectionImplementation} in the {@link HConnectionManager}
* class.
*/
@Test
@ -756,8 +756,8 @@ public class TestHCM {
public void testMulti() throws Exception {
HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM);
TEST_UTIL.createMultiRegions(table, FAM_NAM);
HConnectionManager.HConnectionImplementation conn =
(HConnectionManager.HConnectionImplementation)
ConnectionManager.HConnectionImplementation conn =
(ConnectionManager.HConnectionImplementation)
HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
// We're now going to move the region and check that it works for the client
@ -875,8 +875,8 @@ public class TestHCM {
try {
long timeBase = timeMachine.currentTimeMillis();
long largeAmountOfTime = ANY_PAUSE * 1000;
HConnectionManager.ServerErrorTracker tracker =
new HConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
ConnectionManager.ServerErrorTracker tracker =
new ConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
// The default backoff is 0.
assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
@ -976,9 +976,9 @@ public class TestHCM {
for (int i = 0; i < 30; i++) {
HConnection c1 = null;
try {
c1 = HConnectionManager.getConnection(config);
c1 = ConnectionManager.getConnectionInternal(config);
LOG.info("HTable connection " + i + " " + c1);
HTable table = new HTable(TABLE_NAME4, c1, pool);
HTable table = new HTable(config, TABLE_NAME4, pool);
table.close();
LOG.info("HTable connection " + i + " closed " + c1);
} catch (Exception e) {

View File

@ -507,7 +507,7 @@ public class TestMultiParallel {
}
};
NonceGenerator oldCnm =
HConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), cnm);
ConnectionUtils.injectNonceGeneratorForTesting(table.getConnection(), cnm);
// First test sequential requests.
try {
@ -570,7 +570,7 @@ public class TestMultiParallel {
validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
table.close();
} finally {
HConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), oldCnm);
ConnectionManager.injectNonceGeneratorForTesting(table.getConnection(), oldCnm);
}
}

View File

@ -57,7 +57,7 @@ import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
/**
* Tests class {@link org.apache.hadoop.hbase.coprocessor.CoprocessorHost.Environment.HTableWrapper}
* Tests class {@link org.apache.hadoop.hbase.client.HTableWrapper}
* by invoking its methods and briefly asserting the result is reasonable.
*/
@Category(MediumTests.class)

View File

@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.NonceGenerator;
@ -322,7 +322,7 @@ public class TestDistributedLogSplitting {
HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
NonceGenerator oldNg =
HConnectionManager.injectNonceGeneratorForTesting(ht.getConnection(), ng);
ConnectionUtils.injectNonceGeneratorForTesting(ht.getConnection(), ng);
try {
List<Increment> reqs = new ArrayList<Increment>();
@ -356,7 +356,7 @@ public class TestDistributedLogSplitting {
}
}
} finally {
HConnectionManager.injectNonceGeneratorForTesting(ht.getConnection(), oldNg);
ConnectionUtils.injectNonceGeneratorForTesting(ht.getConnection(), oldNg);
ht.close();
zkw.close();
}