HADOOP-1528 HClient for multiple tables - expose close table function

HTable

    * added public method close
    * added protected method checkClosed
    * make getConnection public

HConnectionManager

    * a call to getTableServers or reloadTableServers will cause information for closed
      tables to be reloaded

TestHTable

    * new test case


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@562294 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-08-03 00:02:19 +00:00
parent 84ef0ba801
commit 4fa87a0cbb
4 changed files with 169 additions and 19 deletions

View File

@ -83,4 +83,4 @@ Trunk (unreleased changes)
52. HADOOP-1528 HClient for multiple tables (phase 2) all HBase client side code 52. HADOOP-1528 HClient for multiple tables (phase 2) all HBase client side code
(except TestHClient and HBaseShell) have been converted to use the new client (except TestHClient and HBaseShell) have been converted to use the new client
side objects (HTable/HBaseAdmin/HConnection) instead of HClient. side objects (HTable/HBaseAdmin/HConnection) instead of HClient.
53. HADOOP-1528 HClient for multiple tables - expose close table function

View File

@ -276,9 +276,7 @@ public class HConnectionManager implements HConstants {
"table name cannot be null or zero length"); "table name cannot be null or zero length");
} }
if (closedTables.contains(tableName)) { closedTables.remove(tableName);
throw new IllegalStateException("table closed: " + tableName);
}
SortedMap<Text, HRegionLocation> tableServers = SortedMap<Text, HRegionLocation> tableServers =
tablesToServers.get(tableName); tablesToServers.get(tableName);
@ -302,9 +300,7 @@ public class HConnectionManager implements HConstants {
public SortedMap<Text, HRegionLocation> public SortedMap<Text, HRegionLocation>
reloadTableServers(final Text tableName) throws IOException { reloadTableServers(final Text tableName) throws IOException {
if (closedTables.contains(tableName)) { closedTables.remove(tableName);
throw new IllegalStateException("table closed: " + tableName);
}
SortedMap<Text, HRegionLocation> servers = SortedMap<Text, HRegionLocation> servers =
new TreeMap<Text, HRegionLocation>(); new TreeMap<Text, HRegionLocation>();
@ -369,14 +365,14 @@ public class HConnectionManager implements HConstants {
} }
if (closedTables.contains(tableName)) { if (closedTables.contains(tableName)) {
throw new IllegalStateException("table closed: " + tableName); throw new IllegalStateException("table already closed: " + tableName);
} }
SortedMap<Text, HRegionLocation> tableServers = SortedMap<Text, HRegionLocation> tableServers =
tablesToServers.remove(tableName); tablesToServers.remove(tableName);
if (tableServers == null) { if (tableServers == null) {
throw new IllegalArgumentException("table was not opened: " + tableName); throw new IllegalArgumentException("table not open: " + tableName);
} }
closedTables.add(tableName); closedTables.add(tableName);

View File

@ -59,6 +59,12 @@ public class HTable implements HConstants {
protected volatile boolean closed; protected volatile boolean closed;
protected void checkClosed() {
if (closed) {
throw new IllegalStateException("table is closed");
}
}
/** /**
* Creates an object to access a HBase table * Creates an object to access a HBase table
* *
@ -85,6 +91,7 @@ public class HTable implements HConstants {
* @return Location of row. * @return Location of row.
*/ */
HRegionLocation getRegionLocation(Text row) { HRegionLocation getRegionLocation(Text row) {
checkClosed();
if (this.tableServers == null) { if (this.tableServers == null) {
throw new IllegalStateException("Must open table first"); throw new IllegalStateException("Must open table first");
} }
@ -96,10 +103,26 @@ public class HTable implements HConstants {
} }
/** @return the connection */ /** @return the connection */
HConnection getConnection() { public HConnection getConnection() {
checkClosed();
return connection; return connection;
} }
/**
* Releases resources associated with this table. After calling close(), all
* other methods will throw an IllegalStateException
*/
public synchronized void close() {
closed = true;
tableServers = null;
batch = null;
currentLockId = -1L;
currentRegion = null;
currentServer = null;
clientid = -1L;
connection.close(tableName);
}
/** /**
* Verifies that no update is in progress * Verifies that no update is in progress
*/ */
@ -114,9 +137,7 @@ public class HTable implements HConstants {
* @return Array of region starting row keys * @return Array of region starting row keys
*/ */
public Text[] getStartKeys() { public Text[] getStartKeys() {
if (closed) { checkClosed();
throw new IllegalStateException("table is closed");
}
Text[] keys = new Text[tableServers.size()]; Text[] keys = new Text[tableServers.size()];
int i = 0; int i = 0;
for(Text key: tableServers.keySet()){ for(Text key: tableServers.keySet()){
@ -134,6 +155,7 @@ public class HTable implements HConstants {
* @throws IOException * @throws IOException
*/ */
public byte[] get(Text row, Text column) throws IOException { public byte[] get(Text row, Text column) throws IOException {
checkClosed();
byte [] value = null; byte [] value = null;
for(int tries = 0; tries < numRetries; tries++) { for(int tries = 0; tries < numRetries; tries++) {
HRegionLocation r = getRegionLocation(row); HRegionLocation r = getRegionLocation(row);
@ -173,6 +195,7 @@ public class HTable implements HConstants {
* @throws IOException * @throws IOException
*/ */
public byte[][] get(Text row, Text column, int numVersions) throws IOException { public byte[][] get(Text row, Text column, int numVersions) throws IOException {
checkClosed();
byte [][] values = null; byte [][] values = null;
for (int tries = 0; tries < numRetries; tries++) { for (int tries = 0; tries < numRetries; tries++) {
HRegionLocation r = getRegionLocation(row); HRegionLocation r = getRegionLocation(row);
@ -226,6 +249,7 @@ public class HTable implements HConstants {
*/ */
public byte[][] get(Text row, Text column, long timestamp, int numVersions) public byte[][] get(Text row, Text column, long timestamp, int numVersions)
throws IOException { throws IOException {
checkClosed();
byte [][] values = null; byte [][] values = null;
for (int tries = 0; tries < numRetries; tries++) { for (int tries = 0; tries < numRetries; tries++) {
HRegionLocation r = getRegionLocation(row); HRegionLocation r = getRegionLocation(row);
@ -274,6 +298,7 @@ public class HTable implements HConstants {
* @throws IOException * @throws IOException
*/ */
public SortedMap<Text, byte[]> getRow(Text row) throws IOException { public SortedMap<Text, byte[]> getRow(Text row) throws IOException {
checkClosed();
KeyedData[] value = null; KeyedData[] value = null;
for (int tries = 0; tries < numRetries; tries++) { for (int tries = 0; tries < numRetries; tries++) {
HRegionLocation r = getRegionLocation(row); HRegionLocation r = getRegionLocation(row);
@ -372,6 +397,7 @@ public class HTable implements HConstants {
Text startRow, long timestamp, RowFilterInterface filter) Text startRow, long timestamp, RowFilterInterface filter)
throws IOException { throws IOException {
checkClosed();
return new ClientScanner(columns, startRow, timestamp, filter); return new ClientScanner(columns, startRow, timestamp, filter);
} }
@ -385,9 +411,8 @@ public class HTable implements HConstants {
* @return lockid to be used in subsequent put, delete and commit calls * @return lockid to be used in subsequent put, delete and commit calls
*/ */
public synchronized long startBatchUpdate(final Text row) { public synchronized long startBatchUpdate(final Text row) {
if (batch != null || currentLockId != -1L) { checkClosed();
throw new IllegalStateException("update in progress"); checkUpdateInProgress();
}
batch = new BatchUpdate(); batch = new BatchUpdate();
return batch.startUpdate(row); return batch.startUpdate(row);
} }
@ -397,6 +422,7 @@ public class HTable implements HConstants {
* @param lockid lock id returned by startBatchUpdate * @param lockid lock id returned by startBatchUpdate
*/ */
public synchronized void abortBatch(final long lockid) { public synchronized void abortBatch(final long lockid) {
checkClosed();
if (batch == null) { if (batch == null) {
throw new IllegalStateException("no batch update in progress"); throw new IllegalStateException("no batch update in progress");
} }
@ -426,6 +452,7 @@ public class HTable implements HConstants {
public synchronized void commitBatch(final long lockid, final long timestamp) public synchronized void commitBatch(final long lockid, final long timestamp)
throws IOException { throws IOException {
checkClosed();
if (batch == null) { if (batch == null) {
throw new IllegalStateException("no batch update in progress"); throw new IllegalStateException("no batch update in progress");
} }
@ -481,9 +508,8 @@ public class HTable implements HConstants {
* @throws IOException * @throws IOException
*/ */
public synchronized long startUpdate(final Text row) throws IOException { public synchronized long startUpdate(final Text row) throws IOException {
if (currentLockId != -1L || batch != null) { checkClosed();
throw new IllegalStateException("update in progress"); checkUpdateInProgress();
}
for (int tries = 0; tries < numRetries; tries++) { for (int tries = 0; tries < numRetries; tries++) {
IOException e = null; IOException e = null;
HRegionLocation info = getRegionLocation(row); HRegionLocation info = getRegionLocation(row);
@ -532,6 +558,7 @@ public class HTable implements HConstants {
* @throws IOException * @throws IOException
*/ */
public void put(long lockid, Text column, byte val[]) throws IOException { public void put(long lockid, Text column, byte val[]) throws IOException {
checkClosed();
if (val == null) { if (val == null) {
throw new IllegalArgumentException("value cannot be null"); throw new IllegalArgumentException("value cannot be null");
} }
@ -569,6 +596,7 @@ public class HTable implements HConstants {
* @throws IOException * @throws IOException
*/ */
public void delete(long lockid, Text column) throws IOException { public void delete(long lockid, Text column) throws IOException {
checkClosed();
if (batch != null) { if (batch != null) {
batch.delete(lockid, column); batch.delete(lockid, column);
return; return;
@ -602,6 +630,7 @@ public class HTable implements HConstants {
* @throws IOException * @throws IOException
*/ */
public synchronized void abort(long lockid) throws IOException { public synchronized void abort(long lockid) throws IOException {
checkClosed();
if (batch != null) { if (batch != null) {
abortBatch(lockid); abortBatch(lockid);
return; return;
@ -645,6 +674,7 @@ public class HTable implements HConstants {
* @throws IOException * @throws IOException
*/ */
public synchronized void commit(long lockid, long timestamp) throws IOException { public synchronized void commit(long lockid, long timestamp) throws IOException {
checkClosed();
if (batch != null) { if (batch != null) {
commitBatch(lockid, timestamp); commitBatch(lockid, timestamp);
return; return;
@ -679,6 +709,7 @@ public class HTable implements HConstants {
* @throws IOException * @throws IOException
*/ */
public synchronized void renewLease(long lockid) throws IOException { public synchronized void renewLease(long lockid) throws IOException {
checkClosed();
if (batch != null) { if (batch != null) {
return; return;
} }
@ -723,6 +754,7 @@ public class HTable implements HConstants {
private RowFilterInterface filter; private RowFilterInterface filter;
private void loadRegions() { private void loadRegions() {
checkClosed();
Text firstServer = null; Text firstServer = null;
if (this.startRow == null || this.startRow.getLength() == 0) { if (this.startRow == null || this.startRow.getLength() == 0) {
firstServer = tableServers.firstKey(); firstServer = tableServers.firstKey();
@ -763,6 +795,7 @@ public class HTable implements HConstants {
* Returns false if there are no more scanners. * Returns false if there are no more scanners.
*/ */
private boolean nextScanner() throws IOException { private boolean nextScanner() throws IOException {
checkClosed();
if (this.scannerId != -1L) { if (this.scannerId != -1L) {
this.server.close(this.scannerId); this.server.close(this.scannerId);
this.scannerId = -1L; this.scannerId = -1L;
@ -821,6 +854,7 @@ public class HTable implements HConstants {
* {@inheritDoc} * {@inheritDoc}
*/ */
public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException { public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
checkClosed();
if (this.closed) { if (this.closed) {
return false; return false;
} }
@ -844,6 +878,7 @@ public class HTable implements HConstants {
* {@inheritDoc} * {@inheritDoc}
*/ */
public void close() throws IOException { public void close() throws IOException {
checkClosed();
if (this.scannerId != -1L) { if (this.scannerId != -1L) {
this.server.close(this.scannerId); this.server.close(this.scannerId);
this.scannerId = -1L; this.scannerId = -1L;

View File

@ -0,0 +1,119 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
/**
* Tests HTable
*/
public class TestHTable extends HBaseClusterTestCase implements HConstants {
private static final HColumnDescriptor column =
new HColumnDescriptor(COLUMN_FAMILY.toString());
private static final Text tableAname = new Text("tableA");
private static final Text tableBname = new Text("tableB");
private static final Text row = new Text("row");
/**
* the test
* @throws IOException
*/
public void testHTable() throws IOException {
HTableDescriptor tableAdesc = new HTableDescriptor(tableAname.toString());
tableAdesc.addFamily(column);
HTableDescriptor tableBdesc = new HTableDescriptor(tableBname.toString());
tableBdesc.addFamily(column);
// create a couple of tables
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(tableAdesc);
admin.createTable(tableBdesc);
// put some data into table A
byte[] value = "value".getBytes(UTF8_ENCODING);
HTable a = new HTable(conf, tableAname);
long lockid = a.startBatchUpdate(row);
a.put(lockid, COLUMN_FAMILY, value);
a.commit(lockid);
// open a new connection to A and a connection to b
HTable newA = new HTable(conf, tableAname);
HTable b = new HTable(conf, tableBname);
// copy data from A to B
HScannerInterface s =
newA.obtainScanner(COLUMN_FAMILY_ARRAY, EMPTY_START_ROW);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
while(s.next(key, results)) {
lockid = b.startBatchUpdate(key.getRow());
for(Map.Entry<Text, byte[]> e: results.entrySet()) {
b.put(lockid, e.getKey(), e.getValue());
}
b.commit(lockid);
}
} finally {
s.close();
}
// Close table A and note how A becomes inaccessable
a.close();
try {
a.get(row, COLUMN_FAMILY);
fail();
} catch (IllegalStateException e) {
// expected
} catch (Exception e) {
e.printStackTrace();
fail();
}
// Opening a new connection to A will cause the tables to be reloaded
try {
HTable anotherA = new HTable(conf, tableAname);
anotherA.get(row, COLUMN_FAMILY);
} catch (Exception e) {
e.printStackTrace();
fail();
}
// We can still access A through newA because it has the table information
// cached. And if it needs to recalibrate, that will cause the information
// to be reloaded.
}
}