HBASE-4805 Allow better control of resource consumption in HTable (Lars H)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1203428 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3039d61014
commit
a9253a7104
|
@ -715,6 +715,7 @@ Release 0.92.0 - Unreleased
|
||||||
(Mingjie Lai)
|
(Mingjie Lai)
|
||||||
HBASE-4779 TestHTablePool, TestScanWithBloomError, TestRegionSplitCalculator are
|
HBASE-4779 TestHTablePool, TestScanWithBloomError, TestRegionSplitCalculator are
|
||||||
not tagged and TestPoolMap should not use TestSuite (N Keywal)
|
not tagged and TestPoolMap should not use TestSuite (N Keywal)
|
||||||
|
HBASE-4805 Allow better control of resource consumption in HTable (Lars H)
|
||||||
|
|
||||||
TASKS
|
TASKS
|
||||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||||
|
|
|
@ -369,4 +369,8 @@ public interface HConnection extends Abortable, Closeable {
|
||||||
public HTableDescriptor[] getHTableDescriptors(List<String> tableNames)
|
public HTableDescriptor[] getHTableDescriptors(List<String> tableNames)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if this connection is closed
|
||||||
|
*/
|
||||||
|
public boolean isClosed();
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,7 +177,7 @@ public class HConnectionManager {
|
||||||
synchronized (HBASE_INSTANCES) {
|
synchronized (HBASE_INSTANCES) {
|
||||||
HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
|
HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
connection = new HConnectionImplementation(conf);
|
connection = new HConnectionImplementation(conf, true);
|
||||||
HBASE_INSTANCES.put(connectionKey, connection);
|
HBASE_INSTANCES.put(connectionKey, connection);
|
||||||
}
|
}
|
||||||
connection.incCount();
|
connection.incCount();
|
||||||
|
@ -185,6 +185,21 @@ public class HConnectionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new HConnection instance using the passed <code>conf</code>
|
||||||
|
* instance.
|
||||||
|
* Note: This bypasses the usual HConnection life cycle management!
|
||||||
|
* Use this with caution, the caller is responsible for closing the
|
||||||
|
* created connection.
|
||||||
|
* @param conf configuration
|
||||||
|
* @return HConnection object for <code>conf</code>
|
||||||
|
* @throws ZooKeeperConnectionException
|
||||||
|
*/
|
||||||
|
public static HConnection createConnection(Configuration conf)
|
||||||
|
throws ZooKeeperConnectionException {
|
||||||
|
return new HConnectionImplementation(conf, false);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete connection information for the instance specified by configuration.
|
* Delete connection information for the instance specified by configuration.
|
||||||
* If there are no more references to it, this will then close connection to
|
* If there are no more references to it, this will then close connection to
|
||||||
|
@ -483,15 +498,17 @@ public class HConnectionManager {
|
||||||
private boolean stopProxy;
|
private boolean stopProxy;
|
||||||
private int refCount;
|
private int refCount;
|
||||||
|
|
||||||
|
// indicates whether this connection's life cycle is managed
|
||||||
|
private final boolean managed;
|
||||||
/**
|
/**
|
||||||
* constructor
|
* constructor
|
||||||
* @param conf Configuration object
|
* @param conf Configuration object
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public HConnectionImplementation(Configuration conf)
|
public HConnectionImplementation(Configuration conf, boolean managed)
|
||||||
throws ZooKeeperConnectionException {
|
throws ZooKeeperConnectionException {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
this.managed = managed;
|
||||||
String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS,
|
String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS,
|
||||||
HConstants.DEFAULT_REGION_SERVER_CLASS);
|
HConstants.DEFAULT_REGION_SERVER_CLASS);
|
||||||
this.closed = false;
|
this.closed = false;
|
||||||
|
@ -1640,6 +1657,11 @@ public class HConnectionManager {
|
||||||
this.closed = true;
|
this.closed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isClosed() {
|
||||||
|
return this.closed;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isAborted(){
|
public boolean isAborted(){
|
||||||
return this.aborted;
|
return this.aborted;
|
||||||
|
@ -1712,7 +1734,11 @@ public class HConnectionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
|
if (managed) {
|
||||||
HConnectionManager.deleteConnection((HConnection)this, stopProxy, false);
|
HConnectionManager.deleteConnection((HConnection)this, stopProxy, false);
|
||||||
|
} else {
|
||||||
|
close(true);
|
||||||
|
}
|
||||||
LOG.debug("The connection to " + this.zooKeeper + " has been closed.");
|
LOG.debug("The connection to " + this.zooKeeper + " has been closed.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -111,7 +111,7 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
private static final Log LOG = LogFactory.getLog(HTable.class);
|
private static final Log LOG = LogFactory.getLog(HTable.class);
|
||||||
private HConnection connection;
|
private HConnection connection;
|
||||||
private final byte [] tableName;
|
private final byte [] tableName;
|
||||||
protected final int scannerTimeout;
|
protected int scannerTimeout;
|
||||||
private volatile Configuration configuration;
|
private volatile Configuration configuration;
|
||||||
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
|
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
|
||||||
private long writeBufferSize;
|
private long writeBufferSize;
|
||||||
|
@ -125,6 +125,7 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
private int operationTimeout;
|
private int operationTimeout;
|
||||||
private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts.
|
private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts.
|
||||||
|
private final boolean cleanupOnClose; // close the connection in close()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an object to access a HBase table.
|
* Creates an object to access a HBase table.
|
||||||
|
@ -155,36 +156,19 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
public HTable(Configuration conf, final byte [] tableName)
|
public HTable(Configuration conf, final byte [] tableName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.tableName = tableName;
|
this.tableName = tableName;
|
||||||
|
this.cleanupOnClose = true;
|
||||||
if (conf == null) {
|
if (conf == null) {
|
||||||
this.scannerTimeout = 0;
|
this.scannerTimeout = 0;
|
||||||
this.connection = null;
|
this.connection = null;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.connection = HConnectionManager.getConnection(conf);
|
this.connection = HConnectionManager.getConnection(conf);
|
||||||
this.scannerTimeout =
|
|
||||||
(int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
|
|
||||||
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
|
|
||||||
this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
|
|
||||||
: conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
|
||||||
this.configuration = conf;
|
this.configuration = conf;
|
||||||
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
|
|
||||||
this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
|
|
||||||
this.clearBufferOnFail = true;
|
|
||||||
this.autoFlush = true;
|
|
||||||
this.currentWriteBufferSize = 0;
|
|
||||||
this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
|
|
||||||
|
|
||||||
this.maxScannerResultSize = conf.getLong(
|
|
||||||
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
|
||||||
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
|
|
||||||
|
|
||||||
int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
|
int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
|
||||||
if (maxThreads == 0) {
|
if (maxThreads == 0) {
|
||||||
maxThreads = 1; // is there a better default?
|
maxThreads = 1; // is there a better default?
|
||||||
}
|
}
|
||||||
|
|
||||||
// Using the "direct handoff" approach, new threads will only be created
|
// Using the "direct handoff" approach, new threads will only be created
|
||||||
// if it is necessary and will grow unbounded. This could be bad but in HCM
|
// if it is necessary and will grow unbounded. This could be bad but in HCM
|
||||||
// we only create as many Runnables as there are region servers. It means
|
// we only create as many Runnables as there are region servers. It means
|
||||||
|
@ -194,6 +178,63 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
new SynchronousQueue<Runnable>(),
|
new SynchronousQueue<Runnable>(),
|
||||||
new DaemonThreadFactory());
|
new DaemonThreadFactory());
|
||||||
((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
|
((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
|
||||||
|
|
||||||
|
this.finishSetup();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an object to access a HBase table.
|
||||||
|
* Shares zookeeper connection and other resources with other HTable instances
|
||||||
|
* created with the same <code>connection</code> instance.
|
||||||
|
* Use this constructor when the ExecutorService and HConnection instance are
|
||||||
|
* externally managed.
|
||||||
|
* @param tableName Name of the table.
|
||||||
|
* @param connection HConnection to be used.
|
||||||
|
* @param pool ExecutorService to be used.
|
||||||
|
* @throws IOException if a remote or network exception occurs
|
||||||
|
*/
|
||||||
|
public HTable(final byte[] tableName, final HConnection connection,
|
||||||
|
final ExecutorService pool) throws IOException {
|
||||||
|
if (pool == null || pool.isShutdown()) {
|
||||||
|
throw new IllegalArgumentException("Pool is null or shut down.");
|
||||||
|
}
|
||||||
|
if (connection == null || connection.isClosed()) {
|
||||||
|
throw new IllegalArgumentException("Connection is null or closed.");
|
||||||
|
}
|
||||||
|
this.tableName = tableName;
|
||||||
|
this.cleanupOnClose = false;
|
||||||
|
this.connection = connection;
|
||||||
|
this.configuration = connection.getConfiguration();
|
||||||
|
this.pool = pool;
|
||||||
|
|
||||||
|
this.finishSetup();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* setup this HTable's parameter based on the passed configuration
|
||||||
|
* @param conf
|
||||||
|
*/
|
||||||
|
private void finishSetup() throws IOException {
|
||||||
|
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
|
||||||
|
this.scannerTimeout = (int) this.configuration.getLong(
|
||||||
|
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
|
||||||
|
this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
|
||||||
|
: this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||||
|
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||||
|
this.writeBufferSize = this.configuration.getLong(
|
||||||
|
"hbase.client.write.buffer", 2097152);
|
||||||
|
this.clearBufferOnFail = true;
|
||||||
|
this.autoFlush = true;
|
||||||
|
this.currentWriteBufferSize = 0;
|
||||||
|
this.scannerCaching = this.configuration.getInt(
|
||||||
|
"hbase.client.scanner.caching", 1);
|
||||||
|
|
||||||
|
this.maxScannerResultSize = this.configuration.getLong(
|
||||||
|
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
||||||
|
this.maxKeyValueSize = this.configuration.getInt(
|
||||||
|
"hbase.client.keyvalue.maxsize", -1);
|
||||||
this.closed = false;
|
this.closed = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -886,10 +927,12 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
flushCommits();
|
flushCommits();
|
||||||
|
if (cleanupOnClose) {
|
||||||
this.pool.shutdown();
|
this.pool.shutdown();
|
||||||
if (this.connection != null) {
|
if (this.connection != null) {
|
||||||
this.connection.close();
|
this.connection.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
this.closed = true;
|
this.closed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -79,7 +79,7 @@ public class HConnectionTestingUtility {
|
||||||
HConnectionImplementation connection =
|
HConnectionImplementation connection =
|
||||||
HConnectionManager.HBASE_INSTANCES.get(connectionKey);
|
HConnectionManager.HBASE_INSTANCES.get(connectionKey);
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
connection = Mockito.spy(new HConnectionImplementation(conf));
|
connection = Mockito.spy(new HConnectionImplementation(conf, true));
|
||||||
HConnectionManager.HBASE_INSTANCES.put(connectionKey, connection);
|
HConnectionManager.HBASE_INSTANCES.put(connectionKey, connection);
|
||||||
}
|
}
|
||||||
return connection;
|
return connection;
|
||||||
|
|
|
@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.client;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -203,4 +205,51 @@ public class TestHCM {
|
||||||
Thread.sleep(50);
|
Thread.sleep(50);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClosing() throws Exception {
|
||||||
|
Configuration configuration = TEST_UTIL.getConfiguration();
|
||||||
|
configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
|
||||||
|
String.valueOf(_randy.nextInt()));
|
||||||
|
|
||||||
|
HConnection c1 = HConnectionManager.createConnection(configuration);
|
||||||
|
HConnection c2 = HConnectionManager.createConnection(configuration);
|
||||||
|
|
||||||
|
HConnection c3 = HConnectionManager.getConnection(configuration);
|
||||||
|
HConnection c4 = HConnectionManager.getConnection(configuration);
|
||||||
|
assertTrue(c3 == c4);
|
||||||
|
|
||||||
|
c1.close();
|
||||||
|
assertTrue(c1.isClosed());
|
||||||
|
assertFalse(c2.isClosed());
|
||||||
|
assertFalse(c3.isClosed());
|
||||||
|
|
||||||
|
c3.close();
|
||||||
|
// still a reference left
|
||||||
|
assertFalse(c3.isClosed());
|
||||||
|
c3.close();
|
||||||
|
assertTrue(c3.isClosed());
|
||||||
|
// c3 was removed from the cache
|
||||||
|
assertTrue(HConnectionManager.getConnection(configuration) != c3);
|
||||||
|
|
||||||
|
assertFalse(c2.isClosed());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trivial test to verify that nobody messes with
|
||||||
|
* {@link HConnectionManager#createConnection(Configuration)}
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCreateConnection() throws Exception {
|
||||||
|
Configuration configuration = TEST_UTIL.getConfiguration();
|
||||||
|
HConnection c1 = HConnectionManager.createConnection(configuration);
|
||||||
|
HConnection c2 = HConnectionManager.createConnection(configuration);
|
||||||
|
// created from the same configuration, yet they are different
|
||||||
|
assertTrue(c1 != c2);
|
||||||
|
assertTrue(c1.getConfiguration() == c2.getConfiguration());
|
||||||
|
// make sure these were not cached
|
||||||
|
HConnection c3 = HConnectionManager.getConnection(configuration);
|
||||||
|
assertTrue(c1 != c3);
|
||||||
|
assertTrue(c2 != c3);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue