HBASE-8336 PooledHTable may be returned multiple times to the same pool
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1485402 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
91148bcc2a
commit
e2f57c7696
|
@ -324,138 +324,165 @@ public class HTablePool implements Closeable {
|
|||
*/
|
||||
class PooledHTable implements HTableInterface {
|
||||
|
||||
private boolean open = false;
|
||||
|
||||
private HTableInterface table; // actual table implementation
|
||||
|
||||
public PooledHTable(HTableInterface table) {
|
||||
this.table = table;
|
||||
this.open = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getTableName() {
|
||||
checkState();
|
||||
return table.getTableName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
checkState();
|
||||
return table.getConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableDescriptor getTableDescriptor() throws IOException {
|
||||
checkState();
|
||||
return table.getTableDescriptor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exists(Get get) throws IOException {
|
||||
checkState();
|
||||
return table.exists(get);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean[] exists(List<Get> gets) throws IOException {
|
||||
checkState();
|
||||
return table.exists(gets);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batch(List<? extends Row> actions, Object[] results) throws IOException,
|
||||
InterruptedException {
|
||||
checkState();
|
||||
table.batch(actions, results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] batch(List<? extends Row> actions) throws IOException,
|
||||
InterruptedException {
|
||||
checkState();
|
||||
return table.batch(actions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result get(Get get) throws IOException {
|
||||
checkState();
|
||||
return table.get(get);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result[] get(List<Get> gets) throws IOException {
|
||||
checkState();
|
||||
return table.get(gets);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
|
||||
checkState();
|
||||
return table.getRowOrBefore(row, family);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultScanner getScanner(Scan scan) throws IOException {
|
||||
checkState();
|
||||
return table.getScanner(scan);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultScanner getScanner(byte[] family) throws IOException {
|
||||
checkState();
|
||||
return table.getScanner(family);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultScanner getScanner(byte[] family, byte[] qualifier)
|
||||
throws IOException {
|
||||
checkState();
|
||||
return table.getScanner(family, qualifier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(Put put) throws IOException {
|
||||
checkState();
|
||||
table.put(put);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(List<Put> puts) throws IOException {
|
||||
checkState();
|
||||
table.put(puts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Put put) throws IOException {
|
||||
checkState();
|
||||
return table.checkAndPut(row, family, qualifier, value, put);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(Delete delete) throws IOException {
|
||||
checkState();
|
||||
table.delete(delete);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(List<Delete> deletes) throws IOException {
|
||||
checkState();
|
||||
table.delete(deletes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) throws IOException {
|
||||
checkState();
|
||||
return table.checkAndDelete(row, family, qualifier, value, delete);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result increment(Increment increment) throws IOException {
|
||||
checkState();
|
||||
return table.increment(increment);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incrementColumnValue(byte[] row, byte[] family,
|
||||
byte[] qualifier, long amount) throws IOException {
|
||||
checkState();
|
||||
return table.incrementColumnValue(row, family, qualifier, amount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incrementColumnValue(byte[] row, byte[] family,
|
||||
byte[] qualifier, long amount, Durability durability) throws IOException {
|
||||
checkState();
|
||||
return table.incrementColumnValue(row, family, qualifier, amount,
|
||||
durability);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoFlush() {
|
||||
checkState();
|
||||
return table.isAutoFlush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushCommits() throws IOException {
|
||||
checkState();
|
||||
table.flushCommits();
|
||||
}
|
||||
|
||||
|
@ -465,11 +492,14 @@ public class HTablePool implements Closeable {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void close() throws IOException {
|
||||
checkState();
|
||||
open = false;
|
||||
returnTable(table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CoprocessorRpcChannel coprocessorService(byte[] row) {
|
||||
checkState();
|
||||
return table.coprocessorService(row);
|
||||
}
|
||||
|
||||
|
@ -477,6 +507,7 @@ public class HTablePool implements Closeable {
|
|||
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
|
||||
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
|
||||
throws ServiceException, Throwable {
|
||||
checkState();
|
||||
return table.coprocessorService(service, startKey, endKey, callable);
|
||||
}
|
||||
|
||||
|
@ -484,6 +515,7 @@ public class HTablePool implements Closeable {
|
|||
public <T extends Service, R> void coprocessorService(Class<T> service,
|
||||
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
|
||||
throws ServiceException, Throwable {
|
||||
checkState();
|
||||
table.coprocessorService(service, startKey, endKey, callable, callback);
|
||||
}
|
||||
|
||||
|
@ -505,43 +537,61 @@ public class HTablePool implements Closeable {
|
|||
public <R> void batchCallback(List<? extends Row> actions,
|
||||
Object[] results, Callback<R> callback) throws IOException,
|
||||
InterruptedException {
|
||||
checkState();
|
||||
table.batchCallback(actions, results, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> Object[] batchCallback(List<? extends Row> actions,
|
||||
Callback<R> callback) throws IOException, InterruptedException {
|
||||
checkState();
|
||||
return table.batchCallback(actions, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void mutateRow(RowMutations rm) throws IOException {
|
||||
checkState();
|
||||
table.mutateRow(rm);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result append(Append append) throws IOException {
|
||||
checkState();
|
||||
return table.append(append);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAutoFlush(boolean autoFlush) {
|
||||
checkState();
|
||||
table.setAutoFlush(autoFlush);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
|
||||
checkState();
|
||||
table.setAutoFlush(autoFlush, clearBufferOnFail);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteBufferSize() {
|
||||
checkState();
|
||||
return table.getWriteBufferSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWriteBufferSize(long writeBufferSize) throws IOException {
|
||||
checkState();
|
||||
table.setWriteBufferSize(writeBufferSize);
|
||||
}
|
||||
|
||||
boolean isOpen() {
|
||||
return open;
|
||||
}
|
||||
|
||||
private void checkState() {
|
||||
if (!isOpen()) {
|
||||
throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -182,6 +182,34 @@ public class TestHTablePool {
|
|||
Assert.assertTrue("alien table rejected", true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHTablePoolCloseTwice() throws Exception {
|
||||
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(),
|
||||
Integer.MAX_VALUE, getPoolType());
|
||||
String tableName = Bytes.toString(TABLENAME);
|
||||
|
||||
// Request a table from an empty pool
|
||||
HTableInterface table = pool.getTable(tableName);
|
||||
Assert.assertNotNull(table);
|
||||
Assert.assertTrue(((HTablePool.PooledHTable) table).isOpen());
|
||||
// Close table (returns table to the pool)
|
||||
table.close();
|
||||
// check if the table is closed
|
||||
Assert.assertFalse(((HTablePool.PooledHTable) table).isOpen());
|
||||
try {
|
||||
table.close();
|
||||
Assert.fail("Should not allow table to be closed twice");
|
||||
} catch (IllegalStateException ex) {
|
||||
Assert.assertTrue("table cannot be closed twice", true);
|
||||
} finally {
|
||||
pool.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Category(MediumTests.class)
|
||||
|
|
Loading…
Reference in New Issue