HBASE-13201 Remove HTablePool from thrift-server (Solomon Duskis)

This commit is contained in:
tedyu 2015-03-11 19:06:32 -07:00
parent 7a3ea23704
commit ff8840acc3
3 changed files with 28 additions and 1102 deletions

View File

@ -1,696 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift2;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTableInterfaceFactory;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
* A simple pool of HTable instances.
*
* Each HTablePool acts as a pool for all tables. To use, instantiate an
* HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
*
* This method is not needed anymore, clients should call
* HTableInterface.close() rather than returning the tables to the pool
*
* Once you are done with it, close your instance of
* {@link org.apache.hadoop.hbase.client.HTableInterface}
* by calling {@link org.apache.hadoop.hbase.client.HTableInterface#close()} rather than returning
* the tablesto the pool with (deprecated)
* {@link #putTable(org.apache.hadoop.hbase.client.HTableInterface)}.
*
* <p>
* A pool can be created with a <i>maxSize</i> which defines the most HTable
* references that will ever be retained for each table. Otherwise the default
* is {@link Integer#MAX_VALUE}.
*
* <p>
* Pool will manage its own connections to the cluster. See
* {@link org.apache.hadoop.hbase.client.HConnectionManager}.
* Was @deprecated made @InterfaceAudience.private as of 0.98.1.
* See {@link org.apache.hadoop.hbase.client.HConnection#getTable(String)},
* Moved to thrift2 module for 2.0
*/
@InterfaceAudience.Private
public class HTablePool implements Closeable {
private final PoolMap<String, HTableInterface> tables;
private final int maxSize;
private final PoolType poolType;
private final Configuration config;
private final HTableInterfaceFactory tableFactory;
/**
* Default Constructor. Default HBaseConfiguration and no limit on pool size.
*/
public HTablePool() {
this(HBaseConfiguration.create(), Integer.MAX_VALUE);
}
/**
* Constructor to set maximum versions and use the specified configuration.
*
* @param config
* configuration
* @param maxSize
* maximum number of references to keep for each table
*/
public HTablePool(final Configuration config, final int maxSize) {
this(config, maxSize, null, null);
}
/**
* Constructor to set maximum versions and use the specified configuration and
* table factory.
*
* @param config
* configuration
* @param maxSize
* maximum number of references to keep for each table
* @param tableFactory
* table factory
*/
public HTablePool(final Configuration config, final int maxSize,
final HTableInterfaceFactory tableFactory) {
this(config, maxSize, tableFactory, PoolType.Reusable);
}
/**
* Constructor to set maximum versions and use the specified configuration and
* pool type.
*
* @param config
* configuration
* @param maxSize
* maximum number of references to keep for each table
* @param poolType
* pool type which is one of {@link PoolType#Reusable} or
* {@link PoolType#ThreadLocal}
*/
public HTablePool(final Configuration config, final int maxSize,
final PoolType poolType) {
this(config, maxSize, null, poolType);
}
/**
* Constructor to set maximum versions and use the specified configuration,
* table factory and pool type. The HTablePool supports the
* {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
* type is null or not one of those two values, then it will default to
* {@link PoolType#Reusable}.
*
* @param config
* configuration
* @param maxSize
* maximum number of references to keep for each table
* @param tableFactory
* table factory
* @param poolType
* pool type which is one of {@link PoolType#Reusable} or
* {@link PoolType#ThreadLocal}
*/
public HTablePool(final Configuration config, final int maxSize,
final HTableInterfaceFactory tableFactory, PoolType poolType) {
// Make a new configuration instance so I can safely cleanup when
// done with the pool.
this.config = config == null ? HBaseConfiguration.create() : config;
this.maxSize = maxSize;
this.tableFactory = tableFactory == null ? new HTableFactory()
: tableFactory;
if (poolType == null) {
this.poolType = PoolType.Reusable;
} else {
switch (poolType) {
case Reusable:
case ThreadLocal:
this.poolType = poolType;
break;
default:
this.poolType = PoolType.Reusable;
break;
}
}
this.tables = new PoolMap<String, HTableInterface>(this.poolType,
this.maxSize);
}
/**
* Get a reference to the specified table from the pool.
* <p>
* <p/>
*
* @param tableName
* table name
* @return a reference to the specified table
* @throws RuntimeException
* if there is a problem instantiating the HTable
*/
public HTableInterface getTable(String tableName) {
// call the old getTable implementation renamed to findOrCreateTable
HTableInterface table = findOrCreateTable(tableName);
// return a proxy table so when user closes the proxy, the actual table
// will be returned to the pool
return new PooledHTable(table);
}
/**
* Get a reference to the specified table from the pool.
* <p>
*
* Create a new one if one is not available.
*
* @param tableName
* table name
* @return a reference to the specified table
* @throws RuntimeException
* if there is a problem instantiating the HTable
*/
private HTableInterface findOrCreateTable(String tableName) {
HTableInterface table = tables.get(tableName);
if (table == null) {
table = createHTable(tableName);
}
return table;
}
/**
* Get a reference to the specified table from the pool.
* <p>
*
* Create a new one if one is not available.
*
* @param tableName
* table name
* @return a reference to the specified table
* @throws RuntimeException if there is a problem instantiating the HTable
*/
public HTableInterface getTable(byte[] tableName) {
return getTable(Bytes.toString(tableName));
}
/**
* This method is not needed anymore, clients should call
* HTableInterface.close() rather than returning the tables to the pool
*
* @param table
* the proxy table user got from pool
* @deprecated
*/
@Deprecated
public void putTable(HTableInterface table) throws IOException {
// we need to be sure nobody puts a proxy implementation in the pool
// but if the client code is not updated
// and it will continue to call putTable() instead of calling close()
// then we need to return the wrapped table to the pool instead of the
// proxy
// table
if (table instanceof PooledHTable) {
returnTable(((PooledHTable) table).getWrappedTable());
} else {
// normally this should not happen if clients pass back the same
// table
// object they got from the pool
// but if it happens then it's better to reject it
throw new IllegalArgumentException("not a pooled table: " + table);
}
}
/**
* Puts the specified HTable back into the pool.
* <p>
*
* If the pool already contains <i>maxSize</i> references to the table, then
* the table instance gets closed after flushing buffered edits.
*
* @param table
* table
*/
private void returnTable(HTableInterface table) throws IOException {
// this is the old putTable method renamed and made private
String tableName = Bytes.toString(table.getTableName());
if (tables.size(tableName) >= maxSize) {
// release table instance since we're not reusing it
this.tables.removeValue(tableName, table);
this.tableFactory.releaseHTableInterface(table);
return;
}
tables.put(tableName, table);
}
protected HTableInterface createHTable(String tableName) {
return this.tableFactory.createHTableInterface(config,
Bytes.toBytes(tableName));
}
/**
* Closes all the HTable instances , belonging to the given table, in the
* table pool.
* <p>
* Note: this is a 'shutdown' of the given table pool and different from
* {@link #putTable(HTableInterface)}, that is used to return the table
* instance to the pool for future re-use.
*
* @param tableName
*/
public void closeTablePool(final String tableName) throws IOException {
Collection<HTableInterface> tables = this.tables.values(tableName);
if (tables != null) {
for (HTableInterface table : tables) {
this.tableFactory.releaseHTableInterface(table);
}
}
this.tables.remove(tableName);
}
/**
* See {@link #closeTablePool(String)}.
*
* @param tableName
*/
public void closeTablePool(final byte[] tableName) throws IOException {
closeTablePool(Bytes.toString(tableName));
}
/**
* Closes all the HTable instances , belonging to all tables in the table
* pool.
* <p>
* Note: this is a 'shutdown' of all the table pools.
*/
public void close() throws IOException {
for (String tableName : tables.keySet()) {
closeTablePool(tableName);
}
this.tables.clear();
}
public int getCurrentPoolSize(String tableName) {
return tables.size(tableName);
}
/**
* A proxy class that implements HTableInterface.close method to return the
* wrapped table back to the table pool
*
*/
class PooledHTable implements HTableInterface {
private boolean open = false;
private HTableInterface table; // actual table implementation
public PooledHTable(HTableInterface table) {
this.table = table;
this.open = true;
}
@Override
public byte[] getTableName() {
checkState();
return table.getTableName();
}
@Override
public TableName getName() {
return table.getName();
}
@Override
public Configuration getConfiguration() {
checkState();
return table.getConfiguration();
}
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
checkState();
return table.getTableDescriptor();
}
@Override
public boolean exists(Get get) throws IOException {
checkState();
return table.exists(get);
}
@Override
public boolean[] existsAll(List<Get> gets) throws IOException {
checkState();
return table.existsAll(gets);
}
@Override
public Boolean[] exists(List<Get> gets) throws IOException {
checkState();
return table.exists(gets);
}
@Override
public void batch(List<? extends Row> actions, Object[] results) throws IOException,
InterruptedException {
checkState();
table.batch(actions, results);
}
/**
* {@inheritDoc}
* @deprecated If any exception is thrown by one of the actions, there is no way to
* retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
*/
@Deprecated
@Override
public Object[] batch(List<? extends Row> actions) throws IOException,
InterruptedException {
checkState();
return table.batch(actions);
}
@Override
public Result get(Get get) throws IOException {
checkState();
return table.get(get);
}
@Override
public Result[] get(List<Get> gets) throws IOException {
checkState();
return table.get(gets);
}
@Override
@SuppressWarnings("deprecation")
@Deprecated
public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
checkState();
return table.getRowOrBefore(row, family);
}
@Override
public ResultScanner getScanner(Scan scan) throws IOException {
checkState();
return table.getScanner(scan);
}
@Override
public ResultScanner getScanner(byte[] family) throws IOException {
checkState();
return table.getScanner(family);
}
@Override
public ResultScanner getScanner(byte[] family, byte[] qualifier)
throws IOException {
checkState();
return table.getScanner(family, qualifier);
}
@Override
public void put(Put put) throws IOException {
checkState();
table.put(put);
}
@Override
public void put(List<Put> puts) throws IOException {
checkState();
table.put(puts);
}
@Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Put put) throws IOException {
checkState();
return table.checkAndPut(row, family, qualifier, value, put);
}
@Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Put put) throws IOException {
checkState();
return table.checkAndPut(row, family, qualifier, compareOp, value, put);
}
@Override
public void delete(Delete delete) throws IOException {
checkState();
table.delete(delete);
}
@Override
public void delete(List<Delete> deletes) throws IOException {
checkState();
table.delete(deletes);
}
@Override
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Delete delete) throws IOException {
checkState();
return table.checkAndDelete(row, family, qualifier, value, delete);
}
@Override
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Delete delete) throws IOException {
checkState();
return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
}
@Override
public Result increment(Increment increment) throws IOException {
checkState();
return table.increment(increment);
}
@Override
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount) throws IOException {
checkState();
return table.incrementColumnValue(row, family, qualifier, amount);
}
@Override
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount, Durability durability) throws IOException {
checkState();
return table.incrementColumnValue(row, family, qualifier, amount,
durability);
}
@Override
public boolean isAutoFlush() {
checkState();
return table.isAutoFlush();
}
@Override
public void flushCommits() throws IOException {
checkState();
table.flushCommits();
}
/**
* Returns the actual table back to the pool
*
* @throws IOException
*/
public void close() throws IOException {
checkState();
open = false;
returnTable(table);
}
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
checkState();
return table.coprocessorService(row);
}
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
throws ServiceException, Throwable {
checkState();
return table.coprocessorService(service, startKey, endKey, callable);
}
@Override
public <T extends Service, R> void coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
throws ServiceException, Throwable {
checkState();
table.coprocessorService(service, startKey, endKey, callable, callback);
}
@Override
public String toString() {
return "PooledHTable{" + ", table=" + table + '}';
}
/**
* Expose the wrapped HTable to tests in the same package
*
* @return wrapped htable
*/
HTableInterface getWrappedTable() {
return table;
}
@Override
public <R> void batchCallback(List<? extends Row> actions,
Object[] results, Callback<R> callback) throws IOException,
InterruptedException {
checkState();
table.batchCallback(actions, results, callback);
}
/**
* {@inheritDoc}
* @deprecated If any exception is thrown by one of the actions, there is no way to
* retrieve the partially executed results. Use
* {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
* instead.
*/
@Deprecated
@Override
public <R> Object[] batchCallback(List<? extends Row> actions,
Callback<R> callback) throws IOException, InterruptedException {
checkState();
return table.batchCallback(actions, callback);
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
checkState();
table.mutateRow(rm);
}
@Override
public Result append(Append append) throws IOException {
checkState();
return table.append(append);
}
@Override
public void setAutoFlush(boolean autoFlush) {
checkState();
table.setAutoFlush(autoFlush, autoFlush);
}
@Override
public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
checkState();
table.setAutoFlush(autoFlush, clearBufferOnFail);
}
@Override
public void setAutoFlushTo(boolean autoFlush) {
table.setAutoFlushTo(autoFlush);
}
@Override
public long getWriteBufferSize() {
checkState();
return table.getWriteBufferSize();
}
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
checkState();
table.setWriteBufferSize(writeBufferSize);
}
boolean isOpen() {
return open;
}
private void checkState() {
if (!isOpen()) {
throw new IllegalStateException("Table=" + table.getName()
+ " already closed");
}
}
@Override
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
}
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(
Descriptors.MethodDescriptor method, Message request,
byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
checkState();
return table.batchCoprocessorService(method, request, startKey, endKey,
responsePrototype);
}
@Override
public <R extends Message> void batchCoprocessorService(
Descriptors.MethodDescriptor method, Message request,
byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
throws ServiceException, Throwable {
checkState();
table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
byte[] value, RowMutations mutation) throws IOException {
checkState();
return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
}
}
}

View File

@ -18,7 +18,18 @@
*/
package org.apache.hadoop.hbase.thrift2;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.*;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deletesFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getsFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putsFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultFromHBase;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.resultsFromHBase;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.rowMutationsFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift;
import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
import java.io.IOException;
@ -30,30 +41,32 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift2.generated.*;
import org.apache.hadoop.hbase.thrift2.generated.TAppend;
import org.apache.hadoop.hbase.thrift2.generated.TDelete;
import org.apache.hadoop.hbase.thrift2.generated.TGet;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.thrift2.generated.TIOError;
import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument;
import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
import org.apache.hadoop.hbase.thrift2.generated.TPut;
import org.apache.hadoop.hbase.thrift2.generated.TResult;
import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
import org.apache.hadoop.hbase.thrift2.generated.TScan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConnectionCache;
import org.apache.thrift.TException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
* defined in the HTableInterface.
@ -63,8 +76,6 @@ import com.google.common.cache.CacheBuilder;
public class ThriftHBaseServiceHandler implements THBaseService.Iface {
// TODO: Size of pool configuraple
private final Cache<String, HTablePool> htablePools;
private final Callable<? extends HTablePool> htablePoolCreater;
private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
// nextScannerId and scannerMap are used to manage scanner state
@ -74,8 +85,6 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
new ConcurrentHashMap<Integer, ResultScanner>();
private final ConnectionCache connectionCache;
private final HTableFactory tableFactory;
private final int maxPoolSize;
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
@ -86,7 +95,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
}
private static class THBaseServiceMetricsProxy implements InvocationHandler {
private static final class THBaseServiceMetricsProxy implements InvocationHandler {
private final THBaseService.Iface handler;
private final ThriftMetrics metrics;
@ -122,34 +131,13 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
connectionCache = new ConnectionCache(
conf, userProvider, cleanInterval, maxIdleTime);
tableFactory = new HTableFactory() {
@Override
public HTableInterface createHTableInterface(Configuration config,
byte[] tableName) {
try {
return connectionCache.getTable(Bytes.toString(tableName));
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
};
htablePools = CacheBuilder.newBuilder().expireAfterAccess(
maxIdleTime, TimeUnit.MILLISECONDS).softValues().concurrencyLevel(4).build();
maxPoolSize = conf.getInt("hbase.thrift.htablepool.size.max", 1000);
htablePoolCreater = new Callable<HTablePool>() {
public HTablePool call() {
return new HTablePool(conf, maxPoolSize, tableFactory);
}
};
}
private Table getTable(ByteBuffer tableName) {
String currentUser = connectionCache.getEffectiveUser();
try {
HTablePool htablePool = htablePools.get(currentUser, htablePoolCreater);
return htablePool.getTable(byteBufferToByteArray(tableName));
} catch (ExecutionException ee) {
throw new RuntimeException(ee);
return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
} catch (IOException ie) {
throw new RuntimeException(ie);
}
}

View File

@ -1,366 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift2;
import java.io.IOException;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.junit.*;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
/**
* Tests HTablePool.
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({TestHTablePool.TestHTableReusablePool.class, TestHTablePool.TestHTableThreadLocalPool.class})
@Category({ClientTests.class, MediumTests.class})
public class TestHTablePool {
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static String TABLENAME = "TestHTablePool";
public abstract static class TestHTablePoolType {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TableName.valueOf(TABLENAME), HConstants.CATALOG_FAMILY);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
protected abstract PoolType getPoolType();
@Test
public void testTableWithStringName() throws Exception {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
Integer.MAX_VALUE, getPoolType());
String tableName = TABLENAME;
// Request a table from an empty pool
Table table = pool.getTable(tableName);
Assert.assertNotNull(table);
// Close table (returns table to the pool)
table.close();
// Request a table of the same name
Table sameTable = pool.getTable(tableName);
Assert.assertSame(
((HTablePool.PooledHTable) table).getWrappedTable(),
((HTablePool.PooledHTable) sameTable).getWrappedTable());
}
@Test
public void testTableWithByteArrayName() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
Integer.MAX_VALUE, getPoolType());
// Request a table from an empty pool
Table table = pool.getTable(TABLENAME);
Assert.assertNotNull(table);
// Close table (returns table to the pool)
table.close();
// Request a table of the same name
Table sameTable = pool.getTable(TABLENAME);
Assert.assertSame(
((HTablePool.PooledHTable) table).getWrappedTable(),
((HTablePool.PooledHTable) sameTable).getWrappedTable());
}
@Test
public void testTablesWithDifferentNames() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
Integer.MAX_VALUE, getPoolType());
// We add the class to the table name as the HBase cluster is reused
// during the tests: this gives naming unicity.
byte[] otherTable = Bytes.toBytes(
"OtherTable_" + getClass().getSimpleName()
);
TEST_UTIL.createTable(otherTable, HConstants.CATALOG_FAMILY);
// Request a table from an empty pool
Table table1 = pool.getTable(TABLENAME);
Table table2 = pool.getTable(otherTable);
Assert.assertNotNull(table2);
// Close tables (returns tables to the pool)
table1.close();
table2.close();
// Request tables of the same names
Table sameTable1 = pool.getTable(TABLENAME);
Table sameTable2 = pool.getTable(otherTable);
Assert.assertSame(
((HTablePool.PooledHTable) table1).getWrappedTable(),
((HTablePool.PooledHTable) sameTable1).getWrappedTable());
Assert.assertSame(
((HTablePool.PooledHTable) table2).getWrappedTable(),
((HTablePool.PooledHTable) sameTable2).getWrappedTable());
}
@Test
public void testProxyImplementationReturned() {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
Integer.MAX_VALUE);
String tableName = TABLENAME;// Request a table from
// an
// empty pool
Table table = pool.getTable(tableName);
// Test if proxy implementation is returned
Assert.assertTrue(table instanceof HTablePool.PooledHTable);
}
@Test
public void testDeprecatedUsagePattern() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
Integer.MAX_VALUE);
String tableName = TABLENAME;// Request a table from
// an
// empty pool
// get table will return proxy implementation
HTableInterface table = pool.getTable(tableName);
// put back the proxy implementation instead of closing it
pool.putTable(table);
// Request a table of the same name
Table sameTable = pool.getTable(tableName);
// test no proxy over proxy created
Assert.assertSame(((HTablePool.PooledHTable) table).getWrappedTable(),
((HTablePool.PooledHTable) sameTable).getWrappedTable());
}
@Test
public void testReturnDifferentTable() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
Integer.MAX_VALUE);
String tableName = TABLENAME;// Request a table from
// an
// empty pool
// get table will return proxy implementation
final Table table = pool.getTable(tableName);
HTableInterface alienTable = new HTable(TEST_UTIL.getConfiguration(),
TableName.valueOf(TABLENAME)) {
// implementation doesn't matter as long the table is not from
// pool
};
try {
// put the wrong table in pool
pool.putTable(alienTable);
Assert.fail("alien table accepted in pool");
} catch (IllegalArgumentException e) {
Assert.assertTrue("alien table rejected", true);
}
}
@Test
public void testHTablePoolCloseTwice() throws Exception {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
Integer.MAX_VALUE, getPoolType());
String tableName = TABLENAME;
// Request a table from an empty pool
Table table = pool.getTable(tableName);
Assert.assertNotNull(table);
Assert.assertTrue(((HTablePool.PooledHTable) table).isOpen());
// Close table (returns table to the pool)
table.close();
// check if the table is closed
Assert.assertFalse(((HTablePool.PooledHTable) table).isOpen());
try {
table.close();
Assert.fail("Should not allow table to be closed twice");
} catch (IllegalStateException ex) {
Assert.assertTrue("table cannot be closed twice", true);
} finally {
pool.close();
}
}
}
@Category({ClientTests.class, MediumTests.class})
public static class TestHTableReusablePool extends TestHTablePoolType {
@Override
protected PoolType getPoolType() {
return PoolType.Reusable;
}
@Test
public void testTableWithMaxSize() throws Exception {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2,
getPoolType());
// Request tables from an empty pool
Table table1 = pool.getTable(TABLENAME);
Table table2 = pool.getTable(TABLENAME);
Table table3 = pool.getTable(TABLENAME);
// Close tables (returns tables to the pool)
table1.close();
table2.close();
// The pool should reject this one since it is already full
table3.close();
// Request tables of the same name
Table sameTable1 = pool.getTable(TABLENAME);
Table sameTable2 = pool.getTable(TABLENAME);
Table sameTable3 = pool.getTable(TABLENAME);
Assert.assertSame(
((HTablePool.PooledHTable) table1).getWrappedTable(),
((HTablePool.PooledHTable) sameTable1).getWrappedTable());
Assert.assertSame(
((HTablePool.PooledHTable) table2).getWrappedTable(),
((HTablePool.PooledHTable) sameTable2).getWrappedTable());
Assert.assertNotSame(
((HTablePool.PooledHTable) table3).getWrappedTable(),
((HTablePool.PooledHTable) sameTable3).getWrappedTable());
}
@Test
public void testCloseTablePool() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4,
getPoolType());
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
if (admin.tableExists(TABLENAME)) {
admin.disableTable(TABLENAME);
admin.deleteTable(TABLENAME);
}
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(TABLENAME));
tableDescriptor.addFamily(new HColumnDescriptor("randomFamily"));
admin.createTable(tableDescriptor);
// Request tables from an empty pool
Table[] tables = new Table[4];
for (int i = 0; i < 4; ++i) {
tables[i] = pool.getTable(TABLENAME);
}
pool.closeTablePool(TABLENAME);
for (int i = 0; i < 4; ++i) {
tables[i].close();
}
Assert.assertEquals(4,
pool.getCurrentPoolSize(TABLENAME));
pool.closeTablePool(TABLENAME);
Assert.assertEquals(0,
pool.getCurrentPoolSize(TABLENAME));
}
}
@Category({ClientTests.class, MediumTests.class})
public static class TestHTableThreadLocalPool extends TestHTablePoolType {
@Override
protected PoolType getPoolType() {
return PoolType.ThreadLocal;
}
@Test
public void testTableWithMaxSize() throws Exception {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2,
getPoolType());
// Request tables from an empty pool
Table table1 = pool.getTable(TABLENAME);
Table table2 = pool.getTable(TABLENAME);
Table table3 = pool.getTable(TABLENAME);
// Close tables (returns tables to the pool)
table1.close();
table2.close();
// The pool should not reject this one since the number of threads
// <= 2
table3.close();
// Request tables of the same name
Table sameTable1 = pool.getTable(TABLENAME);
Table sameTable2 = pool.getTable(TABLENAME);
Table sameTable3 = pool.getTable(TABLENAME);
Assert.assertSame(
((HTablePool.PooledHTable) table3).getWrappedTable(),
((HTablePool.PooledHTable) sameTable1).getWrappedTable());
Assert.assertSame(
((HTablePool.PooledHTable) table3).getWrappedTable(),
((HTablePool.PooledHTable) sameTable2).getWrappedTable());
Assert.assertSame(
((HTablePool.PooledHTable) table3).getWrappedTable(),
((HTablePool.PooledHTable) sameTable3).getWrappedTable());
}
@Test
public void testCloseTablePool() throws IOException {
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4,
getPoolType());
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
if (admin.tableExists(TABLENAME)) {
admin.disableTable(TABLENAME);
admin.deleteTable(TABLENAME);
}
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(TABLENAME));
tableDescriptor.addFamily(new HColumnDescriptor("randomFamily"));
admin.createTable(tableDescriptor);
// Request tables from an empty pool
Table[] tables = new Table[4];
for (int i = 0; i < 4; ++i) {
tables[i] = pool.getTable(TABLENAME);
}
pool.closeTablePool(TABLENAME);
for (int i = 0; i < 4; ++i) {
tables[i].close();
}
Assert.assertEquals(1,
pool.getCurrentPoolSize(TABLENAME));
pool.closeTablePool(TABLENAME);
Assert.assertEquals(0,
pool.getCurrentPoolSize(TABLENAME));
}
}
}