HBASE-8946 Add a new function to Thrift 2 to open scanner, get results and close scanner (Hamed Madani)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1505290 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Lars George 2013-07-21 08:01:37 +00:00
parent c923728108
commit 4fa2e6c37c
4 changed files with 1342 additions and 37 deletions

View File

@ -46,8 +46,8 @@ import org.apache.hadoop.hbase.thrift2.generated.*;
import org.apache.thrift.TException;
/**
* This class is a glue object that connects Thrift RPC calls to the HBase client API primarily defined in the
* HTableInterface.
* This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
* defined in the HTableInterface.
*/
@InterfaceAudience.Private
public class ThriftHBaseServiceHandler implements THBaseService.Iface {
@ -59,41 +59,36 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
// nextScannerId and scannerMap are used to manage scanner state
// TODO: Cleanup thread for Scanners, Scanner id wrap
private final AtomicInteger nextScannerId = new AtomicInteger(0);
private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<Integer, ResultScanner>();
private final Map<Integer, ResultScanner> scannerMap =
new ConcurrentHashMap<Integer, ResultScanner>();
public static THBaseService.Iface newInstance(
Configuration conf, ThriftMetrics metrics) {
public static THBaseService.Iface newInstance(Configuration conf, ThriftMetrics metrics) {
THBaseService.Iface handler = new ThriftHBaseServiceHandler(conf);
return (THBaseService.Iface) Proxy.newProxyInstance(
handler.getClass().getClassLoader(),
new Class[]{THBaseService.Iface.class},
new THBaseServiceMetricsProxy(handler, metrics));
return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
}
private static class THBaseServiceMetricsProxy implements InvocationHandler {
private final THBaseService.Iface handler;
private final ThriftMetrics metrics;
private THBaseServiceMetricsProxy(
THBaseService.Iface handler, ThriftMetrics metrics) {
private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
this.handler = handler;
this.metrics = metrics;
}
@Override
public Object invoke(Object proxy, Method m, Object[] args)
throws Throwable {
public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
Object result;
try {
long start = now();
result = m.invoke(handler, args);
int processTime = (int)(now() - start);
int processTime = (int) (now() - start);
metrics.incMethodTime(m.getName(), processTime);
} catch (InvocationTargetException e) {
throw e.getTargetException();
} catch (Exception e) {
throw new RuntimeException(
"unexpected invocation exception: " + e.getMessage());
throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
}
return result;
}
@ -127,7 +122,6 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
/**
* Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
*
* @param scanner to add
* @return Id for this Scanner
*/
@ -139,7 +133,6 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
/**
* Returns the Scanner associated with the specified Id.
*
* @param id of the Scanner to get
* @return a Scanner, or null if the Id is invalid
*/
@ -149,7 +142,6 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
/**
* Removes the scanner associated with the specified ID from the internal HashMap.
*
* @param id of the Scanner to remove
* @return the removed Scanner, or <code>null</code> if the Id is invalid
*/
@ -206,12 +198,13 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
}
@Override
public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family, ByteBuffer qualifier, ByteBuffer value, TPut put)
throws TIOError, TException {
public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
HTableInterface htable = getTable(table);
try {
return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value), putFromThrift(put));
byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
putFromThrift(put));
} catch (IOException e) {
throw getTIOError(e);
} finally {
@ -244,7 +237,8 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
}
@Override
public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError, TException {
public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
TException {
HTableInterface htable = getTable(table);
try {
htable.delete(deletesFromThrift(deletes));
@ -257,17 +251,18 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
}
@Override
public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family, ByteBuffer qualifier, ByteBuffer value,
TDelete deleteSingle) throws TIOError, TException {
public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
HTableInterface htable = getTable(table);
try {
if (value == null) {
return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
} else {
return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
byteBufferToByteArray(qualifier), byteBufferToByteArray(value), deleteFromThrift(deleteSingle));
byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
deleteFromThrift(deleteSingle));
}
} catch (IOException e) {
throw getTIOError(e);
@ -303,7 +298,8 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
}
@Override
public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError, TIllegalArgument, TException {
public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
TIllegalArgument, TException {
ResultScanner scanner = getScanner(scannerId);
if (scanner == null) {
TIllegalArgument ex = new TIllegalArgument();
@ -318,6 +314,26 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
}
}
@Override
public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
throws TIOError, TException {
HTableInterface htable = getTable(table);
List<TResult> results = null;
ResultScanner scanner = null;
try {
scanner = htable.getScanner(scanFromThrift(scan));
results = resultsFromHBase(scanner.next(numRows));
} catch (IOException e) {
throw getTIOError(e);
} finally {
if (scanner != null) {
scanner.close();
}
closeTable(htable);
}
return results;
}
@Override
public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
LOG.debug("scannerClose: id=" + scannerId);

View File

@ -416,10 +416,9 @@ service THBaseService {
)
/**
* Closes the scanner. Should be called if you need to close
* the Scanner before all results are read.
*
* Exhausted scanners are closed automatically.
* Closes the scanner. Should be called to free server side resources timely.
* Typically close once the scanner is not needed anymore, i.e. after looping
* over it to get all the required rows.
*/
void closeScanner(
/** the Id of the Scanner to close **/
@ -442,4 +441,23 @@ service THBaseService {
2: required TRowMutations rowMutations
) throws (1: TIOError io)
/**
* Get results for the provided TScan object.
* This helper function opens a scanner, get the results and close the scanner.
*
* @return between zero and numRows TResults
*/
list<TResult> getScannerResults(
/** the table to get the Scanner for */
1: required binary table,
/** the scan object to get a Scanner for */
2: required TScan scan,
/** number of rows to return */
3: i32 numRows = 1
) throws (
1: TIOError io
)
}

View File

@ -478,7 +478,7 @@ public class TestThriftHBaseServiceHandler {
TDelete delete = new TDelete(wrap(rowName));
assertFalse(handler.checkAndDelete(table, wrap(rowName), wrap(familyAname),
wrap(qualifierAname), wrap(valueAname), delete));
wrap(qualifierAname), wrap(valueAname), delete));
TGet get = new TGet(wrap(rowName));
TResult result = handler.get(table, get);
@ -667,6 +667,63 @@ public class TestThriftHBaseServiceHandler {
}
}
@Test
public void testGetScannerResults() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
ByteBuffer table = wrap(tableAname);
// insert data
TColumnValue columnValue =
new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname));
List<TColumnValue> columnValues = new ArrayList<TColumnValue>();
columnValues.add(columnValue);
for (int i = 0; i < 20; i++) {
TPut put =
new TPut(wrap(("testGetScannerResults" + pad(i, (byte) 2)).getBytes()), columnValues);
handler.put(table, put);
}
// create scan instance
TScan scan = new TScan();
List<TColumn> columns = new ArrayList<TColumn>();
TColumn column = new TColumn();
column.setFamily(familyAname);
column.setQualifier(qualifierAname);
columns.add(column);
scan.setColumns(columns);
scan.setStartRow("testGetScannerResults".getBytes());
// get 5 rows and check the returned results
scan.setStopRow("testGetScannerResults05".getBytes());
List<TResult> results = handler.getScannerResults(table, scan, 5);
assertEquals(5, results.size());
for (int i = 0; i < 5; i++) {
// check if the rows are returned and in order
assertArrayEquals(("testGetScannerResults" + pad(i, (byte) 2)).getBytes(), results.get(i)
.getRow());
}
// get 10 rows and check the returned results
scan.setStopRow("testGetScannerResults10".getBytes());
results = handler.getScannerResults(table, scan, 10);
assertEquals(10, results.size());
for (int i = 0; i < 10; i++) {
// check if the rows are returned and in order
assertArrayEquals(("testGetScannerResults" + pad(i, (byte) 2)).getBytes(), results.get(i)
.getRow());
}
// get 20 rows and check the returned results
scan.setStopRow("testGetScannerResults20".getBytes());
results = handler.getScannerResults(table, scan, 20);
assertEquals(20, results.size());
for (int i = 0; i < 20; i++) {
// check if the rows are returned and in order
assertArrayEquals(("testGetScannerResults" + pad(i, (byte) 2)).getBytes(), results.get(i)
.getRow());
}
}
@Test
public void testFilterRegistration() throws Exception {
Configuration conf = UTIL.getConfiguration();