diff --git a/CHANGES.txt b/CHANGES.txt index 196d8a9914f..4d3f5311604 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -715,6 +715,7 @@ Release 0.92.0 - Unreleased (Mingjie Lai) HBASE-4779 TestHTablePool, TestScanWithBloomError, TestRegionSplitCalculator are not tagged and TestPoolMap should not use TestSuite (N Keywal) + HBASE-4805 Allow better control of resource consumption in HTable (Lars H) TASKS HBASE-3559 Move report of split to master OFF the heartbeat channel diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/src/main/java/org/apache/hadoop/hbase/client/HConnection.java index fadbb8dc81c..0e78d96c3ba 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnection.java @@ -369,4 +369,8 @@ public interface HConnection extends Abortable, Closeable { public HTableDescriptor[] getHTableDescriptors(List tableNames) throws IOException; + /** + * @return true if this connection is closed + */ + public boolean isClosed(); } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index cba7bd1fc1d..74db218c19a 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -177,7 +177,7 @@ public class HConnectionManager { synchronized (HBASE_INSTANCES) { HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey); if (connection == null) { - connection = new HConnectionImplementation(conf); + connection = new HConnectionImplementation(conf, true); HBASE_INSTANCES.put(connectionKey, connection); } connection.incCount(); @@ -185,6 +185,21 @@ public class HConnectionManager { } } + /** + * Create a new HConnection instance using the passed conf + * instance. + * Note: This bypasses the usual HConnection life cycle management! + * Use this with caution, the caller is responsible for closing the + * created connection. + * @param conf configuration + * @return HConnection object for conf + * @throws ZooKeeperConnectionException + */ + public static HConnection createConnection(Configuration conf) + throws ZooKeeperConnectionException { + return new HConnectionImplementation(conf, false); + } + /** * Delete connection information for the instance specified by configuration. * If there are no more references to it, this will then close connection to @@ -483,15 +498,17 @@ public class HConnectionManager { private boolean stopProxy; private int refCount; - + // indicates whether this connection's life cycle is managed + private final boolean managed; /** * constructor * @param conf Configuration object */ @SuppressWarnings("unchecked") - public HConnectionImplementation(Configuration conf) + public HConnectionImplementation(Configuration conf, boolean managed) throws ZooKeeperConnectionException { this.conf = conf; + this.managed = managed; String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS, HConstants.DEFAULT_REGION_SERVER_CLASS); this.closed = false; @@ -1639,6 +1656,11 @@ public class HConnectionManager { this.aborted = true; this.closed = true; } + + @Override + public boolean isClosed() { + return this.closed; + } @Override public boolean isAborted(){ @@ -1712,7 +1734,11 @@ public class HConnectionManager { } public void close() { - HConnectionManager.deleteConnection((HConnection)this, stopProxy, false); + if (managed) { + HConnectionManager.deleteConnection((HConnection)this, stopProxy, false); + } else { + close(true); + } LOG.debug("The connection to " + this.zooKeeper + " has been closed."); } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 3a5fbd2762f..303705cbce2 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -111,7 +111,7 @@ public class HTable implements HTableInterface, Closeable { private static final Log LOG = LogFactory.getLog(HTable.class); private HConnection connection; private final byte [] tableName; - protected final int scannerTimeout; + protected int scannerTimeout; private volatile Configuration configuration; private final ArrayList writeBuffer = new ArrayList(); private long writeBufferSize; @@ -125,6 +125,7 @@ public class HTable implements HTableInterface, Closeable { private boolean closed; private int operationTimeout; private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts. + private final boolean cleanupOnClose; // close the connection in close() /** * Creates an object to access a HBase table. @@ -155,36 +156,19 @@ public class HTable implements HTableInterface, Closeable { public HTable(Configuration conf, final byte [] tableName) throws IOException { this.tableName = tableName; + this.cleanupOnClose = true; if (conf == null) { this.scannerTimeout = 0; this.connection = null; return; } this.connection = HConnectionManager.getConnection(conf); - this.scannerTimeout = - (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, - HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); - this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT - : conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.configuration = conf; - this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); - this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152); - this.clearBufferOnFail = true; - this.autoFlush = true; - this.currentWriteBufferSize = 0; - this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1); - - this.maxScannerResultSize = conf.getLong( - HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? } - // Using the "direct handoff" approach, new threads will only be created // if it is necessary and will grow unbounded. This could be bad but in HCM // we only create as many Runnables as there are region servers. It means @@ -194,6 +178,63 @@ public class HTable implements HTableInterface, Closeable { new SynchronousQueue(), new DaemonThreadFactory()); ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); + + this.finishSetup(); + } + + /** + * Creates an object to access a HBase table. + * Shares zookeeper connection and other resources with other HTable instances + * created with the same connection instance. + * Use this constructor when the ExecutorService and HConnection instance are + * externally managed. + * @param tableName Name of the table. + * @param connection HConnection to be used. + * @param pool ExecutorService to be used. + * @throws IOException if a remote or network exception occurs + */ + public HTable(final byte[] tableName, final HConnection connection, + final ExecutorService pool) throws IOException { + if (pool == null || pool.isShutdown()) { + throw new IllegalArgumentException("Pool is null or shut down."); + } + if (connection == null || connection.isClosed()) { + throw new IllegalArgumentException("Connection is null or closed."); + } + this.tableName = tableName; + this.cleanupOnClose = false; + this.connection = connection; + this.configuration = connection.getConfiguration(); + this.pool = pool; + + this.finishSetup(); + } + + /** + * setup this HTable's parameter based on the passed configuration + * @param conf + */ + private void finishSetup() throws IOException { + this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); + this.scannerTimeout = (int) this.configuration.getLong( + HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, + HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); + this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT + : this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.writeBufferSize = this.configuration.getLong( + "hbase.client.write.buffer", 2097152); + this.clearBufferOnFail = true; + this.autoFlush = true; + this.currentWriteBufferSize = 0; + this.scannerCaching = this.configuration.getInt( + "hbase.client.scanner.caching", 1); + + this.maxScannerResultSize = this.configuration.getLong( + HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.maxKeyValueSize = this.configuration.getInt( + "hbase.client.keyvalue.maxsize", -1); this.closed = false; } @@ -886,9 +927,11 @@ public class HTable implements HTableInterface, Closeable { return; } flushCommits(); - this.pool.shutdown(); - if (this.connection != null) { - this.connection.close(); + if (cleanupOnClose) { + this.pool.shutdown(); + if (this.connection != null) { + this.connection.close(); + } } this.closed = true; } diff --git a/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 20583976d8b..17da42de170 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -79,7 +79,7 @@ public class HConnectionTestingUtility { HConnectionImplementation connection = HConnectionManager.HBASE_INSTANCES.get(connectionKey); if (connection == null) { - connection = Mockito.spy(new HConnectionImplementation(conf)); + connection = Mockito.spy(new HConnectionImplementation(conf, true)); HConnectionManager.HBASE_INSTANCES.put(connectionKey, connection); } return connection; diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 9fc1965f0f0..4847ab445b8 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import java.lang.reflect.Field; import java.util.ArrayList; @@ -203,4 +205,51 @@ public class TestHCM { Thread.sleep(50); } } + + @Test + public void testClosing() throws Exception { + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID, + String.valueOf(_randy.nextInt())); + + HConnection c1 = HConnectionManager.createConnection(configuration); + HConnection c2 = HConnectionManager.createConnection(configuration); + + HConnection c3 = HConnectionManager.getConnection(configuration); + HConnection c4 = HConnectionManager.getConnection(configuration); + assertTrue(c3 == c4); + + c1.close(); + assertTrue(c1.isClosed()); + assertFalse(c2.isClosed()); + assertFalse(c3.isClosed()); + + c3.close(); + // still a reference left + assertFalse(c3.isClosed()); + c3.close(); + assertTrue(c3.isClosed()); + // c3 was removed from the cache + assertTrue(HConnectionManager.getConnection(configuration) != c3); + + assertFalse(c2.isClosed()); + } + + /** + * Trivial test to verify that nobody messes with + * {@link HConnectionManager#createConnection(Configuration)} + */ + @Test + public void testCreateConnection() throws Exception { + Configuration configuration = TEST_UTIL.getConfiguration(); + HConnection c1 = HConnectionManager.createConnection(configuration); + HConnection c2 = HConnectionManager.createConnection(configuration); + // created from the same configuration, yet they are different + assertTrue(c1 != c2); + assertTrue(c1.getConfiguration() == c2.getConfiguration()); + // make sure these were not cached + HConnection c3 = HConnectionManager.getConnection(configuration); + assertTrue(c1 != c3); + assertTrue(c2 != c3); + } }