HBASE-6580 Deprecate HTablePool in favor of HConnection.getTable(...)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1511543 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2013-08-07 23:58:05 +00:00
parent d3ddc7cc0b
commit d363e852e6
8 changed files with 302 additions and 32 deletions

View File

@ -70,6 +70,60 @@ public interface HConnection extends Abortable, Closeable {
*/
Configuration getConfiguration();
/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
* (created with {@link HConnectionManager#createConnection(Configuration)}).
* @param tableName
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(String tableName) throws IOException;
/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
* (created with {@link HConnectionManager#createConnection(Configuration)}).
* @param tableName
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(byte[] tableName) throws IOException;
/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
* (created with {@link HConnectionManager#createConnection(Configuration)}).
* @param tableName
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException;
/**
* Retrieve an HTableInterface implementation for access to a table.
* The returned HTableInterface is not thread safe, a new instance should
* be created for each using thread.
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
* (created with {@link HConnectionManager#createConnection(Configuration)}).
* @param tableName
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return an HTable to use for interactions with this table
*/
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException;
/** @return - true if the master server is running */
boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException;

View File

@ -36,6 +36,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -137,6 +140,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -148,8 +152,22 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* A non-instantiable class that manages {@link HConnection}s.
* This class has a static Map of {@link HConnection} instances keyed by
* A non-instantiable class that manages creation of {@link HConnection}s.
* <p>The simplest way to use this class is by using {@link #createConnection(Configuration)}.
* This creates a new {@link HConnection} that is managed by the caller.
* From this {@link HConnection} {@link HTableInterface} implementations are retrieved
* with {@link HConnection#getTable(byte[])}. Example:
* <pre>
* {@code
* HConnection connection = HConnectionManager.createConnection(config);
* HTableInterface table = connection.getTable("table1");
* // use the table as needed, for a single operation and a single thread
* table.close();
* connection.close();
* }
* </pre>
* <p>The following logic and API will be removed in the future:
* <p>This class has a static Map of {@link HConnection} instances keyed by
* {@link Configuration}; all invocations of {@link #getConnection(Configuration)}
* that pass the same {@link Configuration} instance will be returned the same
* {@link HConnection} instance (Adding properties to a Configuration
@ -241,6 +259,7 @@ public class HConnectionManager {
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
@Deprecated
@SuppressWarnings("resource")
public static HConnection getConnection(final Configuration conf)
throws IOException {
@ -263,18 +282,61 @@ public class HConnectionManager {
/**
* Create a new HConnection instance using the passed <code>conf</code> instance.
* <p>Note: This bypasses the usual HConnection life cycle management done by
* {@link #getConnection(Configuration)}. Use this with caution, the caller is responsible for
* {@link #getConnection(Configuration)}. The caller is responsible for
* calling {@link HConnection#close()} on the returned connection instance.
*
* This is the recommended way to create HConnections.
* {@code
* HConnection connection = HConnectionManager.createConnection(conf);
* HTableInterface table = connection.getTable("mytable");
* table.get(...);
* ...
* table.close();
* connection.close();
* }
*
* @param conf configuration
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
public static HConnection createConnection(Configuration conf)
throws IOException {
return createConnection(conf, false);
return createConnection(conf, false, null);
}
/**
* Create a new HConnection instance using the passed <code>conf</code> instance.
* <p>Note: This bypasses the usual HConnection life cycle management done by
* {@link #getConnection(Configuration)}. The caller is responsible for
* calling {@link HConnection#close()} on the returned connection instance.
* This is the recommended way to create HConnections.
* {@code
* ExecutorService pool = ...;
* HConnection connection = HConnectionManager.createConnection(conf, pool);
* HTableInterface table = connection.getTable("mytable");
* table.get(...);
* ...
* table.close();
* connection.close();
* }
* @param conf configuration
* @param pool the thread pool to use for batch operation in HTables used via this HConnection
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
public static HConnection createConnection(Configuration conf, ExecutorService pool)
throws IOException {
return createConnection(conf, false, pool);
}
@Deprecated
static HConnection createConnection(final Configuration conf, final boolean managed)
throws IOException {
return createConnection(conf, managed, null);
}
@Deprecated
static HConnection createConnection(final Configuration conf, final boolean managed, final ExecutorService pool)
throws IOException {
String className = conf.get("hbase.client.connection.impl",
HConnectionManager.HConnectionImplementation.class.getName());
@ -287,9 +349,9 @@ public class HConnectionManager {
try {
// Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor =
clazz.getDeclaredConstructor(Configuration.class, boolean.class);
clazz.getDeclaredConstructor(Configuration.class, boolean.class, ExecutorService.class);
constructor.setAccessible(true);
return (HConnection) constructor.newInstance(conf, managed);
return (HConnection) constructor.newInstance(conf, managed, pool);
} catch (Exception e) {
throw new IOException(e);
}
@ -301,6 +363,7 @@ public class HConnectionManager {
* then close connection to the zookeeper ensemble and let go of all associated resources.
*
* @param conf configuration whose identity is used to find {@link HConnection} instance.
* @deprecated
*/
public static void deleteConnection(Configuration conf) {
deleteConnection(new HConnectionKey(conf), false);
@ -311,6 +374,7 @@ public class HConnectionManager {
* This will then close connection to the zookeeper ensemble and let go of all resources.
*
* @param connection
* @deprecated
*/
public static void deleteStaleConnection(HConnection connection) {
deleteConnection(connection, true);
@ -320,6 +384,7 @@ public class HConnectionManager {
* Delete information for all connections. Close or not the connection, depending on the
* staleConnection boolean and the ref count. By default, you should use it with
* staleConnection to true.
* @deprecated
*/
public static void deleteAllConnections(boolean staleConnection) {
synchronized (CONNECTION_INSTANCES) {
@ -342,6 +407,7 @@ public class HConnectionManager {
}
@Deprecated
private static void deleteConnection(HConnection connection, boolean staleConnection) {
synchronized (CONNECTION_INSTANCES) {
for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
@ -353,6 +419,7 @@ public class HConnectionManager {
}
}
@Deprecated
private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
synchronized (CONNECTION_INSTANCES) {
HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
@ -464,6 +531,10 @@ public class HConnectionManager {
private final DelayedClosing delayedClosing =
DelayedClosing.createAndStart(this);
// thread executor shared by all HTableInterface instances created
// by this connection
private volatile ExecutorService batchPool = null;
private volatile boolean cleanupPool = false;
private final Configuration conf;
@ -499,6 +570,10 @@ public class HConnectionManager {
*/
Registry registry;
HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
this(conf, managed, null);
}
/**
* constructor
* @param conf Configuration object
@ -510,8 +585,9 @@ public class HConnectionManager {
* are shared, we have reference counting going on and will only do full cleanup when no more
* users of an HConnectionImplementation instance.
*/
HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool) throws IOException {
this(conf);
this.batchPool = pool;
this.managed = managed;
this.registry = setupRegistry();
retrieveClusterId();
@ -556,6 +632,74 @@ public class HConnectionManager {
HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
}
@Override
public HTableInterface getTable(String tableName) throws IOException {
return getTable(Bytes.toBytes(tableName));
}
@Override
public HTableInterface getTable(byte[] tableName) throws IOException {
return getTable(tableName, getBatchPool());
}
@Override
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
return getTable(Bytes.toBytes(tableName), pool);
}
@Override
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
if (managed) {
throw new IOException("The connection has to be unmanaged.");
}
return new HTable(tableName, this, pool);
}
private ExecutorService getBatchPool() {
if (batchPool == null) {
// shared HTable thread executor not yet initialized
synchronized (this) {
if (batchPool == null) {
int maxThreads = conf.getInt("hbase.hconnection.threads.max",
Integer.MAX_VALUE);
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors();
}
long keepAliveTime = conf.getLong(
"hbase.hconnection.threads.keepalivetime", 60);
this.batchPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
Threads.newDaemonThreadFactory("hbase-connection-shared-executor"));
((ThreadPoolExecutor) this.batchPool)
.allowCoreThreadTimeOut(true);
}
this.cleanupPool = true;
}
}
return this.batchPool;
}
protected ExecutorService getCurrentBatchPool() {
return batchPool;
}
private void shutdownBatchPool() {
if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
this.batchPool.shutdown();
try {
if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
this.batchPool.shutdownNow();
}
} catch (InterruptedException e) {
this.batchPool.shutdownNow();
}
}
}
/**
* @return The cluster registry implementation to use.
* @throws IOException
@ -2267,6 +2411,7 @@ public class HConnectionManager {
}
delayedClosing.stop("Closing connection");
closeMaster();
shutdownBatchPool();
this.closed = true;
closeZooKeeperWatcher();
this.stubs.clear();

View File

@ -59,6 +59,26 @@ public class HConnectionWrapper implements HConnection {
this.ugi = ugi;
}
@Override
public HTableInterface getTable(String tableName) throws IOException {
return hconnection.getTable(tableName);
}
@Override
public HTableInterface getTable(byte[] tableName) throws IOException {
return hconnection.getTable(tableName);
}
@Override
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
return hconnection.getTable(tableName, pool);
}
@Override
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
return hconnection.getTable(tableName, pool);
}
@Override
public void abort(String why, Throwable e) {
hconnection.abort(why, e);

View File

@ -59,6 +59,7 @@ import java.util.Map;
* <p>
* Pool will manage its own connections to the cluster. See
* {@link HConnectionManager}.
* @deprecated Use {@link HConnection#getTable(String)} instead.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -212,7 +213,7 @@ public class TestClientNoCluster {
final ClientService.BlockingInterface stub;
ScanOpenNextThenExceptionThenRecoverConnection(Configuration conf,
boolean managed) throws IOException {
boolean managed, ExecutorService pool) throws IOException {
super(conf, managed);
// Mock up my stub so open scanner returns a scanner id and then on next, we throw
// exceptions for three times and then after that, we return no more to scan.
@ -243,8 +244,8 @@ public class TestClientNoCluster {
extends HConnectionManager.HConnectionImplementation {
final ClientService.BlockingInterface stub;
RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed)
throws IOException {
RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed,
ExecutorService pool) throws IOException {
super(conf, managed);
// Mock up my stub so open scanner returns a scanner id and then on next, we throw
// exceptions for three times and then after that, we return no more to scan.
@ -275,7 +276,7 @@ public class TestClientNoCluster {
extends HConnectionManager.HConnectionImplementation {
final ClientService.BlockingInterface stub;
RpcTimeoutConnection(Configuration conf, boolean managed)
RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool)
throws IOException {
super(conf, managed);
// Mock up my stub so an exists call -- which turns into a get -- throws an exception

View File

@ -4100,12 +4100,7 @@ public class TestFromClientSide {
HTable createUnmangedHConnectionHTable(final byte [] tableName) throws IOException {
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
ExecutorService pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
Threads.newDaemonThreadFactory("test-from-client"));
((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
return new HTable(tableName, conn, pool);
return (HTable)conn.getTable(tableName);
}
/**

View File

@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -110,6 +111,60 @@ public class TestHCM {
return HConnectionTestingUtility.getConnectionCount();
}
@Test
public void testClusterConnection() throws IOException {
ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
5, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
Threads.newDaemonThreadFactory("test-hcm"));
HConnection con1 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
HConnection con2 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool);
// make sure the internally created ExecutorService is the one passed
assertTrue(otherPool == ((HConnectionImplementation)con2).getCurrentBatchPool());
String tableName = "testClusterConnection";
TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
HTable t = (HTable)con1.getTable(tableName, otherPool);
// make sure passing a pool to the getTable does not trigger creation of an internal pool
assertNull("Internal Thread pool should be null", ((HConnectionImplementation)con1).getCurrentBatchPool());
// table should use the pool passed
assertTrue(otherPool == t.getPool());
t.close();
t = (HTable)con2.getTable(tableName);
// table should use the connectin's internal pool
assertTrue(otherPool == t.getPool());
t.close();
t = (HTable)con2.getTable(Bytes.toBytes(tableName));
// try other API too
assertTrue(otherPool == t.getPool());
t.close();
t = (HTable)con1.getTable(tableName);
ExecutorService pool = ((HConnectionImplementation)con1).getCurrentBatchPool();
// make sure an internal pool was created
assertNotNull("An internal Thread pool should have been created", pool);
// and that the table is using it
assertTrue(t.getPool() == pool);
t.close();
t = (HTable)con1.getTable(tableName);
// still using the *same* internal pool
assertTrue(t.getPool() == pool);
t.close();
con1.close();
// if the pool was created on demand it should be closed upon connectin close
assertTrue(pool.isShutdown());
con2.close();
// if the pool is passed, it is not closed
assertFalse(otherPool.isShutdown());
otherPool.shutdownNow();
}
@Ignore ("Fails in IDEs: HBASE-9042") @Test(expected = RegionServerStoppedException.class)
public void testClusterStatus() throws Exception {
byte[] tn = "testClusterStatus".getBytes();
@ -459,20 +514,16 @@ public class TestHCM {
public void testConnectionManagement() throws Exception{
TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10,
60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
Threads.newDaemonThreadFactory("test-hcm"));
HTable table = new HTable(TABLE_NAME1, conn, pool);
HTableInterface table = conn.getTable(TABLE_NAME1);
//new HTable(TABLE_NAME1, conn, pool);
table.close();
assertFalse(conn.isClosed());
assertFalse(pool.isShutdown());
table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME1, pool);
assertFalse(((HTable)table).getPool().isShutdown());
table = conn.getTable(TABLE_NAME1);
table.close();
assertFalse(pool.isShutdown());
assertFalse(((HTable)table).getPool().isShutdown());
conn.close();
pool.shutdownNow();
assertTrue(((HTable)table).getPool().isShutdown());
}
/**

View File

@ -1101,11 +1101,14 @@ HTable table2 = new HTable(conf2, "myTable");</programlisting>
</para>
<para>
Another solution is to precreate an <classname>HConnection</classname> using
<programlisting>HConnectionManager.createConnection(Configuration)</programlisting> as
well as an <classname>ExecutorService</classname>; then use the
<programlisting>HTable(byte[], HConnection, ExecutorService)</programlisting>
constructor to create <classname>HTable</classname> instances on demand.
This construction is very lightweight and resources are controlled/shared if you go this route.
<programlisting>// Create a connection to the cluster.
HConnection connection = HConnectionManager.createConnection(Configuration);
HTableInterface table = connection.getTable("myTable");
// use table as needed, the table returned is lightweight
table.close();
// use the connection for other access to the cluster
connection.close();</programlisting>
Constructing HTableInterface implementation is very lightweight and resources are controlled/shared if you go this route.
</para>
</section>
</section>